Swoftソース剖析-SwooleとSwoftのあれらの事(Task配達/タイミングタスク編)

17462 ワード

作者:bromineリンク:https://www.jianshu.com/p/b44...出典:簡書の著作権は作者の所有であり、本文はすでに作者の許可を得て転載し、原文を再配置した.Swoft Github: https://github.com/swoft-clou...
Swoftソース剖析シリーズ目次:https://segmentfault.com/a/11...
前言Swoftのタスク機能は、SwooleTask 、またはSwoftTaskメカニズムの本質に基づいて、SwooleTask のパッケージおよび強化である.
タスク配信
//Swoft\Task\Task.php
class Task
{
    /**
     * Deliver coroutine or async task
     *
     * @param string $taskName
     * @param string $methodName
     * @param array  $params
     * @param string $type
     * @param int    $timeout
     *
     * @return bool|array
     * @throws TaskException
     */
    public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3)
    {
        $data   = TaskHelper::pack($taskName, $methodName, $params, $type);

        if(!App::isWorkerStatus() && !App::isCoContext()){
            return self::deliverByQueue($data);//   Command  
        }

        if(!App::isWorkerStatus() && App::isCoContext()){
            throw new TaskException('Please deliver task by http!');
        }


        $server = App::$server->getServer();
        // Delier coroutine task
        if ($type == self::TYPE_CO) {
            $tasks[0]  = $data;
            $prifleKey = 'task' . '.' . $taskName . '.' . $methodName;

            App::profileStart($prifleKey);
            $result = $server->taskCo($tasks, $timeout);
            App::profileEnd($prifleKey);

            return $result;
        }

        // Deliver async task
        return $server->task($data);
    }
}

タスク配信Task::deliver()は、呼び出しパラメータをパッケージ化した後、$typeパラメータに従ってSwoole$server->taskCo()または$server->task()インターフェースを介してTask に配信する.Task自体は常に同期して実行され、$typeは配達という操作の行為にのみ影響し、Task::TYPE_ASYNCに対応する$server->task()は非同期配達であり、Task::deliver()が呼び出された後すぐに戻る.Task::TYPE_COに対応する$server->taskCo()は、コンセンサス配達であり、配達後にコンセンサス制御を譲渡し、タスクの完了または実行がタイムアウトした後にTask::deliver()がコンセンサスから戻る.
タスク実行
//Swoft\Task\Bootstrap\Listeners\TaskEventListener 
/**
 * The listener of swoole task
 * @SwooleListener({
 *     SwooleEvent::ON_TASK,
 *     SwooleEvent::ON_FINISH,
 * })
 */
class TaskEventListener implements TaskInterface, FinishInterface
{
    /**
     * @param \Swoole\Server $server
     * @param int            $taskId
     * @param int            $workerId
     * @param mixed          $data
     * @return mixed
     * @throws \InvalidArgumentException
     */
    public function onTask(Server $server, int $taskId, int $workerId, $data)
    {
        try {
            /* @var TaskExecutor $taskExecutor*/
            $taskExecutor = App::getBean(TaskExecutor::class);
            $result = $taskExecutor->run($data);
        } catch (\Throwable $throwable) {
            App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));
            $result = false;

            // Release system resources
            App::trigger(AppEvent::RESOURCE_RELEASE);

            App::trigger(TaskEvent::AFTER_TASK);
        }
        return $result;
    }
}

ここでは、swoole.onTaskのイベントコールバックであり、Workerプロセスから送信されたパッケージ化されたデータをTaskExecutorに転送するだけである.SwooleTaskメカニズムの本質は、Worker が同期Task (別名TaskWorker)に時間のかかるタスクを配信する処理であるため、swoole.onTaskのイベントコールバックはTask で実行される.前述したように、Worker はあなたのほとんどのHTTPサービスコードが実行される環境ですが、TaskEventListener.onTask()メソッドからコードの実行環境はTask です.つまり、TaskExecutorと具体的なTaskBeanTask で実行されます.
//Swoft\Task\TaskExecutor
/**
 * The task executor
 *
 * @Bean()
 */
class TaskExecutor
{
    /**
     * @param string $data
     * @return mixed
    */
    public function run(string $data)
    {
        $data = TaskHelper::unpack($data);

        $name   = $data['name'];
        $type   = $data['type'];
        $method = $data['method'];
        $params = $data['params'];
        $logid  = $data['logid'] ?? uniqid('', true);
        $spanid = $data['spanid'] ?? 0;


        $collector = TaskCollector::getCollector();
        if (!isset($collector['task'][$name])) {
            return false;
        }

        list(, $coroutine) = $collector['task'][$name];
        $task = App::getBean($name);
        if ($coroutine) {
            $result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);
        } else {
            $result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type);
        }

        return $result;
    }
}

