Swoftソース剖析-SwooleとSwoftのあれらの事(Task配達/タイミングタスク編)
作者:bromineリンク:https://www.jianshu.com/p/b44...出典:簡書の著作権は作者の所有であり、本文はすでに作者の許可を得て転載し、原文を再配置した.Swoft Github: https://github.com/swoft-clou...
Swoftソース剖析シリーズ目次:https://segmentfault.com/a/11...
前言
タスク配信
タスク配信
タスク実行
ここでは、
タスク実行の構想は簡単で、
特筆すべき点は、
プロセスからタスクを配信
前述したように、
言い換えれば、
データをパッケージ化した後、
簡単な回顧練習:
Commandプロセスまたはそのサブプロセスからタスクを送信
しかし、
同じプロジェクトの
タイミングタスク
手動で実行する通常のタスクに加えて、
ここでなぜSwooleのメモリTableを使うのですか?
背景の紹介が終わりました.この2つのタイミングタスクプロセスの動作を見てみましょう.
タイミングタスクのマクロ実行状況は次のとおりです.
Swoftソース剖析シリーズ目次:https://segmentfault.com/a/11...
前言
Swoft
のタスク機能は、Swoole
のTask
、またはSwoft
のTask
メカニズムの本質に基づいて、Swoole
のTask
のパッケージおよび強化である.タスク配信
//Swoft\Task\Task.php
class Task
{
/**
* Deliver coroutine or async task
*
* @param string $taskName
* @param string $methodName
* @param array $params
* @param string $type
* @param int $timeout
*
* @return bool|array
* @throws TaskException
*/
public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3)
{
$data = TaskHelper::pack($taskName, $methodName, $params, $type);
if(!App::isWorkerStatus() && !App::isCoContext()){
return self::deliverByQueue($data);// Command
}
if(!App::isWorkerStatus() && App::isCoContext()){
throw new TaskException('Please deliver task by http!');
}
$server = App::$server->getServer();
// Delier coroutine task
if ($type == self::TYPE_CO) {
$tasks[0] = $data;
$prifleKey = 'task' . '.' . $taskName . '.' . $methodName;
App::profileStart($prifleKey);
$result = $server->taskCo($tasks, $timeout);
App::profileEnd($prifleKey);
return $result;
}
// Deliver async task
return $server->task($data);
}
}
タスク配信
Task::deliver()
は、呼び出しパラメータをパッケージ化した後、$type
パラメータに従ってSwoole
の$server->taskCo()
または$server->task()
インターフェースを介してTask
に配信する.Task
自体は常に同期して実行され、$type
は配達という操作の行為にのみ影響し、Task::TYPE_ASYNC
に対応する$server->task()
は非同期配達であり、Task::deliver()
が呼び出された後すぐに戻る.Task::TYPE_CO
に対応する$server->taskCo()
は、コンセンサス配達であり、配達後にコンセンサス制御を譲渡し、タスクの完了または実行がタイムアウトした後にTask::deliver()
がコンセンサスから戻る.タスク実行
//Swoft\Task\Bootstrap\Listeners\TaskEventListener
/**
* The listener of swoole task
* @SwooleListener({
* SwooleEvent::ON_TASK,
* SwooleEvent::ON_FINISH,
* })
*/
class TaskEventListener implements TaskInterface, FinishInterface
{
/**
* @param \Swoole\Server $server
* @param int $taskId
* @param int $workerId
* @param mixed $data
* @return mixed
* @throws \InvalidArgumentException
*/
public function onTask(Server $server, int $taskId, int $workerId, $data)
{
try {
/* @var TaskExecutor $taskExecutor*/
$taskExecutor = App::getBean(TaskExecutor::class);
$result = $taskExecutor->run($data);
} catch (\Throwable $throwable) {
App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));
$result = false;
// Release system resources
App::trigger(AppEvent::RESOURCE_RELEASE);
App::trigger(TaskEvent::AFTER_TASK);
}
return $result;
}
}
ここでは、
swoole.onTask
のイベントコールバックであり、Worker
プロセスから送信されたパッケージ化されたデータをTaskExecutor
に転送するだけである.Swoole
のTask
メカニズムの本質は、Worker
が同期Task
(別名TaskWorker
)に時間のかかるタスクを配信する処理であるため、swoole.onTask
のイベントコールバックはTask
で実行される.前述したように、Worker
はあなたのほとんどのHTTP
サービスコードが実行される環境ですが、TaskEventListener.onTask()
メソッドからコードの実行環境はTask
です.つまり、TaskExecutor
と具体的なTaskBean
はTask
で実行されます.//Swoft\Task\TaskExecutor
/**
* The task executor
*
* @Bean()
*/
class TaskExecutor
{
/**
* @param string $data
* @return mixed
*/
public function run(string $data)
{
$data = TaskHelper::unpack($data);
$name = $data['name'];
$type = $data['type'];
$method = $data['method'];
$params = $data['params'];
$logid = $data['logid'] ?? uniqid('', true);
$spanid = $data['spanid'] ?? 0;
$collector = TaskCollector::getCollector();
if (!isset($collector['task'][$name])) {
return false;
}
list(, $coroutine) = $collector['task'][$name];
$task = App::getBean($name);
if ($coroutine) {
$result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);
} else {
$result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type);
}
return $result;
}
}
タスク実行の構想は簡単で、
Worker
から送られてきたデータを元の呼び出しパラメータに復元し、$name
パラメータに基づいて対応するTaskBean
を見つけ、対応するtask()
メソッドを呼び出す.ここで、TaskBean
はクラスレベル注釈@Task(name="TaskName")
または@Task("TaskName")
を使用して宣言される.特筆すべき点は、
@Task
注記には、name
属性に加えて、coroutine
属性があり、上記のコードは、このパラメータに基づいて、runCoTask()
または同期runSyncTask()
を使用してTask
を実行するように選択される.ただし、Swoole
のTask
の実行は完全に同期されているため、協程はサポートされていませんので、現在のバージョンではtrue
に設定しないでください.同様にTaskBean
に記述されたタスクコードに必要な同期ブロック、または環境に応じて非同期非ブロックおよびコヒーレンスを同期ブロックに自動的に降格させることができるプロセスからタスクを配信
前述したように、
Swoole
Task
メカニズムの本質はWorker
時間のかかるタスクを同期に送信Task
(別名TaskWorker
)処理.言い換えれば、
Swoole
の$server->taskCo()
または$server->task()
は、いずれもWorker
でしか使用できない.この制限はシーンの使用を大きく制限しています.Process
でタスクを配達できるようにするにはどうすればいいですか?Swoft
は、この制限を回避するためにTask::deliverByProcess()
の方法を提供する.その実現原理も簡単であり、Swoole
の$server->sendMessage()
法により呼び出し情報をProcess
からWorker
に配信し、次いでワークプロセスによってTask
に配信し、関連コードは以下の通りである.//Swoft\Task\Task.php
/**
* Deliver task by process
*
* @param string $taskName
* @param string $methodName
* @param array $params
* @param string $type
* @param int $timeout
* @param int $workId
*
* @return bool
*/
public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool
{
/* @var PipeMessageInterface $pipeMessage */
$server = App::$server->getServer();
$pipeMessage = App::getBean(PipeMessage::class);
$data = [
'name' => $taskName,
'method' => $methodName,
'params' => $params,
'timeout' => $timeout,
'type' => $type,
];
$message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data);
return $server->sendMessage($message, $workId);
}
データをパッケージ化した後、
$server->sendMessage()
を使用してWorker
に配達します.//Swoft\Bootstrap\Server\ServerTrait.php
/**
* onPipeMessage event callback
*
* @param \Swoole\Server $server
* @param int $srcWorkerId
* @param string $message
* @return void
* @throws \InvalidArgumentException
*/
public function onPipeMessage(Server $server, int $srcWorkerId, string $message)
{
/* @var PipeMessageInterface $pipeMessage */
$pipeMessage = App::getBean(PipeMessage::class);
list($type, $data) = $pipeMessage->unpack($message);
App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId);
}
$server->sendMessage
の後、Worker
はデータを受け取るとswoole.pipeMessage
イベントのコールバックをトリガーし、Swoft
はそれを自分のswoft.pipeMessage
イベントに変換してトリガーする.//Swoft\Task\Event\Listeners\PipeMessageListener.php
/**
* The pipe message listener
*
* @Listener(event=AppEvent::PIPE_MESSAGE)
*/
class PipeMessageListener implements EventHandlerInterface
{
/**
* @param \Swoft\Event\EventInterface $event
*/
public function handle(EventInterface $event)
{
$params = $event->getParams();
if (count($params) < 3) {
return;
}
list($type, $data, $srcWorkerId) = $params;
if ($type != PipeMessage::MESSAGE_TYPE_TASK) {
return;
}
$type = $data['type'];
$taskName = $data['name'];
$params = $data['params'];
$timeout = $data['timeout'];
$methodName = $data['method'];
// delever task
Task::deliver($taskName, $methodName, $params, $type, $timeout);
}
}
swoft.pipeMessage
イベントは、最終的にPipeMessageListener
によって処理される.関連する傍受では、swoft.pipeMessage
イベントがTask::deliverByProcess()
によって生成されたことが発見された場合、Worker
は、Task::deliver()
の代わりにTaskWorker
を実行し、最終的にはTask::deliverByProcess()
にタスクデータを配信する.簡単な回顧練習:
TaskBean
からCommand
まで最終的にタスクを実行し、どのプロセスを経験し、呼び出しチェーンのどの部分がそれぞれどのプロセスで実行されますか?Commandプロセスまたはそのサブプロセスからタスクを送信
//Swoft\Task\QueueTask.php
/**
* @param string $data
* @param int $taskWorkerId
* @param int $srcWorkerId
*
* @return bool
*/
public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null)
{
if ($taskWorkerId === null) {
$taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum);
}
if ($srcWorkerId === null) {
$srcWorkerId = mt_rand(0, $this->workerNum - 1);
}
$this->check();
$data = $this->pack($data, $srcWorkerId);
$result = \msg_send($this->queueId, $taskWorkerId, $data, false);
if (!$result) {
return false;
}
return true;
}
Process
プロセスのタスク配信では、状況はさらに複雑になります.上記のHttp/Rpc
は、Manager
サービスから派生することが多く、同じSwoole\Server
の子孫プロセスとして、$server->sendMessage()
のハンドル変数を得ることができ、$server->task()
、Swoft
などの方法でタスク配信を行うことができる.しかし、
Command
のシステムでは、Command
という非常に道行く人の役があります.shell
のプロセスは、cronb
またはHttp/Rpc
から独立して開始され、Command
のサービスに関連するプロセスとは親縁関係がない.したがって、Command
プロセスおよびProcess
から開始されたSwoole\Server
プロセスは、UnixSocket
の呼び出しハンドルを直接Swoft
を介してタスク配達を行うことはできない.このプロセスにタスク配信サポートを提供するために、Swoole
は、Task
のCommand
の特殊な機能であるメッセージキューを利用する.同じプロジェクトの
Http\RpcServer
およびmessage_queue_key
は、システムカーネル内の同じメッセージキューを1つのComand
によって取得することを約束し、Task
プロセスは、メッセージキューを介してTask::deliver()
にタスクを配信することができる.このメカニズムは、外部に開示された方法を提供せず、Swoft
方法にのみ含まれ、Semaphore
は、現在の環境に従って暗黙的に配達方式を切り替える.しかし、このメッセージキューの実装はPHP
の拡張に依存し、使用したい場合は、--enable-sysvmsg
をコンパイルするときにSwoft
パラメータを追加する必要があります.タイミングタスク
手動で実行する通常のタスクに加えて、
Crontab
はLinuxのSwoft
の代わりにプロジェクトで使用するための精度秒のタイミングタスク機能を提供する.Process
は、2つの前置CronTimerProcess
---タスク計画プロセス:CronExecProcess
とタスク実行プロセスRunTimeTable
と、2つのメモリデータテーブル---OriginTable
(タスク(構成)テーブル)Swoft
((タスク)実行テーブル)を使用して、タイミングタスクの管理スケジューリングを行います.\\Swoft\Task\Crontab\TableCrontab.php
/**
* ,
* , `rule`,`taskClass`,`taskMethod` key
* @var array $originStruct
*/
private $originStruct = [
'rule' => [\Swoole\Table::TYPE_STRING, 100],// , @Scheduled cron
'taskClass' => [\Swoole\Table::TYPE_STRING, 255],// @Task name ( )
'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//Task , @Scheduled
'add_time' => [\Swoole\Table::TYPE_STRING, 11],// 10
];
/**
* ,
* , `taskClass`,`taskMethod`,`minute`,`sec` key
* @var array $runTimeStruct
*/
private $runTimeStruct = [
'taskClass' => [\Swoole\Table::TYPE_STRING, 255],//
'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//
'minute' => [\Swoole\Table::TYPE_STRING, 20],// , date('YmdHi')
'sec' => [\Swoole\Table::TYPE_STRING, 20],// , 10
'runStatus' => [\Swoole\TABLE::TYPE_INT, 4],// , 0( ) 1( ) 2( ) 。
// : , , ` ` , _ _,_ _,_ _ 。
];
ここでなぜSwooleのメモリTableを使うのですか?
array()
のタイミングタスク管理は、それぞれタスク計画プロセスとタスク実行プロセスによって担当される.2つのプロセスの実行はタイミングタスクを共同で管理し、プロセス間の独立したTable
などの構造を使用すると、2つのプロセスは必然的に頻繁なプロセス間通信を必要とします.一方、プロセス間でのデータ共有は、プロセス間でのTable
(本明細書のSwoole
、特に説明しない限り、Swoole\Table
のTable
構造を指す)を用いて直接行われ、性能が高いだけでなく、操作が簡単であり、2つのプロセスをデカップリングした.Table
が2つのプロセス間で共通に使用できるようにするには、Swoole Server
が起動する前にメモリを作成して割り当てる必要があります.具体的なコードはSwoft\Task\Bootstrap\Listeners->onBeforeStart()
で、比較的簡単で、興味のある人は自分で読むことができます.背景の紹介が終わりました.この2つのタイミングタスクプロセスの動作を見てみましょう.
//Swoft\Task\Bootstrap\Process\CronTimerProcess.php
/**
* Crontab timer process
*
* @Process(name="cronTimer", boot=true)
*/
class CronTimerProcess implements ProcessInterface
{
/**
* @param \Swoft\Process\Process $process
*/
public function run(SwoftProcess $process)
{
//code....
/* @var \Swoft\Task\Crontab\Crontab $cron*/
$cron = App::getBean('crontab');
// Swoole/HttpServer
$server = App::$server->getServer();
$time = (60 - date('s')) * 1000;
$server->after($time, function () use ($server, $cron) {
// Every minute check all tasks, and prepare the tasks that next execution point needs
$cron->checkTask();
$server->tick(60 * 1000, function () use ($cron) {
$cron->checkTask();
});
});
}
}
//Swoft\Task\Crontab\Crontab.php
/**
* runTimeTable
*
* @param array $task
* @param array $parseResult crontab , Task
* @return bool
*/
private function initRunTimeTableData(array $task, array $parseResult): bool
{
$runTimeTableTasks = $this->getRunTimeTable()->table;
$min = date('YmdHi');
$sec = strtotime(date('Y-m-d H:i'));
foreach ($parseResult as $time) {
$this->checkTaskQueue(false);
$key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec);
$runTimeTableTasks->set($key, [
'taskClass' => $task['taskClass'],
'taskMethod' => $task['taskMethod'],
'minute' => $min,
'sec' => $time + $sec,
'runStatus' => self::NORMAL
]);
}
return true;
}
CronTimerProcess
は、Swoft
のタイミングタスクスケジューリングプロセスであり、そのコアメソッドはCrontab->initRunTimeTableData()
である.このプロセスは、Swoole
のタイマ機能を使用し、Swoole\Timer
が毎分の最初の秒に実行するコールバックによって、CronTimerProcess
が起動されるたびにタスクテーブルを巡回して現在の1分以内の60秒にそれぞれ実行する必要があるタスクリストを計算し、実行テーブルに書き込み、未実行とマークする.//Swoft\Task\Bootstrap\Process
/**
* Crontab process
*
* @Process(name="cronExec", boot=true)
*/
class CronExecProcess implements ProcessInterface
{
/**
* @param \Swoft\Process\Process $process
*/
public function run(SwoftProcess $process)
{
$pname = App::$server->getPname();
$process->name(sprintf('%s cronexec process', $pname));
/** @var \Swoft\Task\Crontab\Crontab $cron */
$cron = App::getBean('crontab');
// Swoole/HttpServer
$server = App::$server->getServer();
$server->tick(0.5 * 1000, function () use ($cron) {
$tasks = $cron->getExecTasks();
if (!empty($tasks)) {
foreach ($tasks as $task) {
// Diliver task
Task::deliverByProcess($task['taskClass'], $task['taskMethod']);
$cron->finishTask($task['key']);
}
}
});
}
}
CronExecProcess
は、タイミングタスクの実行者として、Swoole\Timer
を介して0.5s
毎に自身を呼び覚まし、
を1回遍歴し、現在実行する必要があるタスクを選択し、sendMessage()
を介してそのタスク実行テーブルの状態を送信し、更新する.この実行プロセスは、タスクの配信のみを担当し、タスクの実際の実行はTask
においてTaskExecutor
によって処理される.タイミングタスクのマクロ実行状況は次のとおりです.