ActiveMQのC〹における応用
29486 ワード
本論文は.NET Fraamewarkの枠組みの下での応用です。これまでActiveMQはサポートされていません。NET CoreはRabitMQがサポートしています。NET Coreは、ActiveMQが早くサポートしてくれることを望んでいます。
ActiveMQはいいものです。多く言う必要はありません。ActiveMQはJava、C、C+++、C葃、Ruby、Perl、Python、PHPなどの様々な言語サポートを提供しています。私はwindows下開発GUIにいますので、C+++とC葑に関心を持っています。その中でC嗳のActiveMQは簡単です。AppheはNMS(.Net Messaging Service)を提供します。Net開発は以下のような手順で簡単な実現ができます。C++の応用はちょっと面倒くさいです。後で文章を書いて紹介します。
1、ActiveMQ公式サイトに最新版のActiveMQをダウンロードします。http://activemq.apache.org/download.html。前に降りたのは5.3.1で、5.3.2は今も出てきました。
2、ActiveMQ公式サイトに行って最新版のApache.NMSをダウンロードします。http://activemq.apache.org/nms/download.htmlApache.NMSとApache.NMS.ActiveMQの2つのbinパッケージをダウンロードする必要があります。ソースに興味があれば、srcパッケージをダウンロードすることもできます。ここで注意したいのですが、1.2.0バージョンのNMS.ActiveMQをダウンロードすると、Apache.NMS.ActiveMQ.dllが実際に使用しています。すなわち、ActiveMQのアプリケーションを停止するとWaitOne関数が異常になります。srcパッケージのソースがAache.NMS.ActiveMQ.1.0-1.2-srccurccurccurcsのためです。ソースを修正して再コンパイルすればいいです。最新版を見ましたが、1.3.0はこのバグを修復しましたので、最新版をダウンロードすればいいです。
3、ActiveMQを実行して、ActiveMQ解凍後のbinフォルダを見つけます。\apphe-activemq-5.3.1\bin、activemq.batバッチ処理ファイルを実行すればActiveMQサーバを起動できます。デフォルトポートは61616です。これは配置ファイルで修正できます。
4、C〓〓〓プログラムを書いてActiveMQの簡単な応用を実現します。新しいC〓〓〓プロジェクト(一つのProducterプロジェクトと一つのConsmerプロジェクト)、WinFormまたはConsolieプログラムはどちらでもいいです。ここに建てられたのはConsolieプロジェクトです。Apache.NMS.dllとActiveMQ.dllの引用を加えて、コードを実現するために簡単なProdcerとConsmerのコードを作成できます。
producer:
プログラム実現の機能:生産者producerはtestingというテーマを立てて、5秒ごとにこのテーマにメッセージを送ります。消費者consumerはtestingテーマを購読していますので、生産者がtestingテーマのメッセージをActiveMQサーバに送信すると、サーバーはtestingテーマを購読している消費者にこのメッセージを送ります。
producer.exeとconsumer.exeをコンパイルして生成し、二つのexeを実行すると、メッセージの送受信が見られます。
この例は建設のテーマです。ActiveMQは他の方法をサポートしています。Que、つまりP 2 Pの違いは何ですか?Topicはブロードキャストである。つまり、あるTopicが複数の消費者に購読された場合、メッセージがサーバーに届くと、サーバはこのメッセージをすべての消費者に送信する。Queueはポイントです。つまり、一つのメッセージは一つの消費者にしか送れません。もしあるQueが複数の消費者に購読されていたら、特別な状況がないと、メッセージは一つずつ順番に違う消費者に送られます。
msg 1-->consumer A
msg 2-->consumer B
msg 3-->consumer C
msg 4-->consumer A
msg 5-->consumer B
msg 6-->consumer C
特殊な場合は、ActiveMQフィルタリング機構をサポートする、すなわち生産者は、消費者側のSelectorに対応するメッセージの属性を設定することができ、消費者が設定したselectorがメッセージのProptiesに一致するだけで、メッセージが消費者に送信される。TopicもQueもSelectorをサポートしています。
ProptiesとSelectorはどう設定すればいいですか?下記のコードをご覧ください。
producer:
ブログの園から来ました。http://www.cnblogs.com/guthing/archive/2010/06/17/1759333.html
簡単に整理しました。ありがとうございます。
ActiveMQ他の文章のオススメ:http://blog.csdn.net/lee353086/article/details/6819123
ActiveMQはいいものです。多く言う必要はありません。ActiveMQはJava、C、C+++、C葃、Ruby、Perl、Python、PHPなどの様々な言語サポートを提供しています。私はwindows下開発GUIにいますので、C+++とC葑に関心を持っています。その中でC嗳のActiveMQは簡単です。AppheはNMS(.Net Messaging Service)を提供します。Net開発は以下のような手順で簡単な実現ができます。C++の応用はちょっと面倒くさいです。後で文章を書いて紹介します。
1、ActiveMQ公式サイトに最新版のActiveMQをダウンロードします。http://activemq.apache.org/download.html。前に降りたのは5.3.1で、5.3.2は今も出てきました。
2、ActiveMQ公式サイトに行って最新版のApache.NMSをダウンロードします。http://activemq.apache.org/nms/download.htmlApache.NMSとApache.NMS.ActiveMQの2つのbinパッケージをダウンロードする必要があります。ソースに興味があれば、srcパッケージをダウンロードすることもできます。ここで注意したいのですが、1.2.0バージョンのNMS.ActiveMQをダウンロードすると、Apache.NMS.ActiveMQ.dllが実際に使用しています。すなわち、ActiveMQのアプリケーションを停止するとWaitOne関数が異常になります。srcパッケージのソースがAache.NMS.ActiveMQ.1.0-1.2-srccurccurccurcsのためです。ソースを修正して再コンパイルすればいいです。最新版を見ましたが、1.3.0はこのバグを修復しましたので、最新版をダウンロードすればいいです。
1 private void StopMonitorThreads()
2 {
3 lock(monitor)
4 {
5 if(monitorStarted.CompareAndSet(true, false))
6 {
7 AutoResetEvent shutdownEvent = new AutoResetEvent(false);
8 // Attempt to wait for the Timers to shutdown, but don't wait
9 // forever, if they don't shutdown after two seconds, just quit.
10 this.readCheckTimer.Dispose(shutdownEvent);
11 shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));
12 this.writeCheckTimer.Dispose(shutdownEvent);
13 shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));
14 //WaitOne :public virtual bool WaitOne(TimeSpan timeout,bool exitContext)
15 this.asyncTasks.Shutdown();
16 this.asyncTasks = null;
17 this.asyncWriteTask = null;
18 this.asyncErrorTask = null;
19 }
20 }
21 }
22 private void StopMonitorThreads()
23 {
24 lock(monitor)
25 {
26 if(monitorStarted.CompareAndSet(true, false))
27 {
28 AutoResetEvent shutdownEvent = new AutoResetEvent(false);
29
30 // Attempt to wait for the Timers to shutdown, but don't wait
31 // forever, if they don't shutdown after two seconds, just quit.
32 this.readCheckTimer.Dispose(shutdownEvent);
33 shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));
34 this.writeCheckTimer.Dispose(shutdownEvent);
35 shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));
36 //WaitOne :public virtual bool WaitOne(TimeSpan timeout,bool exitContext)
37 this.asyncTasks.Shutdown();
38 this.asyncTasks = null;
39 this.asyncWriteTask = null;
40 this.asyncErrorTask = null;
41 }
42 }
43 }
3、ActiveMQを実行して、ActiveMQ解凍後のbinフォルダを見つけます。\apphe-activemq-5.3.1\bin、activemq.batバッチ処理ファイルを実行すればActiveMQサーバを起動できます。デフォルトポートは61616です。これは配置ファイルで修正できます。
4、C〓〓〓プログラムを書いてActiveMQの簡単な応用を実現します。新しいC〓〓〓プロジェクト(一つのProducterプロジェクトと一つのConsmerプロジェクト)、WinFormまたはConsolieプログラムはどちらでもいいです。ここに建てられたのはConsolieプロジェクトです。Apache.NMS.dllとActiveMQ.dllの引用を加えて、コードを実現するために簡単なProdcerとConsmerのコードを作成できます。
producer:
1 using System;
2 using System.Collections.Generic;
3 using System.Text;
4 using Apache.NMS;
5 using Apache.NMS.ActiveMQ;
6 using System.IO;
7 using System.Xml.Serialization;
8 using System.Runtime.Serialization.Formatters.Binary;
9 namespace Publish
10 {
11 class Program
12 {
13 static void Main(string[] args)
14 {
15 try
16 {
17 //Create the Connection Factory
18 IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
19 using (IConnection connection = factory.CreateConnection())
20 {
21 //Create the Session
22 using (ISession session = connection.CreateSession())
23 {
24 //Create the Producer for the topic/queue
25 IMessageProducer prod = session.CreateProducer(
26 new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));
27 //Send Messages
28 int i = 0;
29 while (!Console.KeyAvailable)
30 {
31 ITextMessage msg = prod.CreateTextMessage();
32 msg.Text = i.ToString();
33 Console.WriteLine("Sending: " + i.ToString());
34 prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
35 System.Threading.Thread.Sleep(5000);
36 i++;
37 }
38 }
39 }
40 Console.ReadLine();
41 }
42 catch (System.Exception e)
43 {
44 Console.WriteLine("{0}",e.Message);
45 Console.ReadLine();
46 }
47 }
48 }
49 }
consumer: 1 using System;
2 using System.Collections.Generic;
3 using System.Text;
4 using Apache.NMS;
5 using Apache.NMS.ActiveMQ;
6 using System.IO;
7 using System.Xml.Serialization;
8 using System.Runtime.Serialization.Formatters.Binary;
9 namespace Subscribe
10 {
11 class Program
12 {
13 static void Main(string[] args)
14 {
15 try
16 {
17 //Create the Connection factory
18 IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
19 //Create the connection
20 using (IConnection connection = factory.CreateConnection())
21 {
22 connection.ClientId = "testing listener";
23 connection.Start();
24 //Create the Session
25 using (ISession session = connection.CreateSession())
26 {
27 //Create the Consumer
28 IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener", null, false);
29 consumer.Listener += new MessageListener(consumer_Listener);
30 Console.ReadLine();
31 }
32 connection.Stop();
33 connection.Close();
34 }
35 }
36 catch (System.Exception e)
37 {
38 Console.WriteLine(e.Message);
39 }
40 }
41 static void consumer_Listener(IMessage message)
42 {
43 try
44 {
45 ITextMessage msg = (ITextMessage)message;
46 Console.WriteLine("Receive: " + msg.Text);
47 }
48 catch (System.Exception e)
49 {
50 Console.WriteLine(e.Message);
51 }
52 }
53 }
54 }
プログラム実現の機能:生産者producerはtestingというテーマを立てて、5秒ごとにこのテーマにメッセージを送ります。消費者consumerはtestingテーマを購読していますので、生産者がtestingテーマのメッセージをActiveMQサーバに送信すると、サーバーはtestingテーマを購読している消費者にこのメッセージを送ります。
producer.exeとconsumer.exeをコンパイルして生成し、二つのexeを実行すると、メッセージの送受信が見られます。
この例は建設のテーマです。ActiveMQは他の方法をサポートしています。Que、つまりP 2 Pの違いは何ですか?Topicはブロードキャストである。つまり、あるTopicが複数の消費者に購読された場合、メッセージがサーバーに届くと、サーバはこのメッセージをすべての消費者に送信する。Queueはポイントです。つまり、一つのメッセージは一つの消費者にしか送れません。もしあるQueが複数の消費者に購読されていたら、特別な状況がないと、メッセージは一つずつ順番に違う消費者に送られます。
msg 1-->consumer A
msg 2-->consumer B
msg 3-->consumer C
msg 4-->consumer A
msg 5-->consumer B
msg 6-->consumer C
特殊な場合は、ActiveMQフィルタリング機構をサポートする、すなわち生産者は、消費者側のSelectorに対応するメッセージの属性を設定することができ、消費者が設定したselectorがメッセージのProptiesに一致するだけで、メッセージが消費者に送信される。TopicもQueもSelectorをサポートしています。
ProptiesとSelectorはどう設定すればいいですか?下記のコードをご覧ください。
producer:
1 public void SetProperties()
2 {
3 ITextMessage msg = prod.CreateTextMessage();
4 msg.Text = i.ToString();
5 msg.Properties.SetString("myFilter", "test1");
6 Console.WriteLine("Sending: " + i.ToString());
7 prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
8 ITextMessage msg = prod.CreateTextMessage();
9 msg.Text = i.ToString();
10 msg.Properties.SetString("myFilter", "test1");
11 Console.WriteLine("Sending: " + i.ToString());
12 prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
13
14 }
consumer:1 public void SetSelector()
2 {
3 // consumer Selector
4 IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'");
5 // consumer Selector
6 IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'");
7 }
ブログの園から来ました。http://www.cnblogs.com/guthing/archive/2010/06/17/1759333.html
簡単に整理しました。ありがとうございます。
ActiveMQ他の文章のオススメ:http://blog.csdn.net/lee353086/article/details/6819123