swooleコヒーレントソース解読
コンシステントの作成
以下のコードはswoole 4に基づく.4.5-alpha, php7.1.26
実行プロセスに従ってswooleコラボレーションの実装を徐々に分析し、phpプログラムは次のようにします.
goは実はswoole_coroutine_createの別名:
まずzif_を実行しますswoole_coroutine_createは協程を作成します.
php_coro_Argsは、コールバック関数情報を保存するための構造です.
php_corutine::get_task()は現在実行中のタスクを取得するために使用され、最初の実行時に初期化されたmain_を取得する.task:
save_taskは、現在のphpスタック情報をmain_を使用して現在使用されているtaskに保存します.taskなので、これらの情報はmain_に保存されます.task上:
php_coro_taskこの構造は、現在のタスクのphpスタックを保存するために使用されます.
現在のphpスタックを保存するとcoroutineの作成を開始できます.
まず、ctxオブジェクトが作成され、contextオブジェクトは主にcスタックを管理するために使用されます.
make_fcontext関数はboost.contextライブラリで提供されているのは、アセンブリによって作成され、異なるプラットフォームで実装されています.ここではmake_を使用しています.x86_64_sysv_elf_gas.Sというファイル:
参照用レジスタは、rdi、rsi、rdx、rcx、r 8、r 9の順である
make_fcontext関数の実行が完了すると、コンテキストを保存するためのメモリレイアウトは次のようになります.
Coroutineオブジェクトがインスタンス化された後、runメソッドの実行が開始されます.runメソッドは、前の関連メソッドを実行したCoroutineオブジェクトをoriginに格納し、currentを現在のオブジェクトに設定します.
次はcスタックを切り替えるコアメソッド、swap_inとswap_out、最下位もboost.contextライブラリが提供しているので、まず交換を見てみましょう.
jump_fcontextの実行が完了すると、元のスタックメモリレイアウトは次のようになります.
context_funcにはパラメータがありますjump_fcontext実行後にrdiに書き込むthisはcontext_にパラメータとして与えられますfunc使用、fn_、private_data_は、ctxを構築するときに入力されるパラメータです.
main_funcは、現在のコヒーレンスに新しい実行スタックを割り当て、インスタンス化されたばかりのCoroutineにバインドし、コヒーレンスのコールバック関数を実行します.
次にユーザコールバック関数生成のopcodeを実行し、Co::sleep(1)に実行するとSystem::sleep(seconds)が呼び出され、現在のcoroutineにタイミングイベントが登録され、コールバック関数はsleep_timeout:
yield関数はphpスタックとcスタックの切り替えを担当します
まずphpスタックの切り替えを見て、on_yieldは初期化時に登録された関数です
前のtaskを手に入れると、上に保存した実行情報でEGを復元できます.プログラムは簡単です.vmstackとcurrent_execute_dataを交換すればいいです.
このときphpスタック実行状態はgo()関数を呼び出したばかりの状態に戻り(main_task)、cスタック切り替えがどのように処理されているかを見てみましょう.
思い出してin関数、swap_ctx_保存実行swap_in時のrsp,ctx_保存していますmake_fcontext初期化されたスタックトップ位置、もう一度jump_を見てみましょうfcontext実行:
この時点でphpとcスタックはswap_を実行するまで復元されましたinの状態で、コードはzif_に戻ります.swoole_coroutine_createの実行が完了しました:
なぜならexecute_dataはmain_に切り替えられましたtask上の主協程opcodeなので、次のopcodeは「echo“a”」で、sleepの後ろのコードをスキップすることに相当します.
一定のタイミングで、タイマはsleep関数に登録されたコールバック関数sleep_を呼び出します.timeout(呼び出しタイミングは後述)、起動協程は運転を継続します.
zend_vmはその後のopcode'echo"a"'を読み出し、実行を続行します
現在コールバック中のopcodeがすべて実行された後、PHPCoroutine::main_funcは、前に登録したdeferを1回実行し、FILOの順番でリソースをクリーンアップします.
main_func実行完了後Context::context_に戻るfuncメソッドは、現在のコヒーレンスを終了としてマークし、もう一度swap_をします.outはさっきのswapに戻りますinのところ、つまりresumeメソッドで、その後目覚ましの協程が実行済みかどうかをチェックし、end_ツールバーの
closeメソッドは、このコラボレーションのために作成されたvm_をクリーンアップします.stack、同時にmain_に戻りますtask,このときcスタックとphpスタックはいずれもメインコヒーレンスに切り替えられた
Reactorスケジューリング
では、タイミングイベントはいつ実行されますか?これは内部のReactorイベントループによって実現され、以下に具体的な実現を見る.
スレッドを作成すると、reactorが初期化されているかどうかを判断し、初期化されていない場合はactivate関数を呼び出してreactorを初期化します.activate関数には、次の手順があります.
1.reactor構造を初期化し、各種コールバック関数を登録する(読み書きイベントは対応プラットフォームの効率が最も高い多重apiを採用し、統一的なコールバック関数にカプセル化することは異なるapiの実現の詳細を遮蔽するのに役立つ)
2.php_経由swoole_register_shutdown_function("SwooleEvent::rshutdown")request_に登録shutdownフェーズで呼び出された関数(phpのライフサイクルを思い出し、スクリプトが終了すると呼び出されます)は、実際にイベントループがこのフェーズで実行されます.
3.プリエンプトスケジューリングスレッドをオンにします(これは後述します)
request_shutdownフェーズでは、登録されたSwooleEvent::rshutdown関数、swoole_event_rshutdownは、前に登録したwait関数を実行します.
タイミングイベントの登録を見てみましょう.まずtimerを初期化します.
次に、イベントを追加します.
1.time._next_msecとreactor.timeout_msecは、すべてのタイマで最も短いタイムアウト時間(相対値)を維持しています.
2.tnode.exec_msecとtnodeは最小スタックで保存され、スタックトップの要素が最も早くタイムアウトした要素です.
タイミング登録が完了すると、イベントループの実行を待つことができます.epollを例に挙げます.
epoll_の使用waitはfd読み書きイベントを待ってreactor->timeout_へmsec、fdイベントが来るのを待つ
1.epoll_の場合waitタイムアウト時にfd読み書きイベントが取得されず、onTimeout関数を実行し、タイミングイベントを処理する
2.fdイベントがあればfd読み書きイベントを処理し、今回トリガーされたイベントを処理した後、次のループに進む
この期間にfdイベントがない場合、タイミングイベントが実行されます.onTimeoutは、以前に登録された関数reactior_です.timeout, swTimer_select関数は、現在期限切れのイベントを実行してループを終了し、上記のsleep_に実行します.timeout関数では、sleepがスリープしているため、コパスが実行され続けます.
ここまで、プロセス全体が紹介されました.まとめてみましょう.コヒーレンススケジューリングにアクティブに関与していない場合、コヒーレンスはIO/タイミングイベントの実行時にアクティブに譲渡され、対応するイベントを登録し、request_を通過するshutdownフェーズのイベントループはイベントが来るのを待っていて、コモンのresumeをトリガして、マルチコモンの同時の効果 を達成します IO/タイミングイベントは、必ずしも時間通りではない プリエンプトスヶジューリング
以上から分かるように、もし協程にIO/タイミングイベントがないならば、実際に協程はタイミングを切り替えていないので、CPU密集型のシーンに対して、いくつかの協程はCPUタイムスライスが得られないため餓死することができて、Swoole 4.4はプリエンプトスケジューリングを導入してこの問題を理解するためにこの問題を解決した.
vm interruptはphp 7である.1.0以降に導入された実行メカニズムで、swooleはこの特性を使用して実現されるプリエンプトスケジューリングです.
1.ZEND_VM_INTERRUPT_CHECKはコマンドがjumpとcallのときに実行します
2.ZEND_VM_INTERRUPT_CHECKではEG(vm_interrupt)というフラグビットがチェックされ、1の場合zend_がトリガーされますinterrupt_functionの実行
具体的な実装について説明します.
初期化:
1.元の割り込み関数を保存し、zend_interrupt_functionを新しい割り込み関数に置き換える
2.スレッドを開いてinterrupt_を実行thread_loop
3.interrupt_thread_loopでは5 msごとにEG(vm_interrupt)を1に設定
割り込み関数coro_interrupt_functionは、現在のコヒーレンスがスケジューリング可能かどうかを確認し(前回の切り替え時間から10 msを超える)、できれば現在のコヒーレンスを直接譲り、プリエンプトスケジューリングを完了します
以下のコードはswoole 4に基づく.4.5-alpha, php7.1.26
実行プロセスに従ってswooleコラボレーションの実装を徐々に分析し、phpプログラムは次のようにします.
goは実はswoole_coroutine_createの別名:
PHP_FALIAS(go, swoole_coroutine_create, arginfo_swoole_coroutine_create);
まずzif_を実行しますswoole_coroutine_createは協程を作成します.
//
PHP_FUNCTION(swoole_coroutine_create)
{
...
//
ZEND_PARSE_PARAMETERS_START(1, -1)
Z_PARAM_FUNC(fci, fci_cache)
Z_PARAM_VARIADIC('*', fci.params, fci.param_count)
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);
...
long cid = PHPCoroutine::create(&fci_cache, fci.param_count, fci.params);
if (sw_likely(cid > 0))
{
RETURN_LONG(cid);
}
else
{
RETURN_FALSE;
}
}
long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv)
{
...
//
php_coro_args php_coro_args;
php_coro_args.fci_cache = fci_cache;
php_coro_args.argv = argv;
php_coro_args.argc = argc;
save_task(get_task()); // php task
// coroutine
return Coroutine::create(main_func, (void*) &php_coro_args);
}
php_coro_Argsは、コールバック関数情報を保存するための構造です.
// go()
struct php_coro_args
{
zend_fcall_info_cache *fci_cache; //
zval *argv; //
uint32_t argc; //
};
php_corutine::get_task()は現在実行中のタスクを取得するために使用され、最初の実行時に初期化されたmain_を取得する.task:
php_coro_task PHPCoroutine::main_task = {0};
// task, task
static inline php_coro_task* get_task()
{
php_coro_task *task = (php_coro_task *) Coroutine::get_current_task();
return task ? task : &main_task;
}
static inline void* get_current_task()
{
return sw_likely(current) ? current->get_task() : nullptr;
}
inline void* get_task()
{
return task;
}
save_taskは、現在のphpスタック情報をmain_を使用して現在使用されているtaskに保存します.taskなので、これらの情報はmain_に保存されます.task上:
void PHPCoroutine::save_task(php_coro_task *task)
{
save_vm_stack(task); // php
...
}
inline void PHPCoroutine::save_vm_stack(php_coro_task *task)
{
task->bailout = EG(bailout);
task->vm_stack_top = EG(vm_stack_top); //
task->vm_stack_end = EG(vm_stack_end); //
task->vm_stack = EG(vm_stack); //
task->vm_stack_page_size = EG(vm_stack_page_size);
task->error_handling = EG(error_handling);
task->exception_class = EG(exception_class);
task->exception = EG(exception);
}
php_coro_taskこの構造は、現在のタスクのphpスタックを保存するために使用されます.
struct php_coro_task
{
JMP_BUF *bailout; //
zval *vm_stack_top; //
zval *vm_stack_end; //
zend_vm_stack vm_stack; //
size_t vm_stack_page_size;
zend_execute_data *execute_data;
zend_error_handling_t error_handling;
zend_class_entry *exception_class;
zend_object *exception;
zend_output_globals *output_ptr;
/* for array_walk non-reentrancy */
php_swoole_fci *array_walk_fci;
swoole::Coroutine *co; // coroutine
std::stack *defer_tasks;
long pcid;
zend_object *context;
int64_t last_msec;
zend_bool enable_scheduler;
};
現在のphpスタックを保存するとcoroutineの作成を開始できます.
static inline long create(coroutine_func_t fn, void* args = nullptr)
{
return (new Coroutine(fn, args))->run();
}
Coroutine(coroutine_func_t fn, void *private_data) :
ctx(stack_size, fn, private_data) // stack size 2M
{
cid = ++last_cid; // id
coroutines[cid] = this; // corutines
if (sw_unlikely(count() > peak_num)) //
{
peak_num = count();
}
}
まず、ctxオブジェクトが作成され、contextオブジェクトは主にcスタックを管理するために使用されます.
#define SW_DEFAULT_C_STACK_SIZE (2 *1024 * 1024)
size_t Coroutine::stack_size = SW_DEFAULT_C_STACK_SIZE;
ctx(stack_size, fn, private_data)
Context::Context(size_t stack_size, coroutine_func_t fn, void* private_data) :
fn_(fn), stack_size_(stack_size), private_data_(private_data)
{
end_ = false; //
swap_ctx_ = nullptr;
stack_ = (char*) sw_malloc(stack_size_); // c , 2M
...
void* sp = (void*) ((char*) stack_ + stack_size_); //
ctx_ = make_fcontext(sp, stack_size_, (void (*)(intptr_t))&context_func); //
}
make_fcontext関数はboost.contextライブラリで提供されているのは、アセンブリによって作成され、異なるプラットフォームで実装されています.ここではmake_を使用しています.x86_64_sysv_elf_gas.Sというファイル:
参照用レジスタは、rdi、rsi、rdx、rcx、r 8、r 9の順である
make_fcontext:
/* first arg of make_fcontext() == top of context-stack */
/* rax = sp */
movq %rdi, %rax
/* shift address in RAX to lower 16 byte boundary */
/* rax = rax & -16 => rax = rax & (~0x10000 + 1) => rax = rax - rax%16, 16 */
andq $-16, %rax
/* reserve space for context-data on context-stack */
/* size for fc_mxcsr .. RIP + return-address for context-function */
/* on context-function entry: (RSP -0x8) % 16 == 0 */
/*lea “load effective address” ,
,lea ,
:lea eax,[ebx+8] ebx+8 eax, ebx+8 eax。
mov , :mov eax,[ebx+8] ebx+8 eax。*/
/* rax = rax - 0x48, 0x48 */
leaq -0x48(%rax), %rax
/* third arg of make_fcontext() == address of context-function */
/* context_func rax+0x38 */
movq %rdx, 0x38(%rax)
/* save MMX control- and status-word */
stmxcsr (%rax)
/* save x87 control-word */
fnstcw 0x4(%rax)
/* compute abs address of label finish */
/*
https://sourceware.org/binutils/docs/as/i386_002dMemory.html
The x86-64 architecture adds an RIP (instruction pointer relative) addressing.
This addressing mode is specified by using ‘rip’ as a base register. Only constant offsets are valid. For example:
AT&T: ‘1234(%rip)’, Intel: ‘[rip + 1234]’
Points to the address 1234 bytes past the end of the current instruction.
AT&T: ‘symbol(%rip)’, Intel: ‘[rip + symbol]’
Points to the symbol in RIP relative way, this is shorter than the default absolute addressing.
*/
/* rcx = finish */
leaq finish(%rip), %rcx
/* save address of finish as return-address for context-function */
/* will be entered after context-function returns */
/* finish rax+0x40 */
movq %rcx, 0x40(%rax)
/*return rax*/
ret /* return pointer to context-data */
finish:
/* exit code is zero */
xorq %rdi, %rdi
/* exit application */
call _exit@PLT
hlt
make_fcontext関数の実行が完了すると、コンテキストを保存するためのメモリレイアウトは次のようになります.
/****************************************************************************************
* |
Coroutineオブジェクトがインスタンス化された後、runメソッドの実行が開始されます.runメソッドは、前の関連メソッドを実行したCoroutineオブジェクトをoriginに格納し、currentを現在のオブジェクトに設定します.
static sw_co_thread_local Coroutine* current;
Coroutine *origin;
inline long run()
{
long cid = this->cid;
origin = current; // orign
current = this; // current
ctx.swap_in(); //
...
}
次はcスタックを切り替えるコアメソッド、swap_inとswap_out、最下位もboost.contextライブラリが提供しているので、まず交換を見てみましょう.
bool Context::swap_in()
{
jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true);
return true;
}
// jump_x86_64_sysv_elf_gas.S
jump_fcontext:
/* , , rbp rip, call jump_fcontext push rip, jmp jump_fcontext. */
/* rip , jump_fcontext return true */
pushq %rbp /* save RBP */
pushq %rbx /* save RBX */
pushq %r15 /* save R15 */
pushq %r14 /* save R14 */
pushq %r13 /* save R13 */
pushq %r12 /* save R12 */
/* prepare stack for FPU */
leaq -0x8(%rsp), %rsp
/* test for flag preserve_fpu */
cmp $0, %rcx
je 1f
/* save MMX control- and status-word */
stmxcsr (%rsp)
/* save x87 control-word */
fnstcw 0x4(%rsp)
1:
/* store RSP (pointing to context-data) in RDI */
/* *swap_ctx_ = rsp, */
movq %rsp, (%rdi)
/* restore RSP (pointing to context-data) from RSI */
/* rsp = ctx_, make_fcontext */
movq %rsi, %rsp
/* test for flag preserve_fpu */
cmp $0, %rcx
je 2f
/* restore MMX control- and status-word */
ldmxcsr (%rsp)
/* restore x87 control-word */
fldcw 0x4(%rsp)
2:
/* prepare stack for FPU */
leaq 0x8(%rsp), %rsp
/* , */
popq %r12 /* restrore R12 */
popq %r13 /* restrore R13 */
popq %r14 /* restrore R14 */
popq %r15 /* restrore R15 */
popq %rbx /* restrore RBX */
popq %rbp /* restrore RBP */
/* restore return-address */
/* r8 = make_fcontext( make_fcontext ) */
popq %r8
/* use third arg as return-value after jump */
/* rax = this */
movq %rdx, %rax
/* use third arg as first arg in context function */
/* rdi = this */
movq %rdx, %rdi
/* indirect jump to context */
/* context_func */
jmp *%r8
jump_fcontextの実行が完了すると、元のスタックメモリレイアウトは次のようになります.
/****************************************************************************************
* |
context_funcにはパラメータがありますjump_fcontext実行後にrdiに書き込むthisはcontext_にパラメータとして与えられますfunc使用、fn_、private_data_は、ctxを構築するときに入力されるパラメータです.
void Context::context_func(void *arg)
{
Context *_this = (Context *) arg;
_this->fn_(_this->private_data_); // main_func(php_coro_args)
_this->end_ = true;
_this->swap_out();
}
main_funcは、現在のコヒーレンスに新しい実行スタックを割り当て、インスタンス化されたばかりのCoroutineにバインドし、コヒーレンスのコールバック関数を実行します.
void PHPCoroutine::main_func(void *arg)
{
...
// EG vmstack, go() , main_task
vm_stack_init();
call = (zend_execute_data *) (EG(vm_stack_top));
task = (php_coro_task *) EG(vm_stack_top);
EG(vm_stack_top) = (zval *) ((char *) call + PHP_CORO_TASK_SLOT * sizeof(zval)); // task
call = zend_vm_stack_push_call_frame(call_info, func, argc, object_or_called_scope); //
EG(bailout) = NULL;
EG(current_execute_data) = call;
EG(error_handling) = EH_NORMAL;
EG(exception_class) = NULL;
EG(exception) = NULL;
save_vm_stack(task); // vmstack task
record_last_msec(task); //
task->output_ptr = NULL;
task->array_walk_fci = NULL;
task->co = Coroutine::get_current(); // coroutine
task->co->set_task((void *) task); // coroutine task
task->defer_tasks = nullptr;
task->pcid = task->co->get_origin_cid(); // id
task->context = nullptr;
task->enable_scheduler = 1;
if (EXPECTED(func->type == ZEND_USER_FUNCTION))
{
...
// execute_data
zend_init_func_execute_data(call, &func->op_array, retval);
//
zend_execute_ex(EG(current_execute_data));
}
...
}
次にユーザコールバック関数生成のopcodeを実行し、Co::sleep(1)に実行するとSystem::sleep(seconds)が呼び出され、現在のcoroutineにタイミングイベントが登録され、コールバック関数はsleep_timeout:
int System::sleep(double sec)
{
Coroutine* co = Coroutine::get_current_safe(); // coroutine
if (swoole_timer_add((long) (sec * 1000), SW_FALSE, sleep_timeout, co) == NULL) // couroutine
{
return -1;
}
co->yield(); //
return 0;
}
//
static void sleep_timeout(swTimer *timer, swTimer_node *tnode)
{
((Coroutine *) tnode->data)->resume();
}
yield関数はphpスタックとcスタックの切り替えを担当します
void Coroutine::yield()
{
SW_ASSERT(current == this || on_bailout != nullptr);
state = SW_CORO_WAITING; // waiting
if (sw_likely(on_yield))
{
on_yield(task); // php
}
current = origin; //
ctx.swap_out(); // c
}
まずphpスタックの切り替えを見て、on_yieldは初期化時に登録された関数です
void PHPCoroutine::init()
{
Coroutine::set_on_yield(on_yield);
Coroutine::set_on_resume(on_resume);
Coroutine::set_on_close(on_close);
}
void PHPCoroutine::on_yield(void *arg)
{
php_coro_task *task = (php_coro_task *) arg; // task
php_coro_task *origin_task = get_origin_task(task); // task
save_task(task); //
restore_task(origin_task); //
}
前のtaskを手に入れると、上に保存した実行情報でEGを復元できます.プログラムは簡単です.vmstackとcurrent_execute_dataを交換すればいいです.
void PHPCoroutine::restore_task(php_coro_task *task)
{
restore_vm_stack(task);
...
}
inline void PHPCoroutine::restore_vm_stack(php_coro_task *task)
{
EG(bailout) = task->bailout;
EG(vm_stack_top) = task->vm_stack_top;
EG(vm_stack_end) = task->vm_stack_end;
EG(vm_stack) = task->vm_stack;
EG(vm_stack_page_size) = task->vm_stack_page_size;
EG(current_execute_data) = task->execute_data;
EG(error_handling) = task->error_handling;
EG(exception_class) = task->exception_class;
EG(exception) = task->exception;
...
}
このときphpスタック実行状態はgo()関数を呼び出したばかりの状態に戻り(main_task)、cスタック切り替えがどのように処理されているかを見てみましょう.
bool Context::swap_out()
{
jump_fcontext(&ctx_, swap_ctx_, (intptr_t) this, true);
return true;
}
思い出してin関数、swap_ctx_保存実行swap_in時のrsp,ctx_保存していますmake_fcontext初期化されたスタックトップ位置、もう一度jump_を見てみましょうfcontext実行:
// jump_x86_64_sysv_elf_gas.S
jump_fcontext:
/* , , rbp rip, call jump_fcontext push rip, jmp jump_fcontext. */
/* rip , swap_out jump_fcontext return true */
pushq %rbp /* save RBP */
pushq %rbx /* save RBX */
pushq %r15 /* save R15 */
pushq %r14 /* save R14 */
pushq %r13 /* save R13 */
pushq %r12 /* save R12 */
/* prepare stack for FPU */
leaq -0x8(%rsp), %rsp
/* test for flag preserve_fpu */
cmp $0, %rcx
je 1f
/* save MMX control- and status-word */
stmxcsr (%rsp)
/* save x87 control-word */
fnstcw 0x4(%rsp)
1:
/* store RSP (pointing to context-data) in RDI */
/* *ctx_ = rsp, */
movq %rsp, (%rdi)
/* restore RSP (pointing to context-data) from RSI */
/* rsp = swap_ctx_, swap_in rsp */
movq %rsi, %rsp
/* test for flag preserve_fpu */
cmp $0, %rcx
je 2f
/* restore MMX control- and status-word */
ldmxcsr (%rsp)
/* restore x87 control-word */
fldcw 0x4(%rsp)
2:
/* prepare stack for FPU */
leaq 0x8(%rsp), %rsp
/* swap_in */
popq %r12 /* restrore R12 */
popq %r13 /* restrore R13 */
popq %r14 /* restrore R14 */
popq %r15 /* restrore R15 */
popq %rbx /* restrore RBX */
popq %rbp /* restrore RBP */
/* restore return-address */
/* r8 = Context::swap_in::return true */
popq %r8
/* use third arg as return-value after jump */
/* rax = this */
movq %rdx, %rax
/* use third arg as first arg in context function */
/* rdi = this */
movq %rdx, %rdi
/* indirect jump to context */
/* swap_in */
jmp *%r8
この時点でphpとcスタックはswap_を実行するまで復元されましたinの状態で、コードはzif_に戻ります.swoole_coroutine_createの実行が完了しました:
bool Context::swap_in()
{
jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true);
return true; // ,
}
inline long run()
{
...
ctx.swap_in(); //
check_end(); // ,
return cid;
}
static inline long create(coroutine_func_t fn, void* args = nullptr)
{
return (new Coroutine(fn, args))->run();
}
long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv)
{
...
return Coroutine::create(main_func, (void*) &php_coro_args);
}
PHP_FUNCTION(swoole_coroutine_create)
{
...
long cid = PHPCoroutine::create(&fci_cache, fci.param_count, fci.params);
...
RETURN_LONG(cid); // id
}
なぜならexecute_dataはmain_に切り替えられましたtask上の主協程opcodeなので、次のopcodeは「echo“a”」で、sleepの後ろのコードをスキップすることに相当します.
一定のタイミングで、タイマはsleep関数に登録されたコールバック関数sleep_を呼び出します.timeout(呼び出しタイミングは後述)、起動協程は運転を継続します.
//
static void sleep_timeout(swTimer *timer, swTimer_node *tnode)
{
((Coroutine *) tnode->data)->resume();
}
//
void Coroutine::resume()
{
...
state = SW_CORO_RUNNING; //
if (sw_likely(on_resume))
{
on_resume(task); // php
}
origin = current;
current = this;
ctx.swap_in(); // c
...
}
// task
void PHPCoroutine::on_resume(void *arg)
{
php_coro_task *task = (php_coro_task *) arg;
php_coro_task *current_task = get_task();
save_task(current_task); //
restore_task(task); //
record_last_msec(task); //
}
zend_vmはその後のopcode'echo"a"'を読み出し、実行を続行します
現在コールバック中のopcodeがすべて実行された後、PHPCoroutine::main_funcは、前に登録したdeferを1回実行し、FILOの順番でリソースをクリーンアップします.
void PHPCoroutine::main_func(void *arg)
{
...
if (EXPECTED(func->type == ZEND_USER_FUNCTION))
{
...
// ,
zend_execute_ex(EG(current_execute_data));
}
if (task->defer_tasks)
{
std::stack *tasks = task->defer_tasks;
while (!tasks->empty())
{
php_swoole_fci *defer_fci = tasks->top();
tasks->pop(); // FILO
// defer
if (UNEXPECTED(sw_zend_call_function_anyway(&defer_fci->fci, &defer_fci->fci_cache) != SUCCESS))
{
...
}
}
}
// resources release
...
}
main_func実行完了後Context::context_に戻るfuncメソッドは、現在のコヒーレンスを終了としてマークし、もう一度swap_をします.outはさっきのswapに戻りますinのところ、つまりresumeメソッドで、その後目覚ましの協程が実行済みかどうかをチェックし、end_ツールバーの
void Context::context_func(void *arg)
{
Context *_this = (Context *) arg;
_this->fn_(_this->private_data_); // main_func(closure)
_this->end_ = true; //
_this->swap_out(); // main c
}
void Coroutine::resume()
{
...
ctx.swap_in(); //
check_end(); //
}
inline void check_end()
{
if (ctx.is_end())
{
close();
}
}
inline bool is_end()
{
return end_;
}
closeメソッドは、このコラボレーションのために作成されたvm_をクリーンアップします.stack、同時にmain_に戻りますtask,このときcスタックとphpスタックはいずれもメインコヒーレンスに切り替えられた
void Coroutine::close()
{
...
state = SW_CORO_END; //
if (on_close)
{
on_close(task);
}
current = origin;
coroutines.erase(cid); //
delete this;
}
void PHPCoroutine::on_close(void *arg)
{
php_coro_task *task = (php_coro_task *) arg;
php_coro_task *origin_task = get_origin_task(task);
vm_stack_destroy(); // vm_stack
restore_task(origin_task); // main_task
}
Reactorスケジューリング
では、タイミングイベントはいつ実行されますか?これは内部のReactorイベントループによって実現され、以下に具体的な実現を見る.
スレッドを作成すると、reactorが初期化されているかどうかを判断し、初期化されていない場合はactivate関数を呼び出してreactorを初期化します.activate関数には、次の手順があります.
1.reactor構造を初期化し、各種コールバック関数を登録する(読み書きイベントは対応プラットフォームの効率が最も高い多重apiを採用し、統一的なコールバック関数にカプセル化することは異なるapiの実現の詳細を遮蔽するのに役立つ)
2.php_経由swoole_register_shutdown_function("SwooleEvent::rshutdown")request_に登録shutdownフェーズで呼び出された関数(phpのライフサイクルを思い出し、スクリプトが終了すると呼び出されます)は、実際にイベントループがこのフェーズで実行されます.
3.プリエンプトスケジューリングスレッドをオンにします(これは後述します)
long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv)
{
...
if (sw_unlikely(!active))
{
activate();
}
...
}
inline void PHPCoroutine::activate()
{
...
/* init reactor and register event wait */
php_swoole_check_reactor();
/* replace interrupt function */
orig_interrupt_function = zend_interrupt_function; //
zend_interrupt_function = coro_interrupt_function; //
//
if (SWOOLE_G(enable_preemptive_scheduler) || config.enable_preemptive_scheduler)
{
/* create a thread to interrupt the coroutine that takes up too much time */
interrupt_thread_start();
}
...
active = true;
}
static sw_inline int php_swoole_check_reactor()
{
...
if (sw_unlikely(!SwooleG.main_reactor))
{
return php_swoole_reactor_init() == SW_OK ? 1 : -1;
}
...
}
int php_swoole_reactor_init()
{
...
if (!SwooleG.main_reactor)
{
swoole_event_init();
SwooleG.main_reactor->wait_exit = 1;
// rshutdown
php_swoole_register_shutdown_function("Swoole\\Event::rshutdown");
}
...
}
#define sw_reactor() (SwooleG.main_reactor)
#define SW_REACTOR_MAXEVENTS 4096
int swoole_event_init()
{
SwooleG.main_reactor = (swReactor *) sw_malloc(sizeof(swReactor));
if (swReactor_create(sw_reactor(), SW_REACTOR_MAXEVENTS) < 0)
{
...
}
...
}
int swReactor_create(swReactor *reactor, int max_event)
{
int ret;
bzero(reactor, sizeof(swReactor));
#ifdef HAVE_EPOLL
ret = swReactorEpoll_create(reactor, max_event);
#elif defined(HAVE_KQUEUE)
ret = swReactorKqueue_create(reactor, max_event);
#elif defined(HAVE_POLL)
ret = swReactorPoll_create(reactor, max_event);
#else
ret = swReactorSelect_create(reactor);
#endif
...
reactor->onTimeout = reactor_timeout; //
...
Socket::init_reactor(reactor);
...
}
int swReactorEpoll_create(swReactor *reactor, int max_event_num)
{
...
//binding method
reactor->add = swReactorEpoll_add;
reactor->set = swReactorEpoll_set;
reactor->del = swReactorEpoll_del;
reactor->wait = swReactorEpoll_wait;
reactor->free = swReactorEpoll_free;
}
request_shutdownフェーズでは、登録されたSwooleEvent::rshutdown関数、swoole_event_rshutdownは、前に登録したwait関数を実行します.
static PHP_FUNCTION(swoole_event_rshutdown)
{
/* prevent the program from jumping out of the rshutdown */
zend_try
{
PHP_FN(swoole_event_wait)(INTERNAL_FUNCTION_PARAM_PASSTHRU);
}
zend_end_try();
}
int swoole_event_wait()
{
int retval = sw_reactor()->wait(sw_reactor(), NULL);
swoole_event_free();
return retval;
}
タイミングイベントの登録を見てみましょう.まずtimerを初期化します.
int System::sleep(double sec)
{
Coroutine* co = Coroutine::get_current_safe(); // coroutine
if (swoole_timer_add((long) (sec * 1000), SW_FALSE, sleep_timeout, co) == NULL)
{
...
}
}
swTimer_node* swoole_timer_add(long ms, uchar persistent, swTimerCallback callback, void *private_data)
{
return swTimer_add(sw_timer(), ms, persistent, private_data, callback);
}
swTimer_node* swTimer_add(swTimer *timer, long _msec, int interval, void *data, swTimerCallback callback)
{
if (sw_unlikely(!timer->initialized))
{
if (sw_unlikely(swTimer_init(timer, _msec) != SW_OK)) // timer
{
return NULL;
}
}
...
}
static int swTimer_init(swTimer *timer, long msec)
{
...
timer->heap = swHeap_new(1024, SW_MIN_HEAP); //
timer->map = swHashMap_new(SW_HASHMAP_INIT_BUCKET_N, NULL);
timer->_current_id = -1; // id
timer->_next_msec = msec; //
timer->_next_id = 1;
timer->round = 0;
ret = swReactorTimer_init(SwooleG.main_reactor, timer, msec);
...
}
static int swReactorTimer_init(swReactor *reactor, swTimer *timer, long exec_msec)
{
reactor->check_timer = SW_TRUE;
reactor->timeout_msec = exec_msec; //
reactor->timer = timer;
timer->reactor = reactor;
timer->set = swReactorTimer_set;
timer->close = swReactorTimer_close;
...
}
次に、イベントを追加します.
1.time._next_msecとreactor.timeout_msecは、すべてのタイマで最も短いタイムアウト時間(相対値)を維持しています.
2.tnode.exec_msecとtnodeは最小スタックで保存され、スタックトップの要素が最も早くタイムアウトした要素です.
swTimer_node* swTimer_add(swTimer *timer, long _msec, int interval, void *data, swTimerCallback callback)
{
swTimer_node *tnode = sw_malloc(sizeof(swTimer_node));
int64_t now_msec = swTimer_get_relative_msec();
tnode->data = data;
tnode->type = SW_TIMER_TYPE_KERNEL;
tnode->exec_msec = now_msec + _msec; //
tnode->interval = interval ? _msec : 0; //
tnode->removed = 0;
tnode->callback = callback;
tnode->round = timer->round;
tnode->dtor = NULL;
if (timer->_next_msec < 0 || timer->_next_msec > _msec) // ,
{
timer->set(timer, _msec);
timer->_next_msec = _msec;
}
tnode->id = timer->_next_id++;
tnode->heap_node = swHeap_push(timer->heap, tnode->exec_msec, tnode); // , priority = tnode->exec_msec
if (sw_unlikely(swHashMap_add_int(timer->map, tnode->id, tnode) != SW_OK)) // hashmap tnodeid tnode
{
...
}
...
}
タイミング登録が完了すると、イベントループの実行を待つことができます.epollを例に挙げます.
epoll_の使用waitはfd読み書きイベントを待ってreactor->timeout_へmsec、fdイベントが来るのを待つ
1.epoll_の場合waitタイムアウト時にfd読み書きイベントが取得されず、onTimeout関数を実行し、タイミングイベントを処理する
2.fdイベントがあればfd読み書きイベントを処理し、今回トリガーされたイベントを処理した後、次のループに進む
static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo)
{
...
reactor->running = 1;
reactor->start = 1;
while (reactor->running > 0)
{
...
n = epoll_wait(epoll_fd, events, max_event_num, reactor->timeout_msec);
if (n < 0)
{
...
//
}
else if (n == 0)
{
reactor->onTimeout(reactor);
}
for (i = 0; i < n; i++)
{
...
// fd
}
...
}
return 0;
}
この期間にfdイベントがない場合、タイミングイベントが実行されます.onTimeoutは、以前に登録された関数reactior_です.timeout, swTimer_select関数は、現在期限切れのイベントを実行してループを終了し、上記のsleep_に実行します.timeout関数では、sleepがスリープしているため、コパスが実行され続けます.
static void reactor_timeout(swReactor *reactor)
{
reactor_finish(reactor);
...
}
static void reactor_finish(swReactor *reactor)
{
//check timer
if (reactor->check_timer)
{
swTimer_select(reactor->timer);
}
...
//the event loop is empty
if (reactor->wait_exit && reactor->is_empty(reactor)) // ,
{
reactor->running = 0;
}
}
int swTimer_select(swTimer *timer)
{
int64_t now_msec = swTimer_get_relative_msec(); //
while ((tmp = swHeap_top(timer->heap))) //
{
tnode = tmp->data;
if (tnode->exec_msec > now_msec) //
{
break;
}
if (!tnode->removed)
{
tnode->callback(timer, tnode); //
}
timer->num--;
swHeap_pop(timer->heap);
swHashMap_del_int(timer->map, tnode->id);
}
...
}
ここまで、プロセス全体が紹介されました.まとめてみましょう.
以上から分かるように、もし協程にIO/タイミングイベントがないならば、実際に協程はタイミングを切り替えていないので、CPU密集型のシーンに対して、いくつかの協程はCPUタイムスライスが得られないため餓死することができて、Swoole 4.4はプリエンプトスケジューリングを導入してこの問題を理解するためにこの問題を解決した.
vm interruptはphp 7である.1.0以降に導入された実行メカニズムで、swooleはこの特性を使用して実現されるプリエンプトスケジューリングです.
1.ZEND_VM_INTERRUPT_CHECKはコマンドがjumpとcallのときに実行します
2.ZEND_VM_INTERRUPT_CHECKではEG(vm_interrupt)というフラグビットがチェックされ、1の場合zend_がトリガーされますinterrupt_functionの実行
// php 7.1.26 src
#define ZEND_VM_INTERRUPT_CHECK() do { \
if (UNEXPECTED(EG(vm_interrupt))) { \
ZEND_VM_INTERRUPT(); \
} \
} while (0)
#define ZEND_VM_INTERRUPT() ZEND_VM_TAIL_CALL(zend_interrupt_helper_SPEC(ZEND_OPCODE_HANDLER_ARGS_PASSTHRU));
static ZEND_OPCODE_HANDLER_RET ZEND_FASTCALL zend_interrupt_helper_SPEC(ZEND_OPCODE_HANDLER_ARGS)
{
...
EG(vm_interrupt) = 0;
if (zend_interrupt_function) {
zend_interrupt_function(execute_data);
}
}
具体的な実装について説明します.
初期化:
1.元の割り込み関数を保存し、zend_interrupt_functionを新しい割り込み関数に置き換える
2.スレッドを開いてinterrupt_を実行thread_loop
3.interrupt_thread_loopでは5 msごとにEG(vm_interrupt)を1に設定
inline void PHPCoroutine::activate()
{
...
/* replace interrupt function */
orig_interrupt_function = zend_interrupt_function; //
zend_interrupt_function = coro_interrupt_function; //
//
if (SWOOLE_G(enable_preemptive_scheduler) || config.enable_preemptive_scheduler) // enable_preemptive_scheduler
{
/* create a thread to interrupt the coroutine that takes up too much time */
interrupt_thread_start();
}
}
void PHPCoroutine::interrupt_thread_start()
{
zend_vm_interrupt = &EG(vm_interrupt);
interrupt_thread_running = true;
if (pthread_create(&interrupt_thread_id, NULL, (void * (*)(void *)) interrupt_thread_loop, NULL) < 0)
{
...
}
}
static const uint8_t MAX_EXEC_MSEC = 10;
void PHPCoroutine::interrupt_thread_loop()
{
static const useconds_t interval = (MAX_EXEC_MSEC / 2) * 1000;
while (interrupt_thread_running)
{
*zend_vm_interrupt = 1; // EG(vm_interrupt) = 1
usleep(interval); // 5ms
}
pthread_exit(0);
}
割り込み関数coro_interrupt_functionは、現在のコヒーレンスがスケジューリング可能かどうかを確認し(前回の切り替え時間から10 msを超える)、できれば現在のコヒーレンスを直接譲り、プリエンプトスケジューリングを完了します
static void coro_interrupt_function(zend_execute_data *execute_data)
{
php_coro_task *task = PHPCoroutine::get_task();
if (task && task->co && PHPCoroutine::is_schedulable(task))
{
task->co->yield(); //
}
if (orig_interrupt_function)
{
orig_interrupt_function(execute_data); //
}
}
static const uint8_t MAX_EXEC_MSEC = 10;
static inline bool is_schedulable(php_coro_task *task)
{
// enable_scheduler 1 10ms
return task->enable_scheduler && (swTimer_get_absolute_msec() - task->last_msec > MAX_EXEC_MSEC);
}