学習しやすいノート-RabbitMQチュートリアル2:1人の生産者と複数の消費者


  • RabbitMQチュートリアル2:1人の生産者と複数の消費者
  • コードベース:学習しやすいノート-RabbitMQチュートリアル1:生産者と消費者
  • 新規生産者クラス:
    package OneProductMulConsume;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.MessageProperties;
    
    public class OneProductSend {
    
    	private static final String TASK_QUEUE_NAME = "task_queue";
    
    	public static void main(String[] argv) throws java.io.IOException,
    			TimeoutException {
    
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("192.168.65.129");
    		Connection connection = factory.newConnection();
    		Channel channel = connection.createChannel();
    
    		/**
    		 *  true 
    		 */
    		channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    
    		for (int i = 0; i < 10; i++) {
    
    			String message = "X";
    			for (int j = 0; j < i; j++) {
    				message += ".";
    			}
    			
    			message = message + "(" + i + ")";
    			
    			/**
    			 * MessageProperties.PERSISTENT_TEXT_PLAIN: 
    			 */
    			channel.basicPublish("", TASK_QUEUE_NAME,
    					MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    			System.out.println(" [x] Sent '" + message + "'");
    		}
    
    		channel.close();
    		connection.close();
    	}
    	// ...
    }
  • メッセージ持続化/**MessageProperties.PERSISTENT_TEXT_PLAIN:メッセージの永続化*/channelを表す.basicPublish("", TASK_QUEUE_NAME,                           MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

  • キュー永続化:/**の2番目のパラメータはtrueでキュー永続化*/channelを表します.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
  • 新規消費者クラス:
    package OneProductMulConsume;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class MulConsumeRecv {
      private static final String TASK_QUEUE_NAME = "task_queue";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.129");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
    
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
        /**
         *  , 
         */
        channel.basicQos(1);
    
        final Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
    
            System.out.println(" [x] Received '" + message + "'");
            try {
              doWork(message);
            } finally {
              System.out.println(" [x] Done");
              /**
               *  
               */
              channel.basicAck(envelope.getDeliveryTag(), false);
            }
          }
        };
        /**
         *  false , 
         */
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
      }
    
      private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
          if (ch == '.') {
            try {
              Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
              Thread.currentThread().interrupt();
            }
          }
        }
      }
    }
    
  • 消費者が明確な処理結果フラグ/**を返すように設定*2番目のパラメータはfalseで処理が完了したことを示すサービス側に通知し、サービス側が確認*/boolean autoAck=falseである.    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
  • 消費者は、結果処理フラグ/***の処理が完了すると、サービス側*/channelに通知する.basicAck(envelope.getDeliveryTag(), false);

  • 毎回1つのメッセージ/**のみが受信ことは、メッセージキューから1つのメッセージのみが取得されることを示し、処理が完了するとサービス側に次のメッセージ*/channelが送信されることを通知する.basicQos(1);
  • テスト
  • 循環配布:2人の作業者が循環してメッセージを取得し、処理(合計10メッセージ)
  • 作業者1:8個のメッセージを処理した[*]Waiting for messages.To exit press CTRL+C  [x] Received 'X(0)'  [x] Done  [x] Received 'X.(1)'  [x] Done  [x] Received 'X..(2)'  [x] Done  [x] Received 'X...(3)'  [x] Done  [x] Received 'X....(4)'  [x] Done  [x] Received 'X.....(5)'  [x] Done  [x] Received 'X.......(7)'  [x] Done  [x] Received 'X.........(9)'  [x] Done
  • 後にワーカー2を起動する:2つのメッセージを処理した[*]Waiting for messages.To exit press CTRL+C  [x] Received 'X......(6)'  [x] Done  [x] Received 'X........(8)'  [x] Done