Swooleソース分析-プロセス管理Swoole_Process
前言
PHPが持参したpcntlには、多くの不足があります. を提供する. はサポートする. の使用が容易である.
プロセスの初期化では、まず現在の環境を判断します.非 は使用できません.は、 と同様に、非同期の
現在の環境でプロセスを作成できる場合は、次のプロパティを初期化する必要があります. プロセスの入出力をメインプロセスパイプに関連付けるリダイレクトを設定する . は、 メインプロセスに残っているタイマと信号をクリアします. 設定 である. を実行する.コールバック関数で非同期システムが呼び出された場合、
メインプロセスとサブプロセスとの間で通信を行うには
非同期プログラムに信号処理関数を追加します.まず、プログラムは、現在のプロセス環境と登録された信号を検査し、条件に合致しない直接戻り、例えば、
これまで信号処理関数が存在していた場合、関数は以前のコールバック関数を上書きし、前の論理は再び実行され、その後破棄されます.
プッシュおよび消費メッセージは、
プロセスへの信号送信
swoole-1.7.2
は、PHP
のpcntl
拡張の代わりにプロセス管理モジュールを追加した.PHPが持参したpcntlには、多くの不足があります.
pcntl
は、プロセス間通信の機能pcntl
リダイレクト標準入出力pcntl
は、fork
のような元のインタフェースのみを提供し、エラーswoole_process
は、pcntl
よりも強力な機能を提供し、より使いやすいAPI
を提供し、PHP
をマルチプロセスプログラミングの面でより楽にします.swoole_process::__construct
サブプロセスの作成プロセスの初期化では、まず現在の環境を判断します.
CLI
モードではserver master
プロセスの下にあり、server
が開始された後はプロセスを作成できません.master
プロセスは、reator
スレッドを複数作成し、fork
後にマルチスレッドもコピーされるからです.AIO
を使用したプロセスではスレッドプールが使用され、fork
では非常に複雑なスレッド付きfork
の問題が発生します.現在の環境でプロセスを作成できる場合は、次のプロパティを初期化する必要があります.
process->id
:通常のクライアントプロセス、またはmaster
プロセスがserver
を起動していない状態であれば、php_swoole_worker_round_id
は作成されたprocess
プロセスの数であり、この場合は増加するだけでよい.server
が起動した場合、php_swoole_worker_round_id
には、worker
プロセスの数も加算されます.php_swoole_worker_round_id
が増加するとprocess->id
になります.swPipeUnsock_create
関数新規パイプstatic PHP_METHOD(swoole_process, __construct)
{
zend_bool redirect_stdin_and_stdout = 0;
long pipe_type = 2;
zval *callback;
//only cli env
if (!SWOOLE_G(cli))
{
swoole_php_fatal_error(E_ERROR, "swoole_process only can be used in PHP CLI mode.");
RETURN_FALSE;
}
if (SwooleG.serv && SwooleG.serv->gs->start == 1 && swIsMaster())
{
swoole_php_fatal_error(E_ERROR, "swoole_process can't be used in master process.");
RETURN_FALSE;
}
if (SwooleAIO.init)
{
swoole_php_fatal_error(E_ERROR, "unable to create process with async-io threads.");
RETURN_FALSE;
}
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|bl", &callback, &redirect_stdin_and_stdout, &pipe_type) == FAILURE)
{
RETURN_FALSE;
}
char *func_name = NULL;
if (!sw_zend_is_callable(callback, 0, &func_name TSRMLS_CC))
{
swoole_php_fatal_error(E_ERROR, "function '%s' is not callable", func_name);
efree(func_name);
RETURN_FALSE;
}
efree(func_name);
swWorker *process = emalloc(sizeof(swWorker));
bzero(process, sizeof(swWorker));
int base = 1;
if (SwooleG.serv && SwooleG.serv->gs->start)
{
base = SwooleG.serv->worker_num + SwooleG.serv->task_worker_num + SwooleG.serv->user_worker_num;
}
if (php_swoole_worker_round_id == 0)
{
php_swoole_worker_round_id = base;
}
process->id = php_swoole_worker_round_id++;
if (redirect_stdin_and_stdout)
{
process->redirect_stdin = 1;
process->redirect_stdout = 1;
process->redirect_stderr = 1;
/**
* Forced to use stream pipe
*/
pipe_type = 1;
}
if (pipe_type > 0)
{
swPipe *_pipe = emalloc(sizeof(swPipe));
int socket_type = pipe_type == 1 ? SOCK_STREAM : SOCK_DGRAM;
if (swPipeUnsock_create(_pipe, 1, socket_type) < 0)
{
RETURN_FALSE;
}
process->pipe_object = _pipe;
process->pipe_master = _pipe->getFd(_pipe, SW_PIPE_MASTER);
process->pipe_worker = _pipe->getFd(_pipe, SW_PIPE_WORKER);
process->pipe = process->pipe_master;
zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("pipe"), process->pipe_master TSRMLS_CC);
}
swoole_set_object(getThis(), process);
zend_update_property(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("callback"), callback TSRMLS_CC);
}
swoole_process->start
プロセスの開始swoole_process->start
関数は、fork
の新しいプロセスに使用され、php_swoole_process_start
が呼び出されるstatic PHP_METHOD(swoole_process, start)
{
swWorker *process = swoole_get_object(getThis());
if (process->pid > 0 && kill(process->pid, 0) == 0)
{
swoole_php_fatal_error(E_WARNING, "process has already been started.");
RETURN_FALSE;
}
pid_t pid = fork();
if (pid < 0)
{
swoole_php_fatal_error(E_WARNING, "fork() failed. Error: %s[%d]", strerror(errno), errno);
RETURN_FALSE;
}
else if (pid > 0)
{
process->pid = pid;
process->child_process = 0;
zend_update_property_long(swoole_server_class_entry_ptr, getThis(), ZEND_STRL("pid"), process->pid TSRMLS_CC);
RETURN_LONG(pid);
}
else
{
process->child_process = 1;
SW_CHECK_RETURN(php_swoole_process_start(process, getThis() TSRMLS_CC));
}
RETURN_TRUE;
}
php_swoole_process_start
関数は、メインプロセスの残りのリダイレクトおよびクリーンアップのいくつかの機能を設定するために使用されます.STDIN_FILENO
入力、STDOUT_FILENO
出力、STDERR_FILENO
エラー出力をpipe_worker
にバインドし、リダイレクト機能を実現する.SwooleG.main_reactor
が存在する場合、関連メモリを削除して解放します.process_type
は0 _construct
コールバック関数php_swoole_event_wait
関数がイベントループを開始する.int php_swoole_process_start(swWorker *process, zval *object TSRMLS_DC)
{
process->pipe = process->pipe_worker;
process->pid = getpid();
if (process->redirect_stdin)
{
if (dup2(process->pipe, STDIN_FILENO) < 0)
{
swoole_php_fatal_error(E_WARNING, "dup2() failed. Error: %s[%d]", strerror(errno), errno);
}
}
if (process->redirect_stdout)
{
if (dup2(process->pipe, STDOUT_FILENO) < 0)
{
swoole_php_fatal_error(E_WARNING, "dup2() failed. Error: %s[%d]", strerror(errno), errno);
}
}
if (process->redirect_stderr)
{
if (dup2(process->pipe, STDERR_FILENO) < 0)
{
swoole_php_fatal_error(E_WARNING, "dup2() failed. Error: %s[%d]", strerror(errno), errno);
}
}
/**
* Close EventLoop
*/
if (SwooleG.main_reactor)
{
SwooleG.main_reactor->free(SwooleG.main_reactor);
SwooleG.main_reactor = NULL;
swTraceLog(SW_TRACE_PHP, "destroy reactor");
}
bzero(&SwooleWG, sizeof(SwooleWG));
SwooleG.pid = process->pid;
if (SwooleG.process_type != SW_PROCESS_USERWORKER)
{
SwooleG.process_type = 0;
}
SwooleWG.id = process->id;
if (SwooleG.timer.fd)
{
swTimer_free(&SwooleG.timer);
bzero(&SwooleG.timer, sizeof(SwooleG.timer));
}
swSignal_clear();
zend_update_property_long(swoole_process_class_entry_ptr, object, ZEND_STRL("pid"), process->pid TSRMLS_CC);
zend_update_property_long(swoole_process_class_entry_ptr, object, ZEND_STRL("pipe"), process->pipe_worker TSRMLS_CC);
zval *zcallback = sw_zend_read_property(swoole_process_class_entry_ptr, object, ZEND_STRL("callback"), 0 TSRMLS_CC);
zval **args[1];
if (zcallback == NULL || ZVAL_IS_NULL(zcallback))
{
swoole_php_fatal_error(E_ERROR, "no callback.");
return SW_ERR;
}
zval *retval = NULL;
args[0] = &object;
sw_zval_add_ref(&object);
if (sw_call_user_function_ex(EG(function_table), NULL, zcallback, &retval, 1, args, 0, NULL TSRMLS_CC) == FAILURE)
{
swoole_php_fatal_error(E_ERROR, "callback function error");
return SW_ERR;
}
if (EG(exception))
{
zend_exception_error(EG(exception), E_ERROR TSRMLS_CC);
}
if (retval)
{
sw_zval_ptr_dtor(&retval);
}
if (SwooleG.main_reactor)
{
php_swoole_event_wait();
}
SwooleG.running = 0;
zend_bailout();
return SW_OK;
}
swoole_process->write
/ swoole_process->read
メインプロセスとサブプロセスとの間で通信を行うには
write
とread
を使用します.swoole_event
を使用すると、パイプが自動的に非ブロックモードに変わり、reactor
によってイベントのループ読み書きが行われます.そうしないと、ブロック読み書きが採用されます.static PHP_METHOD(swoole_process, write)
{
char *data = NULL;
zend_size_t data_len = 0;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &data, &data_len) == FAILURE)
{
RETURN_FALSE;
}
if (data_len < 1)
{
swoole_php_fatal_error(E_WARNING, "the data to send is empty.");
RETURN_FALSE;
}
swWorker *process = swoole_get_object(getThis());
if (process->pipe == 0)
{
swoole_php_fatal_error(E_WARNING, "no pipe, can not write into pipe.");
RETURN_FALSE;
}
int ret;
//async write
if (SwooleG.main_reactor)
{
swConnection *_socket = swReactor_get(SwooleG.main_reactor, process->pipe);
if (_socket && _socket->nonblock)
{
ret = SwooleG.main_reactor->write(SwooleG.main_reactor, process->pipe, data, (size_t) data_len);
}
else
{
goto _blocking_read;
}
}
else
{
_blocking_read: ret = swSocket_write_blocking(process->pipe, data, data_len);
}
if (ret < 0)
{
swoole_php_error(E_WARNING, "write() failed. Error: %s[%d]", strerror(errno), errno);
RETURN_FALSE;
}
ZVAL_LONG(return_value, ret);
}
static PHP_METHOD(swoole_process, read)
{
long buf_size = 8192;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|l", &buf_size) == FAILURE)
{
RETURN_FALSE;
}
if (buf_size > 65536)
{
buf_size = 65536;
}
swWorker *process = swoole_get_object(getThis());
if (process->pipe == 0)
{
swoole_php_fatal_error(E_WARNING, "no pipe, can not read from pipe.");
RETURN_FALSE;
}
char *buf = emalloc(buf_size + 1);
int ret = read(process->pipe, buf, buf_size);;
if (ret < 0)
{
efree(buf);
if (errno != EINTR)
{
swoole_php_error(E_WARNING, "read() failed. Error: %s[%d]", strerror(errno), errno);
}
RETURN_FALSE;
}
buf[ret] = 0;
SW_ZVAL_STRINGL(return_value, buf, ret, 0);
efree(buf);
}
swoole_process::signal
信号処理関数の設定非同期プログラムに信号処理関数を追加します.まず、プログラムは、現在のプロセス環境と登録された信号を検査し、条件に合致しない直接戻り、例えば、
swoole_server
にはSIGTERM
とSIGALAM
の信号を設定することができず、この2つの信号はswoole
が保持する必要があり、ユーザーは修正できない.これまで信号処理関数が存在していた場合、関数は以前のコールバック関数を上書きし、前の論理は再び実行され、その後破棄されます.
static PHP_METHOD(swoole_process, signal)
{
zval *callback = NULL;
long signo = 0;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lz", &signo, &callback) == FAILURE)
{
return;
}
if (!SWOOLE_G(cli))
{
swoole_php_fatal_error(E_ERROR, "cannot use swoole_process::signal here.");
RETURN_FALSE;
}
if (SwooleG.serv && SwooleG.serv->gs->start)
{
if ((swIsWorker() || swIsTaskWorker()) && signo == SIGTERM)
{
swoole_php_fatal_error(E_WARNING, "unable to register SIGTERM in worker/task process.");
RETURN_FALSE;
}
else if (swIsManager() && (signo == SIGTERM || signo == SIGUSR1 || signo == SIGUSR2 || signo == SIGALRM))
{
swoole_php_fatal_error(E_WARNING, "unable to register SIGTERM/SIGUSR1/SIGUSR2/SIGALRM in manager process.");
RETURN_FALSE;
}
else if (swIsMaster() && (signo == SIGTERM || signo == SIGUSR1 || signo == SIGUSR2 || signo == SIGALRM || signo == SIGCHLD))
{
swoole_php_fatal_error(E_WARNING, "unable to register SIGTERM/SIGUSR1/SIGUSR2/SIGALRM/SIGCHLD in manager process.");
RETURN_FALSE;
}
}
php_swoole_check_reactor();
swSignalHander handler;
if (callback == NULL || ZVAL_IS_NULL(callback))
{
callback = signal_callback[signo];
if (callback)
{
swSignal_add(signo, NULL);
SwooleG.main_reactor->defer(SwooleG.main_reactor, free_signal_callback, callback);
signal_callback[signo] = NULL;
RETURN_TRUE;
}
else
{
swoole_php_error(E_WARNING, "no callback.");
RETURN_FALSE;
}
}
else if (Z_TYPE_P(callback) == IS_LONG && Z_LVAL_P(callback) == (long) SIG_IGN)
{
handler = NULL;
}
else
{
char *func_name;
if (!sw_zend_is_callable(callback, 0, &func_name TSRMLS_CC))
{
swoole_php_error(E_WARNING, "function '%s' is not callable", func_name);
efree(func_name);
RETURN_FALSE;
}
efree(func_name);
callback = sw_zval_dup(callback);
sw_zval_add_ref(&callback);
handler = php_swoole_onSignal;
}
/**
* for swSignalfd_setup
*/
SwooleG.main_reactor->check_signalfd = 1;
//free the old callback
if (signal_callback[signo])
{
SwooleG.main_reactor->defer(SwooleG.main_reactor, free_signal_callback, signal_callback[signo]);
}
signal_callback[signo] = callback;
/**
* use user settings
*/
SwooleG.use_signalfd = SwooleG.enable_signalfd;
swSignal_add(signo, handler);
RETURN_TRUE;
}
swoole_process::alarm
プロセスタイマSwoole\Timer
と比較すると、swoole_process::alarm
は非常に良い選択ではなく、swoole_process::alarm
は真のプロセスalarm
タイマに類似しており、alarm
はalarm
信号の設定しか許可されていないが、Swoole\Timer
はタイミングタスクの最小スタックを実現するため、異なる時間間隔で異なるタスクを実行することができる.したがって、両者を区別するために、swoole
は両者が同時に存在することを許さないと規定している.swoole_process::alarm
関数は、swoole_process::signal
を内部的に呼び出し、setitimer
信号を周期的に送信するため、alarm
関数にswoole_process::signal
信号のコールバック関数を設定する必要があるため、alarm
関数と組み合わせる必要がある.static PHP_METHOD(swoole_process, alarm)
{
long usec = 0;
long type = ITIMER_REAL;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l|l", &usec, &type) == FAILURE)
{
return;
}
if (!SWOOLE_G(cli))
{
swoole_php_fatal_error(E_ERROR, "cannot use swoole_process::alarm here.");
RETURN_FALSE;
}
if (SwooleG.timer.fd != 0)
{
swoole_php_fatal_error(E_WARNING, "cannot use both 'timer' and 'alarm' at the same time.");
RETURN_FALSE;
}
struct timeval now;
if (gettimeofday(&now, NULL) < 0)
{
swoole_php_error(E_WARNING, "gettimeofday() failed. Error: %s[%d]", strerror(errno), errno);
RETURN_FALSE;
}
struct itimerval timer_set;
bzero(&timer_set, sizeof(timer_set));
if (usec > 0)
{
long _sec = usec / 1000000;
long _usec = usec - (_sec * 1000000);
timer_set.it_interval.tv_sec = _sec;
timer_set.it_interval.tv_usec = _usec;
timer_set.it_value.tv_sec = _sec;
timer_set.it_value.tv_usec = _usec;
if (timer_set.it_value.tv_usec > 1e6)
{
timer_set.it_value.tv_usec = timer_set.it_value.tv_usec - 1e6;
timer_set.it_value.tv_sec += 1;
}
}
if (setitimer(type, &timer_set, NULL) < 0)
{
swoole_php_error(E_WARNING, "setitimer() failed. Error: %s[%d]", strerror(errno), errno);
RETURN_FALSE;
}
RETURN_TRUE;
}
swoole_process->useQueue
メッセージキューuseQueue
は、swMsgQueue_create
を使用してprocess->queue
を作成します.static PHP_METHOD(swoole_process, useQueue)
{
long msgkey = 0;
long mode = 2;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|ll", &msgkey, &mode) == FAILURE)
{
RETURN_FALSE;
}
swWorker *process = swoole_get_object(getThis());
if (msgkey <= 0)
{
msgkey = ftok(sw_zend_get_executed_filename(), 1);
}
swMsgQueue *queue = emalloc(sizeof(swMsgQueue));
if (swMsgQueue_create(queue, 1, msgkey, 0) < 0)
{
RETURN_FALSE;
}
if (mode & MSGQUEUE_NOWAIT)
{
swMsgQueue_set_blocking(queue, 0);
mode = mode & (~MSGQUEUE_NOWAIT);
}
process->queue = queue;
process->ipc_mode = mode;
zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("msgQueueId"), queue->msg_id TSRMLS_CC);
zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("msgQueueKey"), msgkey TSRMLS_CC);
RETURN_TRUE;
}
swoole_process->push
/swoole_process->pop
メッセージ通信プッシュおよび消費メッセージは、
swMsgQueue_push/swMsgQueue_pop
関数を利用する.static PHP_METHOD(swoole_process, push)
{
char *data;
zend_size_t length;
struct
{
long type;
char data[SW_MSGMAX];
} message;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &data, &length) == FAILURE)
{
RETURN_FALSE;
}
if (length <= 0)
{
swoole_php_fatal_error(E_WARNING, "the data to push is empty.");
RETURN_FALSE;
}
else if (length >= sizeof(message.data))
{
swoole_php_fatal_error(E_WARNING, "the data to push is too big.");
RETURN_FALSE;
}
swWorker *process = swoole_get_object(getThis());
if (!process->queue)
{
swoole_php_fatal_error(E_WARNING, "no msgqueue, can not use push()");
RETURN_FALSE;
}
message.type = process->id;
memcpy(message.data, data, length);
if (swMsgQueue_push(process->queue, (swQueue_data *)&message, length) < 0)
{
RETURN_FALSE;
}
RETURN_TRUE;
}
static PHP_METHOD(swoole_process, pop)
{
long maxsize = SW_MSGMAX;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|l", &maxsize) == FAILURE)
{
RETURN_FALSE;
}
if (maxsize > SW_MSGMAX || maxsize <= 0)
{
maxsize = SW_MSGMAX;
}
swWorker *process = swoole_get_object(getThis());
if (!process->queue)
{
swoole_php_fatal_error(E_WARNING, "no msgqueue, can not use pop()");
RETURN_FALSE;
}
struct
{
long type;
char data[SW_MSGMAX];
} message;
if (process->ipc_mode == 2)
{
message.type = 0;
}
else
{
message.type = process->id;
}
int n = swMsgQueue_pop(process->queue, (swQueue_data *) &message, maxsize);
if (n < 0)
{
RETURN_FALSE;
}
SW_RETURN_STRINGL(message.data, n, 1);
}
swoole_process::kill
/swoole_process::wait
プロセスへの信号送信
kill
は、回収サブプロセスwait
と比較的論理的に簡単であり、対応する関数を呼び出すことである.kill
以降のエラーがESRCH
であれば、対応するプロセスが存在しないことを意味することに注意してください.static PHP_METHOD(swoole_process, kill)
{
long pid;
long sig = SIGTERM;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l|l", &pid, &sig) == FAILURE)
{
RETURN_FALSE;
}
int ret = kill((int) pid, (int) sig);
if (ret < 0)
{
if (!(sig == 0 && errno == ESRCH))
{
swoole_php_error(E_WARNING, "kill(%d, %d) failed. Error: %s[%d]", (int) pid, (int) sig, strerror(errno), errno);
}
RETURN_FALSE;
}
RETURN_TRUE;
}
static PHP_METHOD(swoole_process, wait)
{
int status;
zend_bool blocking = 1;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|b", &blocking) == FAILURE)
{
RETURN_FALSE;
}
int options = 0;
if (!blocking)
{
options |= WNOHANG;
}
pid_t pid = swWaitpid(-1, &status, options);
if (pid > 0)
{
array_init(return_value);
add_assoc_long(return_value, "pid", pid);
add_assoc_long(return_value, "code", WEXITSTATUS(status));
add_assoc_long(return_value, "signal", WTERMSIG(status));
}
else
{
RETURN_FALSE;
}
}
static sw_inline int swWaitpid(pid_t __pid, int *__stat_loc, int __options)
{
int ret;
do
{
ret = waitpid(__pid, __stat_loc, __options);
if (ret < 0 && errno == EINTR)
{
continue;
}
break;
} while(1);
return ret;
}