ProtobufProcess -> Process -> ProcessBase -> EventConsumer
Process 的核心属性和方法定义在基类 ProcessBase 中,方法和内容很多. 着重介绍以下:
状态.主要由 ProcessManager 调度时使用.
1 2 3 4 5 6 7 8 9 10 11 12 13
enum classState { BOTTOM, // Uninitialized but events may be enqueued. BLOCKED, // Initialized, no events enqueued. READY, // Initialized, events enqueued. TERMINATING // Initialized, no more events will be enqueued. };
std::atomic<State> state = ATOMIC_VAR_INIT(State::BOTTOM);
// Flag for indicating that a terminate event has been injected. std::atomic<bool> termination = ATOMIC_VAR_INIT(false);
// Definition of an HTTP endpoint. The endpoint can be // associated with an authentication realm, in which case: // // (1) `realm` and `authenticatedHandler` will be set. // Libprocess will perform HTTP authentication for // all requests to this endpoint (by default during // HttpEvent consumption). The authentication principal // will be passed to the `authenticatedHandler`. // // Otherwise, if the endpoint is not associated with an // authentication realm: // // (2) Only `handler` will be set, and no authentication // takes place. structHttpEndpoint { Option<HttpRequestHandler> handler;
// Queue of received events. We employ the PIMPL idiom here and use // a pointer so we can hide the implementation of `EventQueue`. std::unique_ptr<EventQueue> events;
Process Events
Events 有一下几种:
MessageEvent: 包含 from_PID, to_PID, name, data.
DispatchEvent: 包含 to_PID, func,functype.
HttpEvent: 包含 httprequest, response.
ExitedEvent: 包含 to_PID.
TerminateEvent: 包含 from_PID, inject(bool).
Process 运行机制 —— ProcessManager
Process 运行由 ProcessManager 调度。ProcessManager 是一个静态实例,定义在src/process.cpp文件中, ProcessManager 中有如下的成员.
// Delegate process name to receive root HTTP requests. const Option<string> delegate;
// Map of all local spawned and running processes. hashmap<string, ProcessBase*> processes; std::recursive_mutex processes_mutex;
// Queue of runnable processes. // // See run_queue.hpp for more information about the run queue // implementation. RunQueue runq;
// Number of running processes, to support Clock::settle operation. std::atomic_long running;
// Stores the thread handles so that we can join during shutdown. vector<std::thread*> threads;
// Boolean used to signal processing threads to stop running. std::atomic_bool joining_threads;
// List of rules applied to all incoming HTTP requests. vector<Owned<firewall::FirewallRule>> firewallRules; std::recursive_mutex firewall_mutex;
// Whether the process manager is finalizing or not. // If true, no further processes will be spawned. std::atomic_bool finalizing;
// Filter. Synchronized support for using the filter needs to be // recursive in case a filter wants to do anything fancy (which is // possible and likely given that filters will get used for // testing). std::atomic<Filter*> filter = ATOMIC_VAR_INIT(nullptr); std::recursive_mutex filter_mutex;
// We save the PID before enqueueing the process to avoid the race // condition that occurs when a user has a very short process and // the process gets run and cleaned up before we return from enqueue // (e.g., when 'manage' is set to true). UPID pid = process->self();
// Add process to the run queue (so 'initialize' will get invoked). enqueue(process);
// longProcessManager::init_threads(){ ... // Create processing threads. for (long i = 0; i < num_worker_threads; i++) { // Retain the thread handles so that we can join when shutting down. threads.emplace_back(new std::thread( [this]() { running.fetch_add(1); do { ProcessBase* process = dequeue(); if (process == nullptr) { if (joining_threads.load()) { break; } } else { resume(process); } } while (true); running.fetch_sub(1); ... })); } .... }
if (state == ProcessBase::State::BOTTOM) { // In the event that the process throws an exception, // we will abort the program. // // TODO(bmahler): Consider providing recovery mechanisms. try { process->initialize(); } catch (const std::exception& e) { LOG(FATAL) << "Aborting libprocess: '" << process->pid << "'" << " threw exception during initialization: " << e.what(); } catch (...) { LOG(FATAL) << "Aborting libprocess: '" << process->pid << "'" << " threw exception during initialization: unknown"; }
state = ProcessBase::State::READY; process->state.store(state); }
...
while (!terminate && !blocked) { Event* event = nullptr;
// NOTE: the event queue requires only a _single_ consumer at a // time ... this is where we act as that single consumer (and down // in `ProcessManager::cleanup` which we call from here).