Swooleソース分析-プロセス管理Swoole_Process

21375 ワード

前言swoole-1.7.2は、PHPpcntl拡張の代わりにプロセス管理モジュールを追加した.
PHPが持参したpcntlには、多くの不足があります.
  • pcntlは、プロセス間通信の機能
  • を提供する.
  • pcntlリダイレクト標準入出力
  • はサポートする.
  • pcntlは、forkのような元のインタフェースのみを提供し、エラー
  • の使用が容易である.
  • swoole_processは、pcntlよりも強力な機能を提供し、より使いやすいAPIを提供し、PHPをマルチプロセスプログラミングの面でより楽にします.
  • swoole_process::__constructサブプロセスの作成
    プロセスの初期化では、まず現在の環境を判断します.
  • CLIモードでは
  • は使用できません.
  • は、server masterプロセスの下にあり、serverが開始された後はプロセスを作成できません.masterプロセスは、reatorスレッドを複数作成し、fork後にマルチスレッドもコピーされるからです.
  • と同様に、非同期のAIOを使用したプロセスではスレッドプールが使用され、forkでは非常に複雑なスレッド付きforkの問題が発生します.

  • 現在の環境でプロセスを作成できる場合は、次のプロパティを初期化する必要があります.
  • process->id:通常のクライアントプロセス、またはmasterプロセスがserverを起動していない状態であれば、php_swoole_worker_round_idは作成されたprocessプロセスの数であり、この場合は増加するだけでよい.serverが起動した場合、php_swoole_worker_round_idには、workerプロセスの数も加算されます.php_swoole_worker_round_idが増加するとprocess->idになります.
  • プロセスの入出力をメインプロセスパイプに関連付けるリダイレクトを設定する
  • .
  • swPipeUnsock_create関数新規パイプ
  • static PHP_METHOD(swoole_process, __construct)
    {
        zend_bool redirect_stdin_and_stdout = 0;
        long pipe_type = 2;
        zval *callback;
    
        //only cli env
        if (!SWOOLE_G(cli))
        {
            swoole_php_fatal_error(E_ERROR, "swoole_process only can be used in PHP CLI mode.");
            RETURN_FALSE;
        }
    
        if (SwooleG.serv && SwooleG.serv->gs->start == 1 && swIsMaster())
        {
            swoole_php_fatal_error(E_ERROR, "swoole_process can't be used in master process.");
            RETURN_FALSE;
        }
    
        if (SwooleAIO.init)
        {
            swoole_php_fatal_error(E_ERROR, "unable to create process with async-io threads.");
            RETURN_FALSE;
        }
    
        if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|bl", &callback, &redirect_stdin_and_stdout, &pipe_type) == FAILURE)
        {
            RETURN_FALSE;
        }
    
        char *func_name = NULL;
        if (!sw_zend_is_callable(callback, 0, &func_name TSRMLS_CC))
        {
            swoole_php_fatal_error(E_ERROR, "function '%s' is not callable", func_name);
            efree(func_name);
            RETURN_FALSE;
        }
        efree(func_name);
    
        swWorker *process = emalloc(sizeof(swWorker));
        bzero(process, sizeof(swWorker));
    
        int base = 1;
        if (SwooleG.serv && SwooleG.serv->gs->start)
        {
            base = SwooleG.serv->worker_num + SwooleG.serv->task_worker_num + SwooleG.serv->user_worker_num;
        }
        if (php_swoole_worker_round_id == 0)
        {
            php_swoole_worker_round_id = base;
        }
        process->id = php_swoole_worker_round_id++;
    
        if (redirect_stdin_and_stdout)
        {
            process->redirect_stdin = 1;
            process->redirect_stdout = 1;
            process->redirect_stderr = 1;
            /**
             * Forced to use stream pipe
             */
            pipe_type = 1;
        }
    
        if (pipe_type > 0)
        {
            swPipe *_pipe = emalloc(sizeof(swPipe));
            int socket_type = pipe_type == 1 ? SOCK_STREAM : SOCK_DGRAM;
            if (swPipeUnsock_create(_pipe, 1, socket_type) < 0)
            {
                RETURN_FALSE;
            }
    
            process->pipe_object = _pipe;
            process->pipe_master = _pipe->getFd(_pipe, SW_PIPE_MASTER);
            process->pipe_worker = _pipe->getFd(_pipe, SW_PIPE_WORKER);
            process->pipe = process->pipe_master;
    
            zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("pipe"), process->pipe_master TSRMLS_CC);
        }
    
        swoole_set_object(getThis(), process);
        zend_update_property(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("callback"), callback TSRMLS_CC);
    }
    
    swoole_process->startプロセスの開始swoole_process->start関数は、forkの新しいプロセスに使用され、php_swoole_process_startが呼び出される
    static PHP_METHOD(swoole_process, start)
    {
        swWorker *process = swoole_get_object(getThis());
    
        if (process->pid > 0 && kill(process->pid, 0) == 0)
        {
            swoole_php_fatal_error(E_WARNING, "process has already been started.");
            RETURN_FALSE;
        }
    
        pid_t pid = fork();
        if (pid < 0)
        {
            swoole_php_fatal_error(E_WARNING, "fork() failed. Error: %s[%d]", strerror(errno), errno);
            RETURN_FALSE;
        }
        else if (pid > 0)
        {
            process->pid = pid;
            process->child_process = 0;
            zend_update_property_long(swoole_server_class_entry_ptr, getThis(), ZEND_STRL("pid"), process->pid TSRMLS_CC);
            RETURN_LONG(pid);
        }
        else
        {
            process->child_process = 1;
            SW_CHECK_RETURN(php_swoole_process_start(process, getThis() TSRMLS_CC));
        }
        RETURN_TRUE;
    }
    php_swoole_process_start関数は、メインプロセスの残りのリダイレクトおよびクリーンアップのいくつかの機能を設定するために使用されます.
  • は、STDIN_FILENO入力、STDOUT_FILENO出力、STDERR_FILENOエラー出力をpipe_workerにバインドし、リダイレクト機能を実現する.
  • SwooleG.main_reactorが存在する場合、関連メモリを削除して解放します.
  • メインプロセスに残っているタイマと信号をクリアします.
  • 設定process_typeは0
  • である.
  • _constructコールバック関数
  • を実行する.
  • コールバック関数で非同期システムが呼び出された場合、php_swoole_event_wait関数がイベントループを開始する.
  • int php_swoole_process_start(swWorker *process, zval *object TSRMLS_DC)
    {
        process->pipe = process->pipe_worker;
        process->pid = getpid();
    
        if (process->redirect_stdin)
        {
            if (dup2(process->pipe, STDIN_FILENO) < 0)
            {
                swoole_php_fatal_error(E_WARNING, "dup2() failed. Error: %s[%d]", strerror(errno), errno);
            }
        }
    
        if (process->redirect_stdout)
        {
            if (dup2(process->pipe, STDOUT_FILENO) < 0)
            {
                swoole_php_fatal_error(E_WARNING, "dup2() failed. Error: %s[%d]", strerror(errno), errno);
            }
        }
    
        if (process->redirect_stderr)
        {
            if (dup2(process->pipe, STDERR_FILENO) < 0)
            {
                swoole_php_fatal_error(E_WARNING, "dup2() failed. Error: %s[%d]", strerror(errno), errno);
            }
        }
    
        /**
         * Close EventLoop
         */
        if (SwooleG.main_reactor)
        {
            SwooleG.main_reactor->free(SwooleG.main_reactor);
            SwooleG.main_reactor = NULL;
            swTraceLog(SW_TRACE_PHP, "destroy reactor");
        }
    
        bzero(&SwooleWG, sizeof(SwooleWG));
        SwooleG.pid = process->pid;
        if (SwooleG.process_type != SW_PROCESS_USERWORKER)
        {
            SwooleG.process_type = 0;
        }
        SwooleWG.id = process->id;
    
        if (SwooleG.timer.fd)
        {
            swTimer_free(&SwooleG.timer);
            bzero(&SwooleG.timer, sizeof(SwooleG.timer));
        }
    
        swSignal_clear();
    
        zend_update_property_long(swoole_process_class_entry_ptr, object, ZEND_STRL("pid"), process->pid TSRMLS_CC);
        zend_update_property_long(swoole_process_class_entry_ptr, object, ZEND_STRL("pipe"), process->pipe_worker TSRMLS_CC);
    
        zval *zcallback = sw_zend_read_property(swoole_process_class_entry_ptr, object, ZEND_STRL("callback"), 0 TSRMLS_CC);
        zval **args[1];
    
        if (zcallback == NULL || ZVAL_IS_NULL(zcallback))
        {
            swoole_php_fatal_error(E_ERROR, "no callback.");
            return SW_ERR;
        }
    
        zval *retval = NULL;
        args[0] = &object;
        sw_zval_add_ref(&object);
    
        if (sw_call_user_function_ex(EG(function_table), NULL, zcallback, &retval, 1, args, 0, NULL TSRMLS_CC) == FAILURE)
        {
            swoole_php_fatal_error(E_ERROR, "callback function error");
            return SW_ERR;
        }
        if (EG(exception))
        {
            zend_exception_error(EG(exception), E_ERROR TSRMLS_CC);
        }
        if (retval)
        {
            sw_zval_ptr_dtor(&retval);
        }
    
        if (SwooleG.main_reactor)
        {
            php_swoole_event_wait();
        }
        SwooleG.running = 0;
    
        zend_bailout();
        return SW_OK;
    }
    
    swoole_process->write/ swoole_process->read
    メインプロセスとサブプロセスとの間で通信を行うにはwritereadを使用します.swoole_eventを使用すると、パイプが自動的に非ブロックモードに変わり、reactorによってイベントのループ読み書きが行われます.そうしないと、ブロック読み書きが採用されます.
    static PHP_METHOD(swoole_process, write)
    {
        char *data = NULL;
        zend_size_t data_len = 0;
    
        if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &data, &data_len) == FAILURE)
        {
            RETURN_FALSE;
        }
    
        if (data_len < 1)
        {
            swoole_php_fatal_error(E_WARNING, "the data to send is empty.");
            RETURN_FALSE;
        }
    
        swWorker *process = swoole_get_object(getThis());
        if (process->pipe == 0)
        {
            swoole_php_fatal_error(E_WARNING, "no pipe, can not write into pipe.");
            RETURN_FALSE;
        }
    
        int ret;
    
        //async write
        if (SwooleG.main_reactor)
        {
            swConnection *_socket = swReactor_get(SwooleG.main_reactor, process->pipe);
            if (_socket && _socket->nonblock)
            {
                ret = SwooleG.main_reactor->write(SwooleG.main_reactor, process->pipe, data, (size_t) data_len);
            }
            else
            {
                goto _blocking_read;
            }
        }
        else
        {
            _blocking_read: ret = swSocket_write_blocking(process->pipe, data, data_len);
        }
    
        if (ret < 0)
        {
            swoole_php_error(E_WARNING, "write() failed. Error: %s[%d]", strerror(errno), errno);
            RETURN_FALSE;
        }
        ZVAL_LONG(return_value, ret);
    }
    
    static PHP_METHOD(swoole_process, read)
    {
        long buf_size = 8192;
    
        if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|l", &buf_size) == FAILURE)
        {
            RETURN_FALSE;
        }
    
        if (buf_size > 65536)
        {
            buf_size = 65536;
        }
    
        swWorker *process = swoole_get_object(getThis());
    
        if (process->pipe == 0)
        {
            swoole_php_fatal_error(E_WARNING, "no pipe, can not read from pipe.");
            RETURN_FALSE;
        }
    
        char *buf = emalloc(buf_size + 1);
        int ret = read(process->pipe, buf, buf_size);;
        if (ret < 0)
        {
            efree(buf);
            if (errno != EINTR)
            {
                swoole_php_error(E_WARNING, "read() failed. Error: %s[%d]", strerror(errno), errno);
            }
            RETURN_FALSE;
        }
        buf[ret] = 0;
        SW_ZVAL_STRINGL(return_value, buf, ret, 0);
        efree(buf);
    }
    swoole_process::signal信号処理関数の設定
    非同期プログラムに信号処理関数を追加します.まず、プログラムは、現在のプロセス環境と登録された信号を検査し、条件に合致しない直接戻り、例えば、swoole_serverにはSIGTERMSIGALAMの信号を設定することができず、この2つの信号はswooleが保持する必要があり、ユーザーは修正できない.
    これまで信号処理関数が存在していた場合、関数は以前のコールバック関数を上書きし、前の論理は再び実行され、その後破棄されます.
    static PHP_METHOD(swoole_process, signal)
    {
        zval *callback = NULL;
        long signo = 0;
    
        if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lz", &signo, &callback) == FAILURE)
        {
            return;
        }
    
        if (!SWOOLE_G(cli))
        {
            swoole_php_fatal_error(E_ERROR, "cannot use swoole_process::signal here.");
            RETURN_FALSE;
        }
    
        if (SwooleG.serv && SwooleG.serv->gs->start)
        {
            if ((swIsWorker() || swIsTaskWorker()) && signo == SIGTERM)
            {
                swoole_php_fatal_error(E_WARNING, "unable to register SIGTERM in worker/task process.");
                RETURN_FALSE;
            }
            else if (swIsManager() && (signo == SIGTERM || signo == SIGUSR1 || signo == SIGUSR2 || signo == SIGALRM))
            {
                swoole_php_fatal_error(E_WARNING, "unable to register SIGTERM/SIGUSR1/SIGUSR2/SIGALRM in manager process.");
                RETURN_FALSE;
            }
            else if (swIsMaster() && (signo == SIGTERM || signo == SIGUSR1 || signo == SIGUSR2 || signo == SIGALRM || signo == SIGCHLD))
            {
                swoole_php_fatal_error(E_WARNING, "unable to register SIGTERM/SIGUSR1/SIGUSR2/SIGALRM/SIGCHLD in manager process.");
                RETURN_FALSE;
            }
        }
    
        php_swoole_check_reactor();
        swSignalHander handler;
    
        if (callback == NULL || ZVAL_IS_NULL(callback))
        {
            callback = signal_callback[signo];
            if (callback)
            {
                swSignal_add(signo, NULL);
                SwooleG.main_reactor->defer(SwooleG.main_reactor, free_signal_callback, callback);
                signal_callback[signo] = NULL;
                RETURN_TRUE;
            }
            else
            {
                swoole_php_error(E_WARNING, "no callback.");
                RETURN_FALSE;
            }
        }
        else if (Z_TYPE_P(callback) == IS_LONG && Z_LVAL_P(callback) == (long) SIG_IGN)
        {
            handler = NULL;
        }
        else
        {
            char *func_name;
            if (!sw_zend_is_callable(callback, 0, &func_name TSRMLS_CC))
            {
                swoole_php_error(E_WARNING, "function '%s' is not callable", func_name);
                efree(func_name);
                RETURN_FALSE;
            }
            efree(func_name);
    
            callback = sw_zval_dup(callback);
            sw_zval_add_ref(&callback);
    
            handler = php_swoole_onSignal;
        }
    
        /**
         * for swSignalfd_setup
         */
        SwooleG.main_reactor->check_signalfd = 1;
    
        //free the old callback
        if (signal_callback[signo])
        {
            SwooleG.main_reactor->defer(SwooleG.main_reactor, free_signal_callback, signal_callback[signo]);
        }
        signal_callback[signo] = callback;
    
        /**
         * use user settings
         */
        SwooleG.use_signalfd = SwooleG.enable_signalfd;
    
        swSignal_add(signo, handler);
    
        RETURN_TRUE;
    }
    
    swoole_process::alarmプロセスタイマSwoole\Timerと比較すると、swoole_process::alarmは非常に良い選択ではなく、swoole_process::alarmは真のプロセスalarmタイマに類似しており、alarmalarm信号の設定しか許可されていないが、Swoole\Timerはタイミングタスクの最小スタックを実現するため、異なる時間間隔で異なるタスクを実行することができる.したがって、両者を区別するために、swooleは両者が同時に存在することを許さないと規定している.swoole_process::alarm関数は、swoole_process::signalを内部的に呼び出し、setitimer信号を周期的に送信するため、alarm関数にswoole_process::signal信号のコールバック関数を設定する必要があるため、alarm関数と組み合わせる必要がある.
    static PHP_METHOD(swoole_process, alarm)
    {
        long usec = 0;
        long type = ITIMER_REAL;
    
        if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l|l", &usec, &type) == FAILURE)
        {
            return;
        }
    
        if (!SWOOLE_G(cli))
        {
            swoole_php_fatal_error(E_ERROR, "cannot use swoole_process::alarm here.");
            RETURN_FALSE;
        }
    
        if (SwooleG.timer.fd != 0)
        {
            swoole_php_fatal_error(E_WARNING, "cannot use both 'timer' and 'alarm' at the same time.");
            RETURN_FALSE;
        }
    
        struct timeval now;
        if (gettimeofday(&now, NULL) < 0)
        {
            swoole_php_error(E_WARNING, "gettimeofday() failed. Error: %s[%d]", strerror(errno), errno);
            RETURN_FALSE;
        }
    
        struct itimerval timer_set;
        bzero(&timer_set, sizeof(timer_set));
    
        if (usec > 0)
        {
            long _sec = usec / 1000000;
            long _usec = usec - (_sec * 1000000);
    
            timer_set.it_interval.tv_sec = _sec;
            timer_set.it_interval.tv_usec = _usec;
    
            timer_set.it_value.tv_sec = _sec;
            timer_set.it_value.tv_usec = _usec;
    
            if (timer_set.it_value.tv_usec > 1e6)
            {
                timer_set.it_value.tv_usec = timer_set.it_value.tv_usec - 1e6;
                timer_set.it_value.tv_sec += 1;
            }
        }
    
        if (setitimer(type, &timer_set, NULL) < 0)
        {
            swoole_php_error(E_WARNING, "setitimer() failed. Error: %s[%d]", strerror(errno), errno);
            RETURN_FALSE;
        }
    
        RETURN_TRUE;
    }
    
    swoole_process->useQueueメッセージキューuseQueueは、swMsgQueue_createを使用してprocess->queueを作成します.
    static PHP_METHOD(swoole_process, useQueue)
    {
        long msgkey = 0;
        long mode = 2;
    
        if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|ll", &msgkey, &mode) == FAILURE)
        {
            RETURN_FALSE;
        }
    
        swWorker *process = swoole_get_object(getThis());
    
        if (msgkey <= 0)
        {
            msgkey = ftok(sw_zend_get_executed_filename(), 1);
        }
    
        swMsgQueue *queue = emalloc(sizeof(swMsgQueue));
        if (swMsgQueue_create(queue, 1, msgkey, 0) < 0)
        {
            RETURN_FALSE;
        }
        if (mode & MSGQUEUE_NOWAIT)
        {
            swMsgQueue_set_blocking(queue, 0);
            mode = mode & (~MSGQUEUE_NOWAIT);
        }
        process->queue = queue;
        process->ipc_mode = mode;
        zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("msgQueueId"), queue->msg_id TSRMLS_CC);
        zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("msgQueueKey"), msgkey TSRMLS_CC);
        RETURN_TRUE;
    }
    swoole_process->push/swoole_process->popメッセージ通信
    プッシュおよび消費メッセージは、swMsgQueue_push/swMsgQueue_pop関数を利用する.
    static PHP_METHOD(swoole_process, push)
    {
        char *data;
        zend_size_t length;
    
        struct
        {
            long type;
            char data[SW_MSGMAX];
        } message;
    
        if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &data, &length) == FAILURE)
        {
            RETURN_FALSE;
        }
    
        if (length <= 0)
        {
            swoole_php_fatal_error(E_WARNING, "the data to push is empty.");
            RETURN_FALSE;
        }
        else if (length >= sizeof(message.data))
        {
            swoole_php_fatal_error(E_WARNING, "the data to push is too big.");
            RETURN_FALSE;
        }
    
        swWorker *process = swoole_get_object(getThis());
    
        if (!process->queue)
        {
            swoole_php_fatal_error(E_WARNING, "no msgqueue, can not use push()");
            RETURN_FALSE;
        }
    
        message.type = process->id;
        memcpy(message.data, data, length);
    
        if (swMsgQueue_push(process->queue, (swQueue_data *)&message, length) < 0)
        {
            RETURN_FALSE;
        }
        RETURN_TRUE;
    }
    
    static PHP_METHOD(swoole_process, pop)
    {
        long maxsize = SW_MSGMAX;
    
        if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|l", &maxsize) == FAILURE)
        {
            RETURN_FALSE;
        }
    
        if (maxsize > SW_MSGMAX || maxsize <= 0)
        {
            maxsize = SW_MSGMAX;
        }
    
        swWorker *process = swoole_get_object(getThis());
        if (!process->queue)
        {
            swoole_php_fatal_error(E_WARNING, "no msgqueue, can not use pop()");
            RETURN_FALSE;
        }
    
        struct
        {
            long type;
            char data[SW_MSGMAX];
        } message;
    
        if (process->ipc_mode == 2)
        {
            message.type = 0;
        }
        else
        {
            message.type = process->id;
        }
    
        int n = swMsgQueue_pop(process->queue, (swQueue_data *) &message, maxsize);
        if (n < 0)
        {
            RETURN_FALSE;
        }
        SW_RETURN_STRINGL(message.data, n, 1);
    }
    swoole_process::kill/swoole_process::wait
    プロセスへの信号送信killは、回収サブプロセスwaitと比較的論理的に簡単であり、対応する関数を呼び出すことである.kill以降のエラーがESRCHであれば、対応するプロセスが存在しないことを意味することに注意してください.
    static PHP_METHOD(swoole_process, kill)
    {
        long pid;
        long sig = SIGTERM;
    
        if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l|l", &pid, &sig) == FAILURE)
        {
            RETURN_FALSE;
        }
    
        int ret = kill((int) pid, (int) sig);
        if (ret < 0)
        {
            if (!(sig == 0 && errno == ESRCH))
            {
                swoole_php_error(E_WARNING, "kill(%d, %d) failed. Error: %s[%d]", (int) pid, (int) sig, strerror(errno), errno);
            }
            RETURN_FALSE;
        }
        RETURN_TRUE;
    }
    
    static PHP_METHOD(swoole_process, wait)
    {
        int status;
        zend_bool blocking = 1;
    
        if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|b", &blocking) == FAILURE)
        {
            RETURN_FALSE;
        }
    
        int options = 0;
        if (!blocking)
        {
            options |= WNOHANG;
        }
    
        pid_t pid = swWaitpid(-1, &status, options);
        if (pid > 0)
        {
            array_init(return_value);
            add_assoc_long(return_value, "pid", pid);
            add_assoc_long(return_value, "code", WEXITSTATUS(status));
            add_assoc_long(return_value, "signal", WTERMSIG(status));
        }
        else
        {
            RETURN_FALSE;
        }
    }
    
    static sw_inline int swWaitpid(pid_t __pid, int *__stat_loc, int __options)
    {
        int ret;
        do
        {
            ret = waitpid(__pid, __stat_loc, __options);
            if (ret < 0 && errno == EINTR)
            {
                continue;
            }
            break;
        } while(1);
        return ret;
    }