タスク実行の構想は簡単で、Worker から送られてきたデータを元の呼び出しパラメータに復元し、$nameパラメータに基づいて対応するTaskBeanを見つけ、対応するtask()メソッドを呼び出す.ここで、TaskBeanはクラスレベル注釈@Task(name="TaskName")または@Task("TaskName")を使用して宣言される.
特筆すべき点は、@Task注記には、name属性に加えて、coroutine属性があり、上記のコードは、このパラメータに基づいて、runCoTask()または同期runSyncTask()を使用してTaskを実行するように選択される.ただし、SwooleTask の実行は完全に同期されているため、協程はサポートされていませんので、現在のバージョンではtrueに設定しないでください.同様にTaskBeanに記述されたタスクコードに必要な同期ブロック、または環境に応じて非同期非ブロックおよびコヒーレンスを同期ブロックに自動的に降格させることができる
プロセスからタスクを配信
前述したように、SwooleTaskメカニズムの本質はWorker 時間のかかるタスクを同期に送信Task (別名TaskWorker)処理.
言い換えれば、Swoole$server->taskCo()または$server->task()は、いずれもWorker でしか使用できない.この制限はシーンの使用を大きく制限しています.Processでタスクを配達できるようにするにはどうすればいいですか?Swoftは、この制限を回避するためにTask::deliverByProcess()の方法を提供する.その実現原理も簡単であり、Swoole$server->sendMessage()法により呼び出し情報をProcessからWorker に配信し、次いでワークプロセスによってTask に配信し、関連コードは以下の通りである.
//Swoft\Task\Task.php
/**
 * Deliver task by process
 *
 * @param string $taskName
 * @param string $methodName
 * @param array  $params
 * @param string $type
 * @param int    $timeout
 * @param int    $workId
 *
 * @return bool
 */
public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool
{
    /* @var PipeMessageInterface $pipeMessage */
    $server      = App::$server->getServer();
    $pipeMessage = App::getBean(PipeMessage::class);
    $data = [
        'name'    => $taskName,
        'method'  => $methodName,
        'params'  => $params,
        'timeout' => $timeout,
        'type'    => $type,
    ];

    $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data);
    return $server->sendMessage($message, $workId);
}

データをパッケージ化した後、$server->sendMessage()を使用してWorkerに配達します.
//Swoft\Bootstrap\Server\ServerTrait.php
/**
 * onPipeMessage event callback
 *
 * @param \Swoole\Server $server
 * @param int            $srcWorkerId
 * @param string         $message
 * @return void
 * @throws \InvalidArgumentException
 */
public function onPipeMessage(Server $server, int $srcWorkerId, string $message)
{
    /* @var PipeMessageInterface $pipeMessage */
    $pipeMessage = App::getBean(PipeMessage::class);
    list($type, $data) = $pipeMessage->unpack($message);

    App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId);
}
$server->sendMessageの後、Worker はデータを受け取るとswoole.pipeMessageイベントのコールバックをトリガーし、Swoftはそれを自分のswoft.pipeMessageイベントに変換してトリガーする.
//Swoft\Task\Event\Listeners\PipeMessageListener.php
/**
 * The pipe message listener
 *
 * @Listener(event=AppEvent::PIPE_MESSAGE)
 */
class PipeMessageListener implements EventHandlerInterface
{
    /**
     * @param \Swoft\Event\EventInterface $event
     */
    public function handle(EventInterface $event)
    {
        $params = $event->getParams();
        if (count($params) < 3) {
            return;
        }

        list($type, $data, $srcWorkerId) = $params;

        if ($type != PipeMessage::MESSAGE_TYPE_TASK) {
            return;
        }

        $type       = $data['type'];
        $taskName   = $data['name'];
        $params     = $data['params'];
        $timeout    = $data['timeout'];
        $methodName = $data['method'];

        // delever task
        Task::deliver($taskName, $methodName, $params, $type, $timeout);
    }
}
swoft.pipeMessageイベントは、最終的にPipeMessageListenerによって処理される.関連する傍受では、swoft.pipeMessageイベントがTask::deliverByProcess()によって生成されたことが発見された場合、Worker は、Task::deliver()の代わりにTaskWorker を実行し、最終的にはTask::deliverByProcess()にタスクデータを配信する.
簡単な回顧練習:TaskBeanからCommandまで最終的にタスクを実行し、どのプロセスを経験し、呼び出しチェーンのどの部分がそれぞれどのプロセスで実行されますか?
Commandプロセスまたはそのサブプロセスからタスクを送信
//Swoft\Task\QueueTask.php
/**
 * @param string $data
 * @param int    $taskWorkerId
 * @param int    $srcWorkerId
 *
 * @return bool
 */
