PHP/REDISメッセージキューサービスを実現


一般的なメッセージ・キュー
生産者
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

//     
$tasks = [];
for ($i = 0; $i < 3; $i++) {
     
    $tasks[] = $i;
}

//         
$res = $redis->lPush("tasks", ...$tasks);

consumer(消費者)
$redis = new Redis();

$redis->connect('127.0.0.1', 6379);

//     :              CPU,redis   QPS      
//       ?
//                         pop

while (true) {
     
    // blpop/brpop              ,         ,      ,      。
    //          。 blpop/brpop     lpop/rpop,           
    $res = $redis->brPop("tasks", 20);
    if (!$res) {
     
        //      
        // sleep(1);
        // sleep               。       ,               。
        //      1     ,         1s。
        //     :         ,         ,                 
        continue;
    } else {
     
        //     
        echo "    " . PHP_EOL;
        sleep(1); //       
    }
}


遅延キューメッセージ
生産者
//       
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);


//     
$tasks = [];
for ($i = 0; $i < 50; $i++) {
     
    array_push($tasks, time() + $i);
    array_push($tasks, str_shuffle('asdfghjklqwertyuiopzxcvbnm1234567890'));
}

// Redis Zadd                            
//     
$res = $redis->zAdd('delay-queue', [], ...$tasks);

if ($res) {
     
    echo 'success' . PHP_EOL;
} else {
     
    echo 'fail: zadd error' . PHP_EOL;
}

consumer(消費者)
$redis = new Redis();

$redis->connect('127.0.0.1', 6379);

$key = 'delay-queue';

while (true) {
     
    // Redis Zrangebyscore                   。           (    )    
    $msg = $redis->zRangeByScore($key, 0, time(), ['limit' => [0, 1]]);
    if (count($msg) > 0) {
     
        // Redis Zrem                   ,          
        $success = $redis->zRem($key, $msg[0]);
        //             ,             。
        //                    ,   loop           、      。
        //                  ,   zrem         。
        if ($success) {
     
            echo "    : {
       $msg[0]}" . PHP_EOL;
            sleep(1);//       
        } else {
     
            echo "    " . PHP_EOL;
        }
    } else {
     
        //               CUP      redis  QPS 
        sleep(1);
    }
}