学習ノート - RabbitMQチュートリアル2:1つのプロデューサーと複数のコンシューマー
4536 ワード
package OneProductMulConsume;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
/**
 * Producer side of the "one producer, multiple consumers" example.
 *
 * <p>Publishes ten persistent text messages to the durable queue
 * {@code task_queue} via the default exchange. Message {@code i} has the form
 * {@code "X"} + {@code i} dots + {@code "(i)"}.
 */
public class OneProductSend {
    private static final String TASK_QUEUE_NAME = "task_queue";

    /**
     * Connects to the broker, declares the durable queue, publishes the ten
     * messages and closes the channel and connection.
     *
     * @param argv unused
     * @throws java.io.IOException if a broker operation or the UTF-8 encoding fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    public static void main(String[] argv) throws java.io.IOException,
            TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.129");
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            // The second argument (true) declares the queue as durable.
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            for (int i = 0; i < 10; i++) {
                String message = buildMessage(i);
                // PERSISTENT_TEXT_PLAIN marks the message itself as persistent.
                // Encode explicitly as UTF-8 so the bytes match what the
                // consumer decodes, independent of the platform default charset.
                channel.basicPublish("", TASK_QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
            channel.close();
        } finally {
            // Always release the connection, even when declaring or publishing
            // fails, so the connection is not left open.
            connection.close();
        }
    }

    /** Builds the payload for message {@code index}: "X", {@code index} dots, then "(index)". */
    private static String buildMessage(int index) {
        StringBuilder text = new StringBuilder("X");
        for (int j = 0; j < index; j++) {
            text.append('.');
        }
        return text.append('(').append(index).append(')').toString();
    }
    // ...
}
キューの永続化:queueDeclare の2番目のパラメータを true にすると、キューが永続化されます。
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
package OneProductMulConsume;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * Consumer side of the "one producer, multiple consumers" example.
 *
 * <p>Subscribes to the durable queue {@code task_queue}, handles one message
 * at a time and acknowledges each delivery manually once it has been handled.
 * Every '.' in a message body makes the worker sleep for one second.
 */
public class MulConsumeRecv {
    private static final String TASK_QUEUE_NAME = "task_queue";

    /**
     * Opens a connection and channel, then registers the consumer on the queue.
     *
     * @param argv unused
     * @throws Exception if connecting to the broker or setting up the consumer fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.65.129");
        final Connection conn = connectionFactory.newConnection();
        final Channel channel = conn.createChannel();

        // Second argument (true): the queue is declared durable.
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // Take only one message from the queue at a time; the next one is
        // requested after the current one has been processed.
        channel.basicQos(1);

        final Consumer worker = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + text + "'");
                try {
                    doWork(text);
                } finally {
                    System.out.println(" [x] Done");
                    // Manual acknowledgement of this delivery.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // autoAck = false: deliveries are acknowledged explicitly in handleDelivery.
        channel.basicConsume(TASK_QUEUE_NAME, false, worker);
    }

    /** Simulates work: sleeps one second for every '.' contained in {@code task}. */
    private static void doWork(String task) {
        final int length = task.length();
        for (int idx = 0; idx < length; idx++) {
            if (task.charAt(idx) != '.') {
                continue;
            }
            pauseOneSecond();
        }
    }

    /** Sleeps for 1000 ms; if interrupted, restores the thread's interrupt flag. */
    private static void pauseOneSecond() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
一度に1つのメッセージのみを受信:メッセージキューから一度に1つのメッセージだけを取得し、処理が完了するとサーバー側に通知して次のメッセージが送信されます。
channel.basicQos(1);