public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null)
{
    if ($taskWorkerId === null) {
        $taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum);
    }

    if ($srcWorkerId === null) {
        $srcWorkerId = mt_rand(0, $this->workerNum - 1);
    }

    $this->check();
    $data   = $this->pack($data, $srcWorkerId);
    $result = \msg_send($this->queueId, $taskWorkerId, $data, false);
    if (!$result) {
        return false;
    }

    return true;
}
Processプロセスのタスク配信では、状況はさらに複雑になります.上記のHttp/Rpcは、Managerサービスから派生することが多く、同じSwoole\Serverの子孫プロセスとして、$server->sendMessage()のハンドル変数を得ることができ、$server->task()Swoftなどの方法でタスク配信を行うことができる.
しかし、Commandのシステムでは、Commandという非常に道行く人の役があります.shellのプロセスは、cronbまたはHttp/Rpcから独立して開始され、Commandのサービスに関連するプロセスとは親縁関係がない.したがって、CommandプロセスおよびProcessから開始されたSwoole\Serverプロセスは、UnixSocketの呼び出しハンドルを直接Swoftを介してタスク配達を行うことはできない.このプロセスにタスク配信サポートを提供するために、Swooleは、Task Commandの特殊な機能であるメッセージキューを利用する.
同じプロジェクトのHttp\RpcServerおよびmessage_queue_keyは、システムカーネル内の同じメッセージキューを1つのComandによって取得することを約束し、Task プロセスは、メッセージキューを介してTask::deliver()にタスクを配信することができる.このメカニズムは、外部に開示された方法を提供せず、Swoft方法にのみ含まれ、Semaphoreは、現在の環境に従って暗黙的に配達方式を切り替える.しかし、このメッセージキューの実装はPHPの拡張に依存し、使用したい場合は、--enable-sysvmsgをコンパイルするときにSwoftパラメータを追加する必要があります.
タイミングタスク
手動で実行する通常のタスクに加えて、CrontabはLinuxのSwoftの代わりにプロジェクトで使用するための精度秒のタイミングタスク機能を提供する.Processは、2つの前置CronTimerProcess---タスク計画プロセス:CronExecProcessとタスク実行プロセスRunTimeTableと、2つのメモリデータテーブル---OriginTable(タスク(構成)テーブル)Swoft((タスク)実行テーブル)を使用して、タイミングタスクの管理スケジューリングを行います.
\\Swoft\Task\Crontab\TableCrontab.php
/**
 *    ,           
 *             ,  `rule`,`taskClass`,`taskMethod`  key        
 * @var array $originStruct 
 */
private $originStruct = [
    'rule'       => [\Swoole\Table::TYPE_STRING, 100],//        ,  @Scheduled   cron  
    'taskClass'  => [\Swoole\Table::TYPE_STRING, 255],//      @Task name  (     )
    'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//Task  ,  @Scheduled      
    'add_time'   => [\Swoole\Table::TYPE_STRING, 11],//         10    
];

/**
 *    ,                    
 *             ,  `taskClass`,`taskMethod`,`minute`,`sec`  key        
 * @var array $runTimeStruct 
 */
private $runTimeStruct = [
    'taskClass'  => [\Swoole\Table::TYPE_STRING, 255],//  
    'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//  
    'minute'      => [\Swoole\Table::TYPE_STRING, 20],//         ,        date('YmdHi')
    'sec'        => [\Swoole\Table::TYPE_STRING, 20],//         ,      10    
    'runStatus'  => [\Swoole\TABLE::TYPE_INT, 4],//    ,  0(   )  1(   )  2(   )   。 
    //  :               ,                ,   `    `       ,        _   _,_   _,_   _      。
];

ここでなぜSwooleのメモリTableを使うのですか?array()のタイミングタスク管理は、それぞれタスク計画プロセスとタスク実行プロセスによって担当される.2つのプロセスの実行はタイミングタスクを共同で管理し、プロセス間の独立したTableなどの構造を使用すると、2つのプロセスは必然的に頻繁なプロセス間通信を必要とします.一方、プロセス間でのデータ共有は、プロセス間でのTable(本明細書のSwoole、特に説明しない限り、Swoole\TableTable構造を指す)を用いて直接行われ、性能が高いだけでなく、操作が簡単であり、2つのプロセスをデカップリングした.Tableが2つのプロセス間で共通に使用できるようにするには、Swoole Serverが起動する前にメモリを作成して割り当てる必要があります.具体的なコードはSwoft\Task\Bootstrap\Listeners->onBeforeStart()で、比較的簡単で、興味のある人は自分で読むことができます.
背景の紹介が終わりました.この2つのタイミングタスクプロセスの動作を見てみましょう.
//Swoft\Task\Bootstrap\Process\CronTimerProcess.php
/**
 * Crontab timer process
 *
 * @Process(name="cronTimer", boot=true)
 */
class CronTimerProcess implements ProcessInterface
{
    /**
     * @param \Swoft\Process\Process $process
     */
    public function run(SwoftProcess $process)
    {
        //code....
        /* @var \Swoft\Task\Crontab\Crontab $cron*/
        $cron = App::getBean('crontab');

        // Swoole/HttpServer
        $server = App::$server->getServer();

        $time = (60 - date('s')) * 1000;
        $server->after($time, function () use ($server, $cron) {
            // Every minute check all tasks, and prepare the tasks that next execution point needs
            $cron->checkTask();
            $server->tick(60 * 1000, function () use ($cron) {
                $cron->checkTask();
            });
        });
    }
}
//Swoft\Task\Crontab\Crontab.php
/**
 *    runTimeTable  
 *
 * @param array $task          
 * @param array $parseResult   crontab      , Task              
 * @return bool
 */
private function initRunTimeTableData(array $task, array $parseResult): bool
{
    $runTimeTableTasks = $this->getRunTimeTable()->table;

    $min = date('YmdHi');
    $sec = strtotime(date('Y-m-d H:i'));
    foreach ($parseResult as $time) {
        $this->checkTaskQueue(false);
        $key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec);
        $runTimeTableTasks->set($key, [
            'taskClass'  => $task['taskClass'],
            'taskMethod' => $task['taskMethod'],
            'minute'     => $min,
            'sec'        => $time + $sec,
            'runStatus'  => self::NORMAL
        ]);
    }

    return true;
}
CronTimerProcessは、Swoftのタイミングタスクスケジューリングプロセスであり、そのコアメソッドはCrontab->initRunTimeTableData()である.このプロセスは、Swooleのタイマ機能を使用し、Swoole\Timerが毎分の最初の秒に実行するコールバックによって、CronTimerProcessが起動されるたびにタスクテーブルを巡回して現在の1分以内の60秒にそれぞれ実行する必要があるタスクリストを計算し、実行テーブルに書き込み、未実行とマークする.
//Swoft\Task\Bootstrap\Process
/**
 * Crontab process
 *
 * @Process(name="cronExec", boot=true)
 */
class CronExecProcess implements ProcessInterface
{
    /**
     * @param \Swoft\Process\Process $process
     */
    public function run(SwoftProcess $process)
    {
        $pname = App::$server->getPname();
        $process->name(sprintf('%s cronexec process', $pname));

        /** @var \Swoft\Task\Crontab\Crontab $cron */
        $cron = App::getBean('crontab');

        // Swoole/HttpServer
        $server = App::$server->getServer();

        $server->tick(0.5 * 1000, function () use ($cron) {
            $tasks = $cron->getExecTasks();
            if (!empty($tasks)) {
                foreach ($tasks as $task) {
                    // Diliver task
                    Task::deliverByProcess($task['taskClass'], $task['taskMethod']);
                    $cron->finishTask($task['key']);
                }
            }
        });
    }
}
CronExecProcessは、タイミングタスクの実行者として、Swoole\Timerを介して0.5s毎に自身を呼び覚まし、 を1回遍歴し、現在実行する必要があるタスクを選択し、sendMessage()を介してそのタスク実行テーブルの状態を送信し、更新する.この実行プロセスは、タスクの配信のみを担当し、タスクの実際の実行はTask においてTaskExecutorによって処理される.
タイミングタスクのマクロ実行状況は次のとおりです.