Laravel5.2キュー駆動expireパラメータ設定による繰返し実行問題データベース駆動
10136 ワード
'connections' => [
....
'database' => [
'driver' => 'database',
'table' => 'jobs',
'queue' => 'default',
'expire' => 60,
],
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => 'default',
'expire' => 180,
],
....
],
Laravel5.2キュー駆動config/queue.phpプロファイル、database、redisにはexpireパラメータがあり、マニュアルでは「キュータスクの有効期限(秒)」と解釈され、デフォルトは60秒です.
(注:5.2以降のプロファイルが変更され、'try_after'パラメータに変更されました.詳細はマニュアルを参照してください)
ネット上でこの構成を検索しても、あまり説明されていませんが、実際に使用している間に、expire設定時間を超えるキュープロセスを実行したり、キューを使用して分散プログラムを配置したりすることがわかります.このパラメータとこの設計モードは大きな穴です.
この問題は、分散プログラムを使用して処理キューを配置し、2台のサーバがLaravelフレームワークartisanスクリプトを配置し、MYSQLデータベースに接続し、jobsキューのリストを使用することです.
配備後、それぞれ2台のサーバーのスクリプトを起動し、発見後に実行したスクリプトは、MYSQLのjobsテーブルのようなキュー駆動でデータを取得し、先に実行したスクリプトキューデータに遭遇したときにスキップするのではなく、このデータをFailedと見なし、新しいデータをfailed_に格納する.jobsテーブル(Laravelキューが失敗するとfailed_jobsテーブルにキューデータが格納され、データが重複します.
前に1台のサーバで3つのプロセス実行スクリプトを起動しても、このようなエラーは発生しません.後に実行されるスクリプトは前のプロセスのキューデータを取得しません.Failedとは判断されません.マルチサービス処理時にキュー駆動中のデータエラーが発生した原因は何ですか.
キューが実行するプロセスに基づいて、プログラムが実行されると、キューはキュードライバにタスクを取り、タスクを取得するプロセスキュードライバは作業物処理を行うべきであり、このように2番目のプロセスタスクは実行中のキューデータをスキップする.
いくつかの資料を調べて、Laravelキューの原理を理解して、最後にQueueのソースコードを見なければなりません.
LaravelのQueueのソースコードはすべてIlluminateQueueディレクトリの下にあります.
まず、MYSQLを駆動するjobsテーブルを分析します.
CREATE TABLE `jobs` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`queue` varchar(255) COLLATE utf8_unicode_ci NOT NULL,
`payload` longtext COLLATE utf8_unicode_ci NOT NULL,
`attempts` tinyint(3) unsigned NOT NULL,
`reserved` tinyint(3) unsigned NOT NULL,
`reserved_at` int(10) unsigned DEFAULT NULL,
`available_at` int(10) unsigned NOT NULL,
`created_at` int(10) unsigned NOT NULL,
PRIMARY KEY (`id`),
KEY `jobs_queue_reserved_reserved_at_index` (`queue`,`reserved`,`reserved_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
マニュアルでは主にキュータスクの保存について説明しています.payloadフィールドにはシーケンス化されたタスクが格納されています.Laravelキューはデータモデルをシーケンス化することができ、実行時にキューシステムは自動的にデータベースからモデルインスタンス全体を取得します.具体的な説明はマニュアルを参照してください.
しかし、他のいくつかのステータスと時間フィールドこそ、キュー物事の処理を保証する重要なフィールドです.
「attempts」実行回数、「reserved」実行状態、「reserved_at」実行開始時間、「available_at’サブスクリプション実行時間,‘created_at’はキュー作成時間である.
イベントをリスニングするスクリプトにはListenerがあります.phpとWorkerphpの2つのスクリプトは、Listenerが指定したキュー、connectionパラメータを処理できることをソースコードで説明していますが、実際にはworkでキューを処理しています.Laravel5.4 queue:listenパラメータはキャンセルされ、queue:workで実行されます.でもここで言ったのはLaravel 5です.2の問題は、次の理由でLaravel最適化がlistenを取り除いたのか分からない.
キュー処理のWorkerクラスのソースコードの解析を続行し、キューデータを取得するときにpopメソッドを使用します.このメソッドは、databaseやredisなどの転送された駆動タイプに基づいて、その駆動のpopメソッドを呼び出します.
$connection = $this->manager->connection($connectionName);
$job = $this->getNextJob($connection, $queue);
// If we're able to pull a job off of the stack, we will process it and
// then immediately return back out. If there is no job on the queue
// we will "sleep" the worker for the specified number of seconds.
if (! is_null($job)) {
return $this->process(
$this->manager->getName($connectionName), $job, $maxTries, $delay
);
}
次はDatabaseQueueです.phpのpop法.
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$queue = $this->getQueue($queue);
$this->database->beginTransaction();
if ($job = $this->getNextAvailableJob($queue)) {
$job = $this->markJobAsReserved($job);
$this->database->commit();
return new DatabaseJob(
$this->container, $this, $job, $queue
);
}
$this->database->commit();
}
データを取得するプロセス物事処理が開かれています.
キュー・データのコアは$this->getNextavailableJob($queue)です.
sqlログを開き、キューデータがどのようにクエリーされているかを確認します.
/**
* Get the next available job for the queue.
*
* @param string|null $queue
* @return \StdClass|null
*/
protected function getNextAvailableJob($queue)
{
$this->database->enableQueryLog();
$job = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where(function ($query) {
$this->isAvailable($query);
$this->isReservedButExpired($query);
})
->orderBy('id', 'asc')
->first();
var_dump($this->database->getQueryLog());
return $job ? (object) $job : null;
}
array(1) {
[0] =>
array(3) {
'query' =>
string(165) "select * from `jobs` where `queue` = ? and ((`reserved` = ? and `available_at` <= ?) or (`reserved` = ? and `reserved_at` <= ?)) order by `id` asc limit 1 for update"
'bindings' =>
array(5) {
[0] =>
string(7) "default"
[1] =>
int(0)
[2] =>
int(1493634233)
[3] =>
int(1)
[4] =>
int(1493634173)
}
'time' =>
double(1.55)
}
sql文からキューデータを取得するには2つの条件があることがわかります
reservedが0の場合、available_at時間は現在の時間より小さく、この条件は実行されるキューである.reservedが1の場合、reserved_at実行開始時間は計算された時間($this->isReservedButExpired)より小さく、すなわち現在の時間からタイムアウト秒を減算するCarbon::now()->subSeconds($this->expire)->gettimestamp()であり、キュータスクが期限切れであるかどうかを判断する.
selectプロセス全体が「for update」で、排他ロックがあります.
条件を満たすキューを取得した後
/**
* Mark the given job ID as reserved.
*
* @param \stdClass $job
* @return \stdClass
*/
protected function markJobAsReserved($job)
{
$job->reserved = 1;
$job->attempts = $job->attempts + 1;
$job->reserved_at = $this->getTime();
$this->database->table($this->table)->where('id', $job->id)->update([
'reserved' => $job->reserved,
'reserved_at' => $job->reserved_at,
'attempts' => $job->attempts,
]);
return $job;
}
プログラムはこのデータを更新し、更新が完了するとcommitになります.
同じサーバでは、2番目のプロセスがデータを取得する際に悲観的なロックに遭遇し、1番目のプロセスがデータを取得してreservedと時間を更新した後に実行する必要があります.つまり、Laravelキューでdatabaseを使用する場合、同時実行プロセスは複数のデータを同時に取得するのではなく、同じデータを取得して1つのプロセスupdateデータの状態と実行時間を待つので、キューがデータを取得した後に最初の操作が更新されるので、キューが期限切れにならない限り、2番目のプロセスは1番目のプロセスの同じデータを取得しません.
DatabaseQueue.phpのpopメソッドでは,キューデータを取得すると,「$this->database->commit();」前sleep(10)では、第2のキューが他のキューデータを取得していないことが明らかになり、「for update」はupdateレベルの排他ロックにすぎず、selectを排斥しないことを示しています.
Laravelはdatabaseキューを使用するとブロック現象がある場合がありますが、その原因かどうか分かりません.
実行時間が長すぎて「expire」パラメータ設定時間を超えると、第2のキューは第1のキューデータを取得し、タイムアウトを判断します.この場合、設定した最大実行回数triesに基づいて、新しいキューデータを挿入して実行を継続するか、エラーキュー「failed_jobs」テーブルに挿入してキュー実行に失敗したかを判断します.
以上がLaravelがmysqlを使用してキューを実行する論理であり、前に述べた2台のサーバ配置Laravelフレームワークはartisanスクリプトを実行し、1つのjobsテーブルキューFailedの問題はサーバ時間が一致しない原因であり、後のサーバ実行時に前のキューデータをタイムアウトと判断して「failed_jobs」の新しいデータに挿入し、最大失敗回数に達した場合である.そうでない場合は、新しいデータを挿入して試行を続行します.
したがってqueue:listenの実行時間パラメータであるtimeout=60は、キュータスクの有効期限より小さいexpireパラメータを設定する必要があります。
そして、Laravel 5.2のqueue:workには--timeout=60というパラメータはありません。。。
最後にキュー実行済みの処理ロジックです.
キューの実行に成功するとjobsのデータが削除されます.これは問題ありません.タイムアウト、例外などが失敗した場合、設定した最大失敗回数に基づいて新しいデータを挿入するか、Failedデータをfailed_jobsテーブルに挿入するかを判断します.
エラーが発生した場合、handleJobExceptionの例外処理はDatabaseQueueを呼び出します.phpのreleaseメソッド,$job->release($delay),最終的にpushToDatabase実装である.
新しいデータを挿入する場合、attemptsは失敗回数、reservedは0、available_atは、現在のタイムスタンプに遅延時間パラメータを加え、キュー処理全体が完全なデータ論理動作を形成する.
Laravel5.4キュー機能を大きく変更しました.マニュアルのヒント
タスクの有効期限とタイムアウト
タスク実行時間
コンフィギュレーションファイルconfig/queue.phpでは、各接続にretry_が定義されています.after項.この構成項目の目的は、タスクが実行されてから数秒後にキューに戻ることを定義することです.retry_afterが設定した値は90で、タスクが90秒実行された後も完了していない場合は、削除するのではなくキューに戻されます.間違いなくretry_afterの値は、タスク実行時間の最大可能値に設定されます.
Laravel5.4 queueのlistenコマンドを削除し、workにもタイムアウトパラメータを追加しました.Laravel5.5が出るときはアップグレードすべきです.
付録:Laravel 5.2テストのシナリオは、以前はネット上で検索されたのが早いので、jobをコマンドに書く方法ですが、実は5.2以降はjobの使用が簡単です.
jobsの下でjobタスクを定義し、handleはいくつかのテストスキームを追加することができます.例えば、私のような異常を投げ出して、直接Failedの
class MyJob extends Job implements ShouldQueue
{
use InteractsWithQueue, SerializesModels;
private $key;
private $value;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($key, $value)
{
$this->key = $key;
$this->value = $value;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
for($i=0;$i<20;$i++){
echo "$i
";
sleep(1);
}
echo "sss\t".$this->key."\t".date("Y-m-d H:i:s")."
";
throw new \Exception("
");
// Redis::hset('queue.test', $this->key, $this->value);
}
public function failed()
{
dump('failed');
}
}
コントローラはタスクキューの設定にアクセスし、keyとvalueの前にテストredisで挿入し、自分のテストスキームでjobパラメータを設定することができます.
for($i = 0; $i < 5; $i ++) {
echo "$i";
$job = (new MyJob($i, $i))->delay(20);
$this->dispatch($job);
}
私の例では5つのキューを設定し、複数のshellを開いてartisanテストを実行しましょう.
本当はredisキューコードを読み終えて、一緒に出したかったのですが、最近用事が多すぎて、redisコードもあまり見ていません.
redisドライバはhttp://www.cnblogs.com/z12987...の記事を参照してLaravelキューredisドライバロジックについて詳しく説明します.redisドライバで使用されるlistとzset構造の格納キューは、実行中に転送キューが削除され、データベースの「for update」操作がないため、キューブロックに格納されているわけではありません.
BUTキュータスクの有効期限設定はデータベースドライバと同じなので
Queue:listenの実行時間パラメータであるtimeout=60は、キュータスクの有効期限より小さいexpireパラメータを設定する必要があります。
やっと書き終わりました...