Laravel5.2キュー駆動expireパラメータ設定による繰返し実行問題データベース駆動

10136 ワード

  'connections' => [
    ....
        'database' => [
            'driver' => 'database',
            'table' => 'jobs',
            'queue' => 'default',
            'expire' => 60,
        ],
        'redis' => [
            'driver' => 'redis',
            'connection' => 'default',
            'queue' => 'default',
            'expire' => 180,
        ],
    ....
    ],

Laravel5.2キュー駆動config/queue.phpプロファイル、database、redisにはexpireパラメータがあり、マニュアルでは「キュータスクの有効期限(秒)」と解釈され、デフォルトは60秒です.
(注:5.2以降のプロファイルが変更され、'try_after'パラメータに変更されました.詳細はマニュアルを参照してください)
ネット上でこの構成を検索しても、あまり説明されていませんが、実際に使用している間に、expire設定時間を超えるキュープロセスを実行したり、キューを使用して分散プログラムを配置したりすることがわかります.このパラメータとこの設計モードは大きな穴です.
この問題は、分散プログラムを使用して処理キューを配置し、2台のサーバがLaravelフレームワークartisanスクリプトを配置し、MYSQLデータベースに接続し、jobsキューのリストを使用することです.
配備後、それぞれ2台のサーバーのスクリプトを起動し、発見後に実行したスクリプトは、MYSQLのjobsテーブルのようなキュー駆動でデータを取得し、先に実行したスクリプトキューデータに遭遇したときにスキップするのではなく、このデータをFailedと見なし、新しいデータをfailed_に格納する.jobsテーブル(Laravelキューが失敗するとfailed_jobsテーブルにキューデータが格納され、データが重複します.
前に1台のサーバで3つのプロセス実行スクリプトを起動しても、このようなエラーは発生しません.後に実行されるスクリプトは前のプロセスのキューデータを取得しません.Failedとは判断されません.マルチサービス処理時にキュー駆動中のデータエラーが発生した原因は何ですか.
キューが実行するプロセスに基づいて、プログラムが実行されると、キューはキュードライバにタスクを取り、タスクを取得するプロセスキュードライバは作業物処理を行うべきであり、このように2番目のプロセスタスクは実行中のキューデータをスキップする.
いくつかの資料を調べて、Laravelキューの原理を理解して、最後にQueueのソースコードを見なければなりません.
LaravelのQueueのソースコードはすべてIlluminateQueueディレクトリの下にあります.
まず、MYSQLを駆動するjobsテーブルを分析します.
CREATE TABLE `jobs` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `queue` varchar(255) COLLATE utf8_unicode_ci NOT NULL,
  `payload` longtext COLLATE utf8_unicode_ci NOT NULL,
  `attempts` tinyint(3) unsigned NOT NULL,
  `reserved` tinyint(3) unsigned NOT NULL,
  `reserved_at` int(10) unsigned DEFAULT NULL,
  `available_at` int(10) unsigned NOT NULL,
  `created_at` int(10) unsigned NOT NULL,
  PRIMARY KEY (`id`),
  KEY `jobs_queue_reserved_reserved_at_index` (`queue`,`reserved`,`reserved_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

マニュアルでは主にキュータスクの保存について説明しています.payloadフィールドにはシーケンス化されたタスクが格納されています.Laravelキューはデータモデルをシーケンス化することができ、実行時にキューシステムは自動的にデータベースからモデルインスタンス全体を取得します.具体的な説明はマニュアルを参照してください.
しかし、他のいくつかのステータスと時間フィールドこそ、キュー物事の処理を保証する重要なフィールドです.
「attempts」実行回数、「reserved」実行状態、「reserved_at」実行開始時間、「available_at’サブスクリプション実行時間,‘created_at’はキュー作成時間である.
イベントをリスニングするスクリプトにはListenerがあります.phpとWorkerphpの2つのスクリプトは、Listenerが指定したキュー、connectionパラメータを処理できることをソースコードで説明していますが、実際にはworkでキューを処理しています.Laravel5.4 queue:listenパラメータはキャンセルされ、queue:workで実行されます.でもここで言ったのはLaravel 5です.2の問題は、次の理由でLaravel最適化がlistenを取り除いたのか分からない.
キュー処理のWorkerクラスのソースコードの解析を続行し、キューデータを取得するときにpopメソッドを使用します.このメソッドは、databaseやredisなどの転送された駆動タイプに基づいて、その駆動のpopメソッドを呼び出します.
$connection = $this->manager->connection($connectionName);
$job = $this->getNextJob($connection, $queue);
// If we're able to pull a job off of the stack, we will process it and
// then immediately return back out. If there is no job on the queue
// we will "sleep" the worker for the specified number of seconds.
if (! is_null($job)) {
    return $this->process(
        $this->manager->getName($connectionName), $job, $maxTries, $delay
    );
}

次はDatabaseQueueです.phpのpop法.
/**
 * Pop the next job off of the queue.
 *
 * @param  string  $queue
 * @return \Illuminate\Contracts\Queue\Job|null
 */
public function pop($queue = null)
{
    $queue = $this->getQueue($queue);
    $this->database->beginTransaction();
    if ($job = $this->getNextAvailableJob($queue)) {
        $job = $this->markJobAsReserved($job);
        $this->database->commit();
        return new DatabaseJob(
            $this->container, $this, $job, $queue
        );
    }
    $this->database->commit();
}

データを取得するプロセス物事処理が開かれています.
キュー・データのコアは$this->getNextavailableJob($queue)です.
sqlログを開き、キューデータがどのようにクエリーされているかを確認します.
/**
 * Get the next available job for the queue.
 *
 * @param  string|null  $queue
 * @return \StdClass|null
 */
protected function getNextAvailableJob($queue)
{
    $this->database->enableQueryLog();
    $job = $this->database->table($this->table)
                ->lockForUpdate()
                ->where('queue', $this->getQueue($queue))
                ->where(function ($query) {
                    $this->isAvailable($query);
                    $this->isReservedButExpired($query);
                })
                ->orderBy('id', 'asc')
                ->first();
    var_dump($this->database->getQueryLog());
    return $job ? (object) $job : null;
}

array(1) {
  [0] =>
  array(3) {
    'query' =>
    string(165) "select * from `jobs` where `queue` = ? and ((`reserved` = ? and `available_at` <= ?) or (`reserved` = ? and `reserved_at` <= ?)) order by `id` asc limit 1 for update"
    'bindings' =>
    array(5) {
      [0] =>
      string(7) "default"
      [1] =>
      int(0)
      [2] =>
      int(1493634233)
      [3] =>
      int(1)
      [4] =>
      int(1493634173)
    }
    'time' =>
    double(1.55)
  }

sql文からキューデータを取得するには2つの条件があることがわかります
reservedが0の場合、available_at時間は現在の時間より小さく、この条件は実行されるキューである.reservedが1の場合、reserved_at実行開始時間は計算された時間($this->isReservedButExpired)より小さく、すなわち現在の時間からタイムアウト秒を減算するCarbon::now()->subSeconds($this->expire)->gettimestamp()であり、キュータスクが期限切れであるかどうかを判断する.
selectプロセス全体が「for update」で、排他ロックがあります.
条件を満たすキューを取得した後
/**
 * Mark the given job ID as reserved.
 *
 * @param \stdClass $job
 * @return \stdClass
 */
protected function markJobAsReserved($job)
{
    $job->reserved = 1;
    $job->attempts = $job->attempts + 1;
    $job->reserved_at = $this->getTime();
    $this->database->table($this->table)->where('id', $job->id)->update([
        'reserved' => $job->reserved,
        'reserved_at' => $job->reserved_at,
        'attempts' => $job->attempts,
    ]);
    return $job;
}

プログラムはこのデータを更新し、更新が完了するとcommitになります.
同じサーバでは、2番目のプロセスがデータを取得する際に悲観的なロックに遭遇し、1番目のプロセスがデータを取得してreservedと時間を更新した後に実行する必要があります.つまり、Laravelキューでdatabaseを使用する場合、同時実行プロセスは複数のデータを同時に取得するのではなく、同じデータを取得して1つのプロセスupdateデータの状態と実行時間を待つので、キューがデータを取得した後に最初の操作が更新されるので、キューが期限切れにならない限り、2番目のプロセスは1番目のプロセスの同じデータを取得しません.
DatabaseQueue.phpのpopメソッドでは,キューデータを取得すると,「$this->database->commit();」前sleep(10)では、第2のキューが他のキューデータを取得していないことが明らかになり、「for update」はupdateレベルの排他ロックにすぎず、selectを排斥しないことを示しています.
Laravelはdatabaseキューを使用するとブロック現象がある場合がありますが、その原因かどうか分かりません.
実行時間が長すぎて「expire」パラメータ設定時間を超えると、第2のキューは第1のキューデータを取得し、タイムアウトを判断します.この場合、設定した最大実行回数triesに基づいて、新しいキューデータを挿入して実行を継続するか、エラーキュー「failed_jobs」テーブルに挿入してキュー実行に失敗したかを判断します.
以上がLaravelがmysqlを使用してキューを実行する論理であり、前に述べた2台のサーバ配置Laravelフレームワークはartisanスクリプトを実行し、1つのjobsテーブルキューFailedの問題はサーバ時間が一致しない原因であり、後のサーバ実行時に前のキューデータをタイムアウトと判断して「failed_jobs」の新しいデータに挿入し、最大失敗回数に達した場合である.そうでない場合は、新しいデータを挿入して試行を続行します.

したがってqueue:listenの実行時間パラメータであるtimeout=60は、キュータスクの有効期限より小さいexpireパラメータを設定する必要があります。


そして、Laravel 5.2のqueue:workには--timeout=60というパラメータはありません。。。


最後にキュー実行済みの処理ロジックです.
キューの実行に成功するとjobsのデータが削除されます.これは問題ありません.タイムアウト、例外などが失敗した場合、設定した最大失敗回数に基づいて新しいデータを挿入するか、Failedデータをfailed_jobsテーブルに挿入するかを判断します.
エラーが発生した場合、handleJobExceptionの例外処理はDatabaseQueueを呼び出します.phpのreleaseメソッド,$job->release($delay),最終的にpushToDatabase実装である.
新しいデータを挿入する場合、attemptsは失敗回数、reservedは0、available_atは、現在のタイムスタンプに遅延時間パラメータを加え、キュー処理全体が完全なデータ論理動作を形成する.
Laravel5.4キュー機能を大きく変更しました.マニュアルのヒント

タスクの有効期限とタイムアウト


タスク実行時間
コンフィギュレーションファイルconfig/queue.phpでは、各接続にretry_が定義されています.after項.この構成項目の目的は、タスクが実行されてから数秒後にキューに戻ることを定義することです.retry_afterが設定した値は90で、タスクが90秒実行された後も完了していない場合は、削除するのではなくキューに戻されます.間違いなくretry_afterの値は、タスク実行時間の最大可能値に設定されます.
Laravel5.4 queueのlistenコマンドを削除し、workにもタイムアウトパラメータを追加しました.Laravel5.5が出るときはアップグレードすべきです.
付録:Laravel 5.2テストのシナリオは、以前はネット上で検索されたのが早いので、jobをコマンドに書く方法ですが、実は5.2以降はjobの使用が簡単です.
jobsの下でjobタスクを定義し、handleはいくつかのテストスキームを追加することができます.例えば、私のような異常を投げ出して、直接Failedの
class MyJob extends Job implements ShouldQueue
{
    use InteractsWithQueue, SerializesModels;
    private $key;
    private $value;
    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct($key, $value)
    {
        $this->key = $key;
        $this->value = $value;
    }
    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        for($i=0;$i<20;$i++){
            echo "$i
"; sleep(1); } echo "sss\t".$this->key."\t".date("Y-m-d H:i:s")."
"; throw new \Exception("
"); // Redis::hset('queue.test', $this->key, $this->value); } public function failed() { dump('failed'); } }

コントローラはタスクキューの設定にアクセスし、keyとvalueの前にテストredisで挿入し、自分のテストスキームでjobパラメータを設定することができます.
for($i = 0; $i < 5; $i ++) {
    echo "$i";
    $job = (new MyJob($i, $i))->delay(20);
    $this->dispatch($job);
}

私の例では5つのキューを設定し、複数のshellを開いてartisanテストを実行しましょう.
本当はredisキューコードを読み終えて、一緒に出したかったのですが、最近用事が多すぎて、redisコードもあまり見ていません.
redisドライバはhttp://www.cnblogs.com/z12987...の記事を参照してLaravelキューredisドライバロジックについて詳しく説明します.redisドライバで使用されるlistとzset構造の格納キューは、実行中に転送キューが削除され、データベースの「for update」操作がないため、キューブロックに格納されているわけではありません.
BUTキュータスクの有効期限設定はデータベースドライバと同じなので

Queue:listenの実行時間パラメータであるtimeout=60は、キュータスクの有効期限より小さいexpireパラメータを設定する必要があります。


やっと書き終わりました...