ActiveMQ(五)


本日本編はActiveMQのQueueベース使用
 
 
    私から見ればActvieMQでQueueはよく使われるメッセージ送信モデルであり、その応用性はtopicよりはるかに広い(大牛は噴かないで、会社の業務、会社の業界はtopic方式が私が接触したプロジェクトで広く使われていないことを決定した).
 
 
1.Topicとqueueの技術特徴の比較
 
Topic
Queue
中国語フルネーム
購読メッセージの発行
ポイント対ポイント
ステータスの有無
topicはステータスがなく、データはデフォルトで着陸しません.
Queueデータのデフォルトは、Active MQデフォルトで$AMQ_が格納されているなど、サーバ上でファイル形式で保存されます.HOMEdatakr-storedataでは、DBストレージに構成することもできます.
完全性の保証
パブリッシャーがパブリッシュしたすべてのデータが保証されず、サブスクライバは受け入れられます.
各データが受信者に受信されることを保証します.
メッセージが失われるかどうか
一般に、パブリッシャがメッセージをサブスクリプションメッセージにパブリッシュする場合、topicアドレスを傍受しているサブスクリプションのみがメッセージを受信することができる.購読者が傍受していなければ、topicは失われます.
メッセージ発起人はターゲットQueueにメッセージを送信し、受信者はこのQueue上のメッセージを非同期で受信することができる.Queue上のメッセージは、受信者が一時的に受け取らなければ、メッセージが受信されるまで失われることはありません.
メッセージ配信受信ポリシー
1対のマルチメッセージパブリケーション受信ポリシーは、同じtopicアドレスを傍受する複数のサブスクライバが、パブリケーション者から送信されたメッセージを受信することができる.サブスクライバは通知サーバを受信しました
ポイント・ツー・ポイントのメッセージは受信ポリシーを発行し、1つのメッセージ・イニシエータが送信したメッセージは、1つの受信者のみが受信できます.受信者が受信した後、サーバに受信したことを通知し、サーバはqueue内のメッセージを削除またはその他の操作を行います.
 Topicとqueueの最大の違いは、topicがブロードキャストの形式で、すべてのオンラインリスニングクライアントに新しいメッセージがあることを通知し、リスニングされていないクライアントはメッセージを受信できないことです.queueは、リスニング状態にある複数のクライアントのうちの1つをポイント・ツー・ポイントとして通知する.
応用シーンから言えばtopicは電子商取引の広告とプッシュするのに適しており、チラシを広めるには誰かが手に入れるかどうか気にする必要はありません.Queueは、顧客メール、重要なメッセージの公開など、メッセージの到着を確認する必要があるシーンなど、厳格なトランザクションを処理するのに適しています.
 
2.(転)効率比較(以前の個人学習抄録のためにソースを抜粋していない)
    リスニングクライアントのコンカレント数を増やして検証することで、topicのメッセージプッシュは、リスニングクライアントのコンカレント上昇によって著しく低下するかどうか、テスト環境のサーバはci環境のActiveMQ、クライアントは私のホストである.
        実測の結果,topic方式で送信されたメッセージ,送信と受信の効率は,1人のサブスクライバと100人のサブスクライバを前提として明らかな差はなかったが,500人のサブスクライバ(スレッド)が併発する前提で効率の差が顕著であった.(500スレッド同時の場合、自機のcpu占有率は70-90%に達しているため、自機のテストによる性能ボトルネックかtopicメッセージ送信方式に性能ボトルネックがあるか確認できず、効率の低下が顕著である).
        Topic方式で送信されたメッセージとqueue方式で送信されたメッセージは,送信と受信の効率は,1人のサブスクライバと100人のサブスクライバを前提として明らかな差はなかったが,500人のサブスクライバが同時であることを前提としてtopic方式の効率はqueueより明らかに低かった.
        Queue方式で送信されるメッセージは,1人のサブスクライバ,100人のサブスクライバ,500人のサブスクライバを前提として送信と受信の効率に有意な変化はない.
Topic実測データ: 
 
送信者が送信したメッセージの合計数
すべてのサブスクライバが受信したメッセージの合計数
メッセージの送受信に平均時間がかかる
シングル購読者
100
100
101ms
100購読者
100
10000
103ms
500購読者
100
50000
14162ms
 
Queue実測データ: 
 
送信者が送信したメッセージの合計数
すべてのサブスクライバが受信したメッセージの合計数
メッセージの送受信に平均時間がかかる
シングル購読者
100
100
96ms
100購読者
100
100
96ms
500購読者
100
100
100ms
 
PS:これは参考にしてください.個人的にはこのデータにはあまり説得力がありません.まずqueueにとって、何人のメッセージ受信者がいても、MQのメッセージ送信総数はメッセージ発信者が発起した本数に準じていますが、topicの異なるMQ送信総数は、投稿者が発布した本数と購読者個数の積です.
 
次はコードです.
まずメッセージ発起人(Sender):
public class Sender {
	private static ConnectionFactory connectionFactory;
	private static Connection connection;
	private static Session session;
	private static int total=100;

	public Sender() throws JMSException {
		connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		connection = connectionFactory.createConnection();
		connection.start();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	}

	public void close() throws JMSException {
		if (connection != null) {
			connection.close();
		}
	}

	public MessageProducer getMessageProducer(String stock, int dMode) throws JMSException {
		MessageProducer producer = session.createProducer(session.createQueue("myQueue"));
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		return producer;
	}

	
	public TextMessage setMessageByText(String text) throws JMSException {
		TextMessage message = session.createTextMessage();
		message.setText(text);
		return message;
	}
	public TextMessage setMessageByMap(Map<String, Object> map) throws JMSException {
		TextMessage message = session.createTextMessage();
		for (String key : map.keySet()) {
			Object o=map.get(key);
			if(o instanceof Integer){
				message.setIntProperty(key, (Integer)o);
			}else if(o instanceof Boolean){
				message.setBooleanProperty(key, (Boolean)o);
			}else if(o instanceof Long){
				message.setLongProperty(key, (Long)o);
			}else if(o instanceof String){
				message.setStringProperty(key, (String)o);
			}else if(o instanceof Double){
				message.setDoubleProperty(key, (Double)o);
			}else if(o instanceof Short){
				message.setShortProperty(key, (Short)o);
			}else if(o instanceof Short){
				message.setShortProperty(key, (Short)o);
			}else if(o instanceof Float){
				message.setFloatProperty(key, (Float)o);
			}
		}
		return message;
	}

	public void sendMessage(MessageProducer producer,TextMessage message) throws JMSException{
		producer.send(message);
	}
	public static void main(String[] args) throws JMSException {
		Sender sender = new Sender();
		MessageProducer producer = sender.getMessageProducer("test", DeliveryMode.NON_PERSISTENT);
		int count=0;
		while (true) {
			TextMessage message;
			if(count%2==0){
				Map<String,Object> map=new HashMap<String,Object>();
				map.put("name", "My message");
				map.put("writer", "Bartholomew");
				map.put("content", "this is ActiveMQ!"+count);
				message= sender.setMessageByMap(map);
				sender.sendMessage(producer,message);
				System.out.println("   "+ (++count)+"   : " + message.toString());
			}else{
				message=sender.setMessageByText("hello world!"+count);
				sender.sendMessage(producer,message);
				System.out.println("   "+ (++count)+"   : " + message.getText());
			}
			if(total<=count){
				break;
			}
			
		}
	}
}

 次にメッセージ受信者の傍受クラスです.傍受クラスはメッセージの送信状況に基づいて書かれています.自分で修正してください.
public class MyMessageListener implements MessageListener{

	@Override
	public void onMessage(Message message) {
		TextMessage tm = (TextMessage) message;
		String content;
		try {
			content = tm.getText();
			if(content!=null){
				System.out.println("Received message: " + content);
			}else{
				Enumeration<String> pnames =tm.getPropertyNames();
				while(pnames.hasMoreElements()){
					String o = (String)pnames.nextElement();
//					message.getObjectProperty(o);
					System.out.print(o+":"+message.getObjectProperty(o)+",");
				}
				System.out.println();
			}
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

 最後にメッセージ受信者
public class Receiver {
	public static void main(String[] args) throws JMSException {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection connection = connectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination destination = session.createQueue("myQueue");
		MessageConsumer consumer = session.createConsumer(destination);
		consumer.setMessageListener(new MyMessageListener());
	}
}

 
実行については、queueにとって誰が先に走るか、誰が後ろに走るかという規定はありません.また、1つのメッセージには受信者が1人しかいないので、実行順序を勝手に決めることができます.