PHPでRabbitMQ(amqp拡張)を試みる
3404 ワード
2年前に「PythonでRabbitMQを試してみる」という記事を出したことがありますが、2年後の今日、PHPベースのamqp資料はかわいそうに少なく、元のいくつかの拡張も廃棄され、amqpだけが残って健在で、PECLに収録されています.収録されているとはいえ、公式マニュアルの情報は薄い.
本題に戻ると、amqp拡張のインストールは多くありませんが、先日送った「PHPにamqp拡張をインストールする」を参照してください.
amqpをインストールすると、コードの作成を開始できます.
消費者:受信メッセージロジック:
接続の作成-->channelの作成-->スイッチの作成-->キューの作成-->スイッチ/キュー/ルーティングキーのバインド-->メッセージの受信
生産者:メッセージ送信ロジック:
接続の作成-->channelの作成-->スイッチオブジェクトの作成-->メッセージの送信
注意すべき点は、queueオブジェクトには、consumeとgetの2つの方法があります.
前者はブロックされており、メッセージがない場合は保留され、ループで使用するのに適しています.
後者は非ブロックであり,メッセージを取得する際にあれば取得し,なければfalseを返す.
テストスクリーンショット
運用消費者:
生産者を実行し、メッセージを送信します.
消費者はメッセージを受信しました.
本題に戻ると、amqp拡張のインストールは多くありませんが、先日送った「PHPにamqp拡張をインストールする」を参照してください.
amqpをインストールすると、コードの作成を開始できます.
消費者:受信メッセージロジック:
接続の作成-->channelの作成-->スイッチの作成-->キューの作成-->スイッチ/キュー/ルーティングキーのバインド-->メッセージの受信
'192.168.1.93',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/'
);
$e_name = 'e_linvo'; //
$q_name = 'q_linvo'; //
$k_route = 'key_1'; // key
// channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!
");
}
$channel = new AMQPChannel($conn);
//
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct
$ex->setFlags(AMQP_DURABLE); //
echo "Exchange Status:".$ex->declare()."
";
//
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //
echo "Message Total:".$q->declare()."
";
// ,
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."
";
//
echo "Message:
";
while(True){
$q->consume('processMessage');
//$q->consume('processMessage', AMQP_AUTOACK); // ACK
}
$conn->disconnect();
/**
*
*
*/
function processMessage($envelope, $queue) {
$msg = $envelope->getBody();
echo $msg."
"; //
$queue->ack($envelope->getDeliveryTag()); // ACK
}
生産者:メッセージ送信ロジック:
接続の作成-->channelの作成-->スイッチオブジェクトの作成-->メッセージの送信
'192.168.1.93',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/'
);
$e_name = 'e_linvo'; //
//$q_name = 'q_linvo'; //
$k_route = 'key_1'; // key
// channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!
");
}
$channel = new AMQPChannel($conn);
//
$message = "TEST MESSAGE! !";
//
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
//
//$channel->startTransaction(); //
for($i=0; $i<5; ++$i){
echo "Send Message:".$ex->publish($message, $k_route)."
";
}
//$channel->commitTransaction(); //
$conn->disconnect();
注意すべき点は、queueオブジェクトには、consumeとgetの2つの方法があります.
前者はブロックされており、メッセージがない場合は保留され、ループで使用するのに適しています.
後者は非ブロックであり,メッセージを取得する際にあれば取得し,なければfalseを返す.
テストスクリーンショット
運用消費者:
生産者を実行し、メッセージを送信します.
消費者はメッセージを受信しました.