activemq-queue-selector

5423 ワード

前のページに続く:
http://wangxinchun.iteye.com/blog/2145958
consumerにとって、receiveメッセージは、キーワードによって自分の興味あるメッセージをフィルタすることができます.もちろん、異なるDestinationキューを定義してこれらのメッセージを搬送することができますが、一般的には、同じキューが一つのテーマのメッセージを保存し、列の個数を減らすことができます.このためにconsumerのreceive時のフィルタ機能が追加されました.
caseは以下の通りです
メッセージ消費者:

/**
      intended  ,        50   
*/
public class Consumer {

    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final Boolean NON_TRANSACTED = false;
    private static final long TIMEOUT = 20000;

    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        System.out.println("
Waiting to receive messages... will timeout after " + TIMEOUT / 1000 +"s"); ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url); Connection connection = null; try { connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("test-queue"); // intended , 50 MessageConsumer consumer = session.createConsumer(destination, "intended = 'you'"); int i = 0; while (true) { Message message = consumer.receive(TIMEOUT); if (message != null) { if (message instanceof TextMessage) { String text = ((TextMessage) message).getText(); System.out.println("Got " + i++ + ". message: " + text); } } else { break; } } consumer.close(); session.close(); } catch (Exception e) { System.out.println("Caught exception!"); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { System.out.println("Could not close an open connection..."); } } } } }
メッセージ生産者:

/**
 *  producer    test-queue     ,  intended   ,    me you
 */
public class Producer {

    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final Boolean NON_TRANSACTED = false;
    private static final int NUM_MESSAGES_TO_SEND = 100;
    private static final long DELAY = 100;

    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;

        try {

            connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("test-queue");
            MessageProducer producer = session.createProducer(destination);

            for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
                TextMessage message = session.createTextMessage("Message #" + i);
                System.out.println("Sending message #" + i);
                if (i % 2 == 0) {
                    System.out.println("Sending to me");
                    message.setStringProperty("intended", "me");
                } else {
                    System.out.println("Sending to you");
                    message.setStringProperty("intended", "you");
                }
                producer.send(message);
                Thread.sleep(DELAY);
            }

            producer.close();
            session.close();

        } catch (Exception e) {
            System.out.println("Caught exception!");
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    System.out.println("Could not close an open connection...");
                }
            }
        }
    }

}
検証:Producerを通じて100件のメッセージを送信しました.
Consmerでintensdedを設定してください.50件のメッセージを受けました.Consmerでintensdedを設定します.  あなたのために50件のメッセージを受け取りました.予想通りです.