Mesos libprocess: A First Look (1)

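libprocess is the main foundation library used by Mesos, and much of Mesos's logic is built on top of it. This post records what I learned from recently reading the source code. If you spot any mistakes, corrections are welcome.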

The Actor Model

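The Actor model was first proposed in the 1970s. It treats everything as an Actor. Each Actor runs independently as a process or thread, and Actors are decoupled from each other through messages. Inside an Actor, messages are handled serially, one at a time, and outside code cannot reach the Actor's internal data, which sidesteps locking and synchronization problems. The model can therefore use the concurrency of multi-core processors while avoiding the overuse of locks.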
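
To make the mailbox idea concrete, here is a minimal sketch in standard C++ only. It is not libprocess code: `CounterActor` and all of its members are hypothetical names chosen for illustration. The mailbox itself is protected by a mutex, but the actor's state (`count`) is only ever touched by the actor's own thread, so message handlers need no locking.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical minimal actor: one thread drains a mailbox serially.
class CounterActor {
public:
  CounterActor() : worker([this]() { run(); }) {}

  ~CounterActor() { stop(); }

  // Posts a message asking the actor to add `n` to its private counter.
  void add(int n) {
    post([this, n]() { count += n; });
  }

  // Stops after all previously queued messages are handled and
  // returns the final total.
  int stop() {
    post(nullptr);  // An empty message marks the end of the mailbox.
    if (worker.joinable()) {
      worker.join();
    }
    return count;
  }

private:
  using Message = std::function<void()>;

  void post(Message message) {
    {
      std::lock_guard<std::mutex> lock(mutex);
      mailbox.push(std::move(message));
    }
    ready.notify_one();
  }

  void run() {
    while (true) {
      Message message;
      {
        std::unique_lock<std::mutex> lock(mutex);
        ready.wait(lock, [this]() { return !mailbox.empty(); });
        assert(!mailbox.empty());
        message = std::move(mailbox.front());
        mailbox.pop();
      }
      if (!message) {
        return;
      }
      message();  // Only this thread ever touches `count`.
    }
  }

  std::mutex mutex;
  std::condition_variable ready;
  std::queue<Message> mailbox;
  int count = 0;
  std::thread worker;  // Declared last so the other members exist first.
};
```

Any number of threads can call `add` concurrently. The additions are applied one at a time on the actor's thread, and `stop()` returns the total once the mailbox has been drained.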

As for how Actors are actually scheduled and run, an implementation can use user-space threads, kernel threads, or simply processes.

libprocess was inspired by Erlang.

Introducing Process

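A Process is an Actor. A Process contains an event queue plus a set of functions that trigger events, such as dispatch, delay, send, and defer. The mailbox, a core concept of the Actor model, corresponds to the event queue in a Process.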

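When extending Mesos, it is best to stay consistent with Mesos's Actor-style design. The usual way to do that is to implement the new functionality in a custom class that inherits from ProtobufProcess.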

First, look at the inheritance chain of Process:

ProtobufProcess -> Process -> ProcessBase -> EventConsumer

The core attributes and methods of Process are defined in the base class ProcessBase, and there are a lot of them.
The most important ones are:

  • State. Mainly used by ProcessManager when scheduling.
enum class State
{
  BOTTOM, // Uninitialized but events may be enqueued.
  BLOCKED, // Initialized, no events enqueued.
  READY, // Initialized, events enqueued.
  TERMINATING // Initialized, no more events will be enqueued.
};

std::atomic<State> state = ATOMIC_VAR_INIT(State::BOTTOM);

// Flag for indicating that a terminate event has been injected.
std::atomic<bool> termination = ATOMIC_VAR_INIT(false);

  • HttpEndpoint and handlers. Used for HTTP messages and route binding.
// Definition of an HTTP endpoint. The endpoint can be
// associated with an authentication realm, in which case:
//
// (1) `realm` and `authenticatedHandler` will be set.
// Libprocess will perform HTTP authentication for
// all requests to this endpoint (by default during
// HttpEvent consumption). The authentication principal
// will be passed to the `authenticatedHandler`.
//
// Otherwise, if the endpoint is not associated with an
// authentication realm:
//
// (2) Only `handler` will be set, and no authentication
// takes place.
struct HttpEndpoint
{
  Option<HttpRequestHandler> handler;

  Option<std::string> realm;
  Option<AuthenticatedHttpRequestHandler> authenticatedHandler;
  RouteOptions options;
};

  • Event queue. Holds the events that the process has received.
// Queue of received events. We employ the PIMPL idiom here and use
// a pointer so we can hide the implementation of `EventQueue`.
std::unique_ptr<EventQueue> events;
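
The state field and the event queue work together. The sketch below follows the comments on the `State` enum above and is written in standard C++ only. `ToyProcess`, `enqueue`, `initialize`, and `drain` are hypothetical names, the sketch is single-threaded, and the real libprocess implementation uses a concurrent queue and handles races that are ignored here.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

enum class State { BOTTOM, BLOCKED, READY, TERMINATING };

// Hypothetical stand-in for a process: a state plus a queue of events.
// The plain deque is not thread-safe; this sketch is single-threaded.
struct ToyProcess {
  std::atomic<State> state{State::BOTTOM};
  std::deque<std::string> events;
};

// Queues an event. Returns true when the process just moved from
// BLOCKED to READY, i.e. when the caller should schedule it.
bool enqueue(ToyProcess& process, std::string event) {
  if (process.state.load() == State::TERMINATING) {
    return false;  // No more events are accepted.
  }
  process.events.push_back(std::move(event));
  State expected = State::BLOCKED;
  return process.state.compare_exchange_strong(expected, State::READY);
}

// First run of a process: BOTTOM becomes READY once it is initialized.
void initialize(ToyProcess& process) {
  assert(process.state.load() == State::BOTTOM);
  process.state.store(State::READY);
}

// Handles every queued event, then parks the process as BLOCKED.
// Returns the number of events handled.
std::size_t drain(ToyProcess& process) {
  assert(process.state.load() == State::READY);
  std::size_t handled = process.events.size();
  process.events.clear();
  process.state.store(State::BLOCKED);
  return handled;
}
```

The point of the sketch is that a process only needs to be handed to the scheduler on the BLOCKED to READY transition. An event that arrives while the process is BOTTOM or already READY is just appended to the queue.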

Process Events

There are the following kinds of events:

  • MessageEvent: contains from_PID, to_PID, name, data.
  • DispatchEvent: contains to_PID, func, functype.
  • HttpEvent: contains httprequest, response.
  • ExitedEvent: contains to_PID.
  • TerminateEvent: contains from_PID, inject (bool).
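
One common way to route several event kinds to type-specific handlers is the visitor pattern. The sketch below shows it for two of the event kinds listed above, in standard C++ only. The type names mirror the list, but the fields are trimmed down and `Recorder` is a hypothetical consumer, so treat this as an illustration rather than the actual libprocess definitions.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct MessageEvent;
struct TerminateEvent;

// Hypothetical visitor: one overload per event kind, no-op by default.
struct EventVisitor {
  virtual ~EventVisitor() {}
  virtual void visit(const MessageEvent&) {}
  virtual void visit(const TerminateEvent&) {}
};

struct Event {
  virtual ~Event() {}
  virtual void visit(EventVisitor* visitor) const = 0;
};

struct MessageEvent : Event {
  MessageEvent(std::string name_, std::string data_)
    : name(std::move(name_)), data(std::move(data_)) {}

  void visit(EventVisitor* visitor) const override {
    assert(visitor != nullptr);
    visitor->visit(*this);
  }

  std::string name;
  std::string data;
};

struct TerminateEvent : Event {
  explicit TerminateEvent(bool inject_) : inject(inject_) {}

  void visit(EventVisitor* visitor) const override {
    assert(visitor != nullptr);
    visitor->visit(*this);
  }

  bool inject;
};

// Hypothetical consumer that records which events it was given.
struct Recorder : EventVisitor {
  void visit(const MessageEvent& event) override {
    log.push_back("message:" + event.name);
  }

  void visit(const TerminateEvent&) override {
    log.push_back("terminate");
  }

  std::vector<std::string> log;
};
```

A consumer walks its queue and calls `event->visit(&consumer)` on each entry. The event's dynamic type then selects the matching overload without any casts.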

How Processes Run: ProcessManager

Processes are scheduled by ProcessManager. ProcessManager is a static instance defined in src/process.cpp, and it has the following members.

// Delegate process name to receive root HTTP requests.
const Option<string> delegate;

// Map of all local spawned and running processes.
hashmap<string, ProcessBase*> processes;
std::recursive_mutex processes_mutex;

// Queue of runnable processes.
//
// See run_queue.hpp for more information about the run queue
// implementation.
RunQueue runq;

// Number of running processes, to support Clock::settle operation.
std::atomic_long running;

// Stores the thread handles so that we can join during shutdown.
vector<std::thread*> threads;

// Boolean used to signal processing threads to stop running.
std::atomic_bool joining_threads;

// List of rules applied to all incoming HTTP requests.
vector<Owned<firewall::FirewallRule>> firewallRules;
std::recursive_mutex firewall_mutex;

// Whether the process manager is finalizing or not.
// If true, no further processes will be spawned.
std::atomic_bool finalizing;

// Filter. Synchronized support for using the filter needs to be
// recursive in case a filter wants to do anything fancy (which is
// possible and likely given that filters will get used for
// testing).
std::atomic<Filter*> filter = ATOMIC_VAR_INIT(nullptr);
std::recursive_mutex filter_mutex;

When a process is spawned, it is actually added to the ProcessManager's runq.

UPID ProcessManager::spawn(ProcessBase* process, bool manage)
{
  ...

  // We save the PID before enqueueing the process to avoid the race
  // condition that occurs when a user has a very short process and
  // the process gets run and cleaned up before we return from enqueue
  // (e.g., when 'manage' is set to true).
  UPID pid = process->self();

  // Add process to the run queue (so 'initialize' will get invoked).
  enqueue(process);

  VLOG(3) << "Spawned process " << pid;

  return pid;
}

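runq is a concurrent queue with two implementations, and an option selects which one is used (see the references for details).
ProcessManager also maintains a threads vector, which works much like a thread pool.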

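When ProcessManager is initialized, it does the following, among other things:
· Creates the global instance process_manager.
· Initializes the event loop.
· Calls init_threads().
· Initializes the network IP, port, and socket.
· Spawns processes such as profiler, statics, reaper, and help.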

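In init_threads, ProcessManager gives every worker thread it starts a run loop. A worker thread works as follows:

· It repeatedly takes a process from the ProcessManager's runq;
· It executes the events in that process's event queue in order;
· Once every event in the event queue has been executed, it takes another process;
· If runq is empty, it blocks.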

long ProcessManager::init_threads()
{
  ...
  // Create processing threads.
  for (long i = 0; i < num_worker_threads; i++) {
    // Retain the thread handles so that we can join when shutting down.
    threads.emplace_back(new std::thread(
        [this]() {
          running.fetch_add(1);
          do {
            ProcessBase* process = dequeue();
            if (process == nullptr) {
              if (joining_threads.load()) {
                break;
              }
            } else {
              resume(process);
            }
          } while (true);
          running.fetch_sub(1);
          ...
        }));
  }
  ....
}
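
The worker loop can be tried out in isolation with a toy run queue. The following is a minimal sketch in standard C++ only, assuming a simple mutex-based queue. `ToyRunQueue`, `run_workers`, and `handle_batch` are hypothetical names, and a `Task` stands in for "resume this process"; the real RunQueue in libprocess is implemented differently.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical runnable unit; stands in for a process that has events.
using Task = std::function<void()>;

class ToyRunQueue {
public:
  void enqueue(Task task) {
    {
      std::lock_guard<std::mutex> lock(mutex);
      tasks.push_back(std::move(task));
    }
    nonempty.notify_one();
  }

  // Blocks while the queue is empty. Returns false only after `shutdown`
  // has been called and every queued task has been handed out.
  bool dequeue(Task* task) {
    std::unique_lock<std::mutex> lock(mutex);
    nonempty.wait(lock, [this]() { return joining || !tasks.empty(); });
    if (tasks.empty()) {
      return false;
    }
    *task = std::move(tasks.front());
    tasks.pop_front();
    return true;
  }

  void shutdown() {
    {
      std::lock_guard<std::mutex> lock(mutex);
      joining = true;
    }
    nonempty.notify_all();
  }

private:
  std::mutex mutex;
  std::condition_variable nonempty;
  std::deque<Task> tasks;
  bool joining = false;
};

// Runs `count` worker threads until the queue is shut down and drained.
void run_workers(ToyRunQueue& runq, int count) {
  assert(count > 0);
  std::vector<std::thread> threads;
  for (int i = 0; i < count; i++) {
    threads.emplace_back([&runq]() {
      Task task;
      while (runq.dequeue(&task)) {
        task();  // Stands in for `resume(process)`.
      }
    });
  }
  for (std::thread& thread : threads) {
    thread.join();
  }
}

// Usage: four workers handle 100 queued tasks and then exit.
int handle_batch() {
  ToyRunQueue runq;
  std::atomic<int> handled{0};
  for (int i = 0; i < 100; i++) {
    runq.enqueue([&handled]() { handled.fetch_add(1); });
  }
  runq.shutdown();
  run_workers(runq, 4);
  return handled.load();
}
```

As in the loop above, a worker only leaves its loop when the queue has nothing left to hand out and shutdown has been requested. Otherwise it blocks until more work arrives.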

resume is the function that actually handles events inside a worker thread.

void ProcessManager::resume(ProcessBase* process)
{
  ...
  // Check the state.
  CHECK(state == ProcessBase::State::BOTTOM ||
        state == ProcessBase::State::READY);

  if (state == ProcessBase::State::BOTTOM) {
    // In the event that the process throws an exception,
    // we will abort the program.
    //
    // TODO(bmahler): Consider providing recovery mechanisms.
    try {
      process->initialize();
    } catch (const std::exception& e) {
      LOG(FATAL) << "Aborting libprocess: '" << process->pid << "'"
                 << " threw exception during initialization: " << e.what();
    } catch (...) {
      LOG(FATAL) << "Aborting libprocess: '" << process->pid << "'"
                 << " threw exception during initialization: unknown";
    }

    state = ProcessBase::State::READY;
    process->state.store(state);
  }

  ...

  while (!terminate && !blocked) {
    Event* event = nullptr;

    // NOTE: the event queue requires only a _single_ consumer at a
    // time ... this is where we act as that single consumer (and down
    // in `ProcessManager::cleanup` which we call from here).

    // Fetch the next event.
    if (!process->events->consumer.empty()) {
      event = process->events->consumer.dequeue();
    } else {
      ...
    }

    process->serve(std::move(*event));
  }
}

A Small Process Example

For this post I used Process to implement a small Mesos HTTP endpoint service. Here is the code.

  1. Define a custom Process class.
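#include <process/future.hpp>
#include <process/http.hpp>
#include <process/process.hpp>

using process::Future;
namespace http = process::http;

class MyHttpProcess : public process::Process<MyHttpProcess> {
public:
  // The constructor sets the pid in its initializer list. The pid is also
  // the root path of the HTTP API, e.g. 'http://localhost:port/my/....'
  MyHttpProcess() : process::ProcessBase("my") {}

  // Override the virtual function initialize to register the routes
  // and their handlers.
  void initialize() override {
    route("/get", "help info", [this](const http::Request &request) {
      return handler(request);
    });
    route("/post", "help info", [this](const http::Request &request) {
      return postHandler(request);
    });
  }

  // Handler for /get. The body is a placeholder that returns 200 OK.
  Future<http::Response> handler(const http::Request &request) {
    return http::OK();
  }

  // Handler for /post. The body is a placeholder that returns 200 OK.
  Future<http::Response> postHandler(const http::Request &request) {
    return http::OK();
  }
};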

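  2. Start the process. You can start the process whenever you see fit, but its lifecycle needs to be designed carefully. Here the lifecycle of the process is tied to the creation and destruction of an object.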
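class MyObject {
public:
  // Spawn the process when the owning object is created.
  MyObject() : process(new MyHttpProcess()) {
    process::spawn(process.get());
  }

  // Terminate the process and wait for it to exit before it is freed.
  ~MyObject() {
    process::terminate(process.get());
    process::wait(process.get());
  }

private:
  process::Owned<MyHttpProcess> process;
};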

What's Next

A follow-up post will look in detail at the edge cases of process scheduling and at how events flow through libprocess.