RabbitMQメッセージキューのPHPでの応用
23588 ワード
メッセージキューの実装では、RabbitMQはその頑丈さと信頼性で長い.会社のプロジェクトでメッセージキューの実装として選択する.MQのメカニズムと原理についてネット上で多くの文章を見ることができて、ここではもう贅沢に述べないで、いくつかの比較的に混同しやすい問題だけを話します
1,binding keyとrouting key
binding keyもrouting keyも自分で設定した文字のセットにすぎないが、使う場所が違う.binding keyはスイッチとキューをバインドするときに方法で伝達される文字列で、routing keyはメッセージを発表するついでに持参した文字列で、この2つは実は1つのものだと言っている人もいるが、そうではない.いいえ、この2つの役割が異なるため、1つのスイッチは多くのキューをバインドすることができますが、各キューに必要なメッセージのタイプが異なるかもしれません.binding keyは、このバインド時にスイッチとキューの間に残るヒント情報です.メッセージが送信されると、メッセージとともに送信されるrouting keyはbinding keyと同じようにメッセージがこのキューに必要なものであることを説明し、異なる場合はこのキューに渡さないでください.スイッチは次のキューを探してみてください.分かったでしょう、この2つのkeyは暗号で、合ったのは自分の人で、それに合わないなら、もう一度探してください.
binding keyとrouting keyのペアは実は完全に同じではなく、「似ている」ペアを作ることもできます.スイッチを確立するときは、MQに伝えます.私が宣言するこのスイッチとその上のキューの間でメッセージを送信するときにrouting keyとbinding keyが完全に同じであることを要求します.このモードはDirectと呼ばれています.routing keyとbinding keyが「ぼやけている」マッチングできる場合は、このモードはTopicと呼ばれ、マッチングが必要でなければFanoutと呼ばれます.
2、持続化
スイッチとキューは、いずれも作成時に永続化に設定、再起動後に返信することができるが、その中のメッセージはできず、メッセージも復元する場合は、メッセージをスイッチにパブリッシュする際に、フラグ「Delivery Mode」(配達モード)を指定することができ、1は非永続化、2は永続化である.
3、フロー制御機構
メッセージの生産速度が速く、プロセスの処理能力が低いと、メッセージが蓄積され、メモリの占有量が多くなり、MQがクラッシュするため、rabbitmqにはフロー制御メカニズムがあり、制限を超えるとメッセージの受信を阻止し、mqフロー制御には3つのメカニズムがある.
1,メッセージの送信が速すぎる接続をアクティブにブロックします.これは調整できません.ブロックされた場合、abbitmqctlコンソールにblockedの状態が表示されます.
2、メモリが制限を超えていると、接続がブロックされ、vm_memory_high_watermark可変
3、残りのディスクは以下のmqを限定すると、すべての生産者をアクティブにブロックし、デフォルトは50 m、disk_free_Limit可変
以下はcentos 7の上にある、MQのインストール手順である.
1、必要なサポート
2,erlang環境
3,rabbitmq依存ファイルのインストール,rabbitmqのインストール
4、管理プラグインの有効化
5、プロファイルの作成
6,環境ファイルの作成
7,phpのrabbitmq拡張をインストールする
アクションコマンド
phpのserverエンドスクリプト
phpクライアントスクリプト
一部のmq定数の設定、正しくないところを翻訳して、みんなは実験を基準にします
1,binding keyとrouting key
binding keyもrouting keyも自分で設定した文字のセットにすぎないが、使う場所が違う.binding keyはスイッチとキューをバインドするときに方法で伝達される文字列で、routing keyはメッセージを発表するついでに持参した文字列で、この2つは実は1つのものだと言っている人もいるが、そうではない.いいえ、この2つの役割が異なるため、1つのスイッチは多くのキューをバインドすることができますが、各キューに必要なメッセージのタイプが異なるかもしれません.binding keyは、このバインド時にスイッチとキューの間に残るヒント情報です.メッセージが送信されると、メッセージとともに送信されるrouting keyはbinding keyと同じようにメッセージがこのキューに必要なものであることを説明し、異なる場合はこのキューに渡さないでください.スイッチは次のキューを探してみてください.分かったでしょう、この2つのkeyは暗号で、合ったのは自分の人で、それに合わないなら、もう一度探してください.
binding keyとrouting keyのペアは実は完全に同じではなく、「似ている」ペアを作ることもできます.スイッチを確立するときは、MQに伝えます.私が宣言するこのスイッチとその上のキューの間でメッセージを送信するときにrouting keyとbinding keyが完全に同じであることを要求します.このモードはDirectと呼ばれています.routing keyとbinding keyが「ぼやけている」マッチングできる場合は、このモードはTopicと呼ばれ、マッチングが必要でなければFanoutと呼ばれます.
2、持続化
スイッチとキューは、いずれも作成時に永続化に設定、再起動後に返信することができるが、その中のメッセージはできず、メッセージも復元する場合は、メッセージをスイッチにパブリッシュする際に、フラグ「Delivery Mode」(配達モード)を指定することができ、1は非永続化、2は永続化である.
3、フロー制御機構
メッセージの生産速度が速く、プロセスの処理能力が低いと、メッセージが蓄積され、メモリの占有量が多くなり、MQがクラッシュするため、rabbitmqにはフロー制御メカニズムがあり、制限を超えるとメッセージの受信を阻止し、mqフロー制御には3つのメカニズムがある.
1,メッセージの送信が速すぎる接続をアクティブにブロックします.これは調整できません.ブロックされた場合、abbitmqctlコンソールにblockedの状態が表示されます.
2、メモリが制限を超えていると、接続がブロックされ、vm_memory_high_watermark可変
3、残りのディスクは以下のmqを限定すると、すべての生産者をアクティブにブロックし、デフォルトは50 m、disk_free_Limit可変
以下はcentos 7の上にある、MQのインストール手順である.
1、必要なサポート
yum install ncurses-devel unixODBC unixODBC-devel
2,erlang環境
wget http://www.erlang.org/download/ otp_src_17.3.tar.gz
tar zxvf otp_src_17.3.tar.gz
cd otp_src_17.3
./configure --without-javac
#
make && make install
3,rabbitmq依存ファイルのインストール,rabbitmqのインストール
yum install xmlto
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.4.1/rabbitmq-server-3.4.1.tar.gz
tar zxvf rabbitmq-server-3.4.1.tar.gz
cd rabbitmq-server-3.4.1/
make TARGET_DIR=/usr/rabbitmq SBIN_DIR=/usr/rabbitmq/sbin MAN_DIR=/usr/rabbitmq/man DOC_INSTALL_DIR=/usr/rabbitmq/doc
make TARGET_DIR=/usr/rabbitmq SBIN_DIR=/usr/rabbitmq/sbin MAN_DIR=/usr/rabbitmq/man DOC_INSTALL_DIR=/usr/rabbitmq/doc install
/usr/rabbitmq/sbin/rabbitmq-server -detached rabbitmq
/usr/rabbitmq/sbin/rabbitmqctl status
/usr/rabbitmq/sbin/rabbitmqctl stop rabbitmq
4、管理プラグインの有効化
mkdir /etc/rabbitmq
cd /usr/rabbitmq/sbin
./rabbitmq-plugins enable rabbitmq_management ( )
./rabbitmq-plugins disable rabbitmq_management ( )
# rabbitmq
# http://127.0.0.1:15672/
# iptables
# vi /etc/sysconfig/iptables
# -A INPUT -m state --state NEW -m tcp -p tcp --dport 15672 -j ACCEPT
# iptable systemctl restart iptables.service
5、プロファイルの作成
# /usr/rabbitmq/sbin/rabbitmq-defaults config
#
touch/usr/rabbitmq/sbin
#vm_memory_high_watermark , , , , 0.4, 40%,
#vm_memory_high_watermark_paging_ratio
vi /usr/rabbitmq/sbin/rabbitmq.config
[
{rabbit, [{vm_memory_high_watermark_paging_ratio, 0.75},
{vm_memory_high_watermark, 0.7}]}
].
6,環境ファイルの作成
touch /etc/rabbitmq/rabbitmq-env.conf
#
RABBITMQ_NODENAME=FZTEC-240088
RABBITMQ_NODE_IP_ADDRESS=127.0.0.1 IP
RABBITMQ_NODE_PORT=5672
RABBITMQ_LOG_BASE=/data/rabbitmq/log
RABBITMQ_PLUGINS_DIR=/data/rabbitmq/plugins
RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia
7,phpのrabbitmq拡張をインストールする
yum install librabbitmq-devel.x86_64
wget http://pecl.php.net/get/amqp-1.4.0.tgz
tar zxvf amqp-1.4.0.tgz
cd amqp-1.4.0
/usr/local/php/bin/phpize
./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp
make && make install
vim /usr/local/php/etc/php.ini
#
extension=amqp.so
service nginx reload
service php-fpm restart
アクションコマンド
exchange
/usr/rabbitmq/sbin/rabbitmqctl list_exchanges name type durable auto_delete arguments
/usr/rabbitmq/sbin/rabbitmqctl list_queues name durable auto_delete messages consumers me
/usr/rabbitmq/sbin/rabbitmqctl list_bindings
/usr/rabbitmq/sbin/rabbitmqctl list_connections
phpのserverエンドスクリプト
<?php
$routingkey='key';
//
$conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest');
$conn = new AMQPConnection($conn_args);
if ($conn->connect()) {
echo "Established a connection to the broker
";
}
else {
echo "Cannot connect to the broker
";
}
//
$message = json_encode(array('Hello World3!','php3','c++3:'));
// channel
$channel = new AMQPChannel($conn);
// exchange
$ex = new AMQPExchange($channel);
$ex->setName('exchange');//
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
//$ex->setFlags(AMQP_AUTODELETE);
//echo "exchange status:".$ex->declare();
echo "exchange status:".$ex->declareExchange();
echo "
";
for($i=0;$i<100;$i++){
if($routingkey=='key2'){
$routingkey='key';
}else{
$routingkey='key2';
}
$ex->publish($message,$routingkey);
}
/*
$ex->publish($message,$routingkey);
$q = new AMQPQueue($channel);
$q->setName('queue');
$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
echo "queue status: ".$q->declare();
echo "
";
echo 'queue bind: '.$q->bind('exchange','route.key');
routingKey
echo "
";
$channel->startTransaction();
echo "send: ".$ex->publish($message, 'route.key'); // routingKey
$channel->commitTransaction();
$conn->disconnect();
*/
phpクライアントスクリプト
<?php
$bindingkey='key2';
// RabbitMQ
$conn_args = array( 'host'=>'127.0.0.1' , 'port'=> '5672', 'login'=>'guest' , 'password'=> 'guest','vhost' =>'/');
$conn = new AMQPConnection($conn_args);
$conn->connect();
// queue , exchange, routingkey
$channel = new AMQPChannel($conn);
$q = new AMQPQueue($channel);
$q->setName('queue2');
$q->setFlags(AMQP_DURABLE);
$q->declare();
$q->bind('exchange',$bindingkey);
//
$messages = $q->get(AMQP_AUTOACK) ;
if ($messages){
var_dump(json_decode($messages->getBody(), true ));
}
$conn->disconnect();
?>
一部のmq定数の設定、正しくないところを翻訳して、みんなは実験を基準にします
/**
* Passing in this constant as a flag will forcefully disable all other flags.
* Use this if you want to temporarily disable the amqp.auto_ack ini setting.
* , amqp.auto_ack
*/
define('AMQP_NOPARAM', 0);
/**
* Durable exchanges and queues will survive a broker restart, complete with all of their data.
* , ,
*/
define('AMQP_DURABLE', 2);
/**
* Passive exchanges and queues will not be redeclared, but the broker will throw an error if the exchange or queue does not exist.
* , ,
*/
define('AMQP_PASSIVE', 4);
/**
* Valid for queues only, this flag indicates that only one client can be listening to and consuming from this queue.
* ,
*/
define('AMQP_EXCLUSIVE', 8);
/**
* For exchanges, the auto delete flag indicates that the exchange will be deleted as soon as no more queues are bound
* to it. If no queues were ever bound the exchange, the exchange will never be deleted. For queues, the auto delete
* flag indicates that the queue will be deleted as soon as there are no more listeners subscribed to it. If no
* subscription has ever been active, the queue will never be deleted. Note: Exclusive queues will always be
* automatically deleted with the client disconnects.
* , , , .
* , , , ,
*
*/
define('AMQP_AUTODELETE', 16);
/**
* Clients are not allowed to make specific queue bindings to exchanges defined with this flag.
*
*/
define('AMQP_INTERNAL', 32);
/**
* When passed to the consume method for a clustered environment, do not consume from the local node.
* ,
*/
define('AMQP_NOLOCAL', 64);
/**
* When passed to the {@link AMQPQueue::get()} and {@link AMQPQueue::get()} methods as a flag,
* the messages will be immediately marked as acknowledged by the server upon delivery.
* get , acknowledged ( )
*/
define('AMQP_AUTOACK', 128);
/**
* Passed on queue creation, this flag indicates that the queue should be deleted if it becomes empty.
* ,
*/
define('AMQP_IFEMPTY', 256);
/**
* Passed on queue or exchange creation, this flag indicates that the queue or exchange should be
* deleted when no clients are connected to the given queue or exchange.
* , ,
*/
define('AMQP_IFUNUSED', 512);
/**
* When publishing a message, the message must be routed to a valid queue. If it is not, an error will be returned.
* , ,
*/
define('AMQP_MANDATORY', 1024);
/**
* When publishing a message, mark this message for immediate processing by the broker. (High priority message.)
* , .
*/
define('AMQP_IMMEDIATE', 2048);
/**
* If set during a call to {@link AMQPQueue::ack()}, the delivery tag is treated as "up to and including", so that multiple
* messages can be acknowledged with a single method. If set to zero, the delivery tag refers to a single message.
* If the AMQP_MULTIPLE flag is set, and the delivery tag is zero, this indicates acknowledgement of all outstanding
* messages.
* AMQPQueue::ack , , , 0
* , AMQP_MULTIPLE, 0,
*/
define('AMQP_MULTIPLE', 4096);
/**
* If set during a call to {@link AMQPExchange::bind()}, the server will not respond to the method.The client should not wait
* for a reply method. If the server could not complete the method it will raise a channel or connection exception.
* AMQPExchange::bind() , , , ,
*/
define('AMQP_NOWAIT', 8192);
/**
* If set during a call to {@link AMQPQueue::nack()}, the message will be placed back to the queue.
* AMQPQueue::nack ,
*/
define('AMQP_REQUEUE', 16384);
/**
* A direct exchange type.
* direct
*/
define('AMQP_EX_TYPE_DIRECT', 'direct');
/**
* A fanout exchange type.
* fanout
*/
define('AMQP_EX_TYPE_FANOUT', 'fanout');
/**
* A topic exchange type.
* topic
*/
define('AMQP_EX_TYPE_TOPIC', 'topic');
/**
* A header exchange type.
* header
*/
define('AMQP_EX_TYPE_HEADERS', 'headers');
/**
* socket
*/
define('AMQP_OS_SOCKET_TIMEOUT_ERRNO', 536870947);