PHP/REDISメッセージキューサービスを実現
15864 ワード
一般的なメッセージ・キュー
生産者
consumer(消費者)
遅延キューメッセージ
生産者
consumer(消費者)
生産者
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
//
$tasks = [];
for ($i = 0; $i < 3; $i++) {
$tasks[] = $i;
}
//
$res = $redis->lPush("tasks", ...$tasks);
consumer(消費者)
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// : CPU,redis QPS
// ?
// pop
while (true) {
// blpop/brpop , , , 。
// 。 blpop/brpop lpop/rpop,
$res = $redis->brPop("tasks", 20);
if (!$res) {
//
// sleep(1);
// sleep 。 , 。
// 1 , 1s。
// : , ,
continue;
} else {
//
echo " " . PHP_EOL;
sleep(1); //
}
}
遅延キューメッセージ
生産者
//
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
//
$tasks = [];
for ($i = 0; $i < 50; $i++) {
array_push($tasks, time() + $i);
array_push($tasks, str_shuffle('asdfghjklqwertyuiopzxcvbnm1234567890'));
}
// Redis Zadd
//
$res = $redis->zAdd('delay-queue', [], ...$tasks);
if ($res) {
echo 'success' . PHP_EOL;
} else {
echo 'fail: zadd error' . PHP_EOL;
}
consumer(消費者)
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$key = 'delay-queue';
while (true) {
// Redis Zrangebyscore 。 ( )
$msg = $redis->zRangeByScore($key, 0, time(), ['limit' => [0, 1]]);
if (count($msg) > 0) {
// Redis Zrem ,
$success = $redis->zRem($key, $msg[0]);
// , 。
// , loop 、 。
// , zrem 。
if ($success) {
echo " : {
$msg[0]}" . PHP_EOL;
sleep(1);//
} else {
echo " " . PHP_EOL;
}
} else {
// CUP redis QPS
sleep(1);
}
}