Mina、Netty、Twistedを一緒に学ぶ(7):パブリッシュ/サブスクリプション(Publish/subscribe)


メッセージングには多くの方法があり、リクエスト/レスポンス(Request/Reply)が最も一般的です.前述のブログの例では、サーバがメッセージを受信すると、すぐにwriteがクライアントにメッセージを返信するリクエスト/応答方式を採用することが多い.HTTPプロトコルもリクエスト/レスポンスに基づく方式である.
しかし、リクエスト/レスポンスは、すべてのメッセージングのニーズを満たすことはできません.一部のニーズでは、受動的なリクエスト待ち後に応答するのではなく、サービス側がクライアントにメッセージをアクティブにプッシュする必要がある場合があります.
Publish/Subscribe(Publish/Subscribe)は、サーバがクライアントにメッセージをアクティブに送信するメッセージング方式です.サブスクライバSubscriberがサーバクライアントに接続されると、パブリッシャのPublisherパブリケーションのサブスクライバを開始することに相当し、パブリッシャがメッセージをパブリッシュすると、すべてのサブスクライバがこのメッセージを受信します.
インターネットチャットルームは一般的に、パブリケーション/サブスクリプションモードに基づいて実現されます.例えば、QQグループを追加すると、このグループのすべてのメッセージを購読することに相当し、新しいメッセージがある場合、サーバは自発的にすべてのクライアントにメッセージを送信します.ただ、チャットルームのすべての人は投稿者であり、購読者でもある.
以下、MINA、Netty、Twistedでそれぞれ簡単なパブリケーション/サブスクリプションモードのサーバプログラムを実現し、サーバに接続されているすべてのクライアントはサブスクライバであり、パブリケーション者がメッセージをパブリッシュすると、サーバはすべてのクライアントにメッセージを転送します.
MINA:
MINAでは、IoServiceのgetManagedSessions()メソッドを使用して、このIoServiceが現在管理しているすべてのIoSession、すなわち、サーバに接続されているすべてのクライアントのセットを取得できます.サーバは、パブリッシャによって発行されたメッセージを受信すると、IoServiceのgetManagedSessions()メソッドによってすべてのクライアントに対応するIoSessionを取得し、これらのクライアントにメッセージを送信することができる.
public class TcpServer {

	public static void main(String[] args) throws IOException {
		IoAcceptor acceptor = new NioSocketAcceptor();

		acceptor.getFilterChain().addLast("codec",
				new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), "\r
", "\r
"))); acceptor.setHandler(new TcpServerHandle()); acceptor.bind(new InetSocketAddress(8080)); } } class TcpServerHandle extends IoHandlerAdapter { @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { cause.printStackTrace(); } @Override public void messageReceived(IoSession session, Object message) throws Exception { // IoSession Collection<IoSession> sessions = session.getService().getManagedSessions().values(); // IoSession IoUtil.broadcast(message, sessions); } }

Netty:
Nettyはチャネルグループを保存するためにチャネルグループを提供し、チャネルグループはスレッドの安全なチャネルセットであり、いくつかの列のチャネルバッチ操作を提供する.TCP接続がオフになると、対応するチャネルが自動的にチャネルGroupから削除されるので、閉じたチャネルを手動で削除する必要はありません.
NettyドキュメントChannelGroupの説明:
A thread-safe Set that contains open Channels and provides various bulk operations on them. Using ChannelGroup, you can categorize Channels into a meaningful group (e.g. on a per-service or per-state basis.) A closed Channel is automatically removed from the collection, so that you don't need to worry about the life cycle of the added Channel. A Channel can belong to more than one ChannelGroup.
新しいクライアントがサーバに接続されると、対応するチャネルが1つのチャネルグループに追加され、パブリッシャがメッセージを発行すると、サーバはチャネルグループを介してすべてのクライアントにメッセージを書き込むことができる.
public class TcpServer {

	public static void main(String[] args) throws InterruptedException {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup)
					.channel(NioServerSocketChannel.class)
					.childHandler(new ChannelInitializer<SocketChannel>() {
						@Override
						public void initChannel(SocketChannel ch) throws Exception {
							ChannelPipeline pipeline = ch.pipeline();
							pipeline.addLast(new LineBasedFrameDecoder(80));
							pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
							pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
							pipeline.addLast(new TcpServerHandler());
						}
					});
			ChannelFuture f = b.bind(8080).sync();
			f.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
		}
	}
}

class TcpServerHandler extends ChannelInboundHandlerAdapter {

	// ChannelGroup            ,    static       ChannelGroup  ,   new  TcpServerHandler      ChannelGroup
	private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

	@Override
	public void channelActive(ChannelHandlerContext ctx) {
		channels.add(ctx.channel()); //         ChannelGroup,     ChannelGroup        Channel
	}
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		channels.writeAndFlush(msg + "\r
"); // , ChannelGroup } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // cause.printStackTrace(); , PublishClient , PublishClient , ctx.close(); } }

Twisted:
Twistedでは、グローバルなデータはFactoryに配置され、各接続に関連するデータはProtocolに配置されます.したがって、ここではFactoryにプロパティを追加して、Protocolセットを保存し、すべての接続サーバのクライアントを表すことができます.新しいクライアントがサーバに接続されている場合は、対応するProtocolインスタンスをセットに入れ、接続が切断されると、対応するProtocolをセットから削除します.サーバがパブリッシャからパブリッシュされたメッセージを受信すると、すべてのクライアントを巡回してメッセージを送信します.
# -*- coding:utf-8 –*-

from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.protocol import Factory
from twisted.internet import reactor

class TcpServerHandle(LineOnlyReceiver): 

    def __init__(self, factory):
        self.factory = factory

    def connectionMade(self):
        self.factory.clients.add(self) #           Protocol   clients

    def connectionLost(self, reason):
        self.factory.clients.remove(self) #            Protocol  

    def lineReceived(self, line):
        #        ,    
        for c in self.factory.clients:
            c.sendLine(line)

class TcpServerFactory(Factory):
    def __init__(self):
        self.clients = set() # set                  

    def buildProtocol(self, addr):
        return TcpServerHandle(self)

reactor.listenTCP(8080, TcpServerFactory())
reactor.run()

次は、メッセージをパブリッシュするためのクライアントと、メッセージを購読するクライアントの2つのクライアントプログラムです.
メッセージをパブリッシュするクライアントは簡単です.サーバwriteにメッセージを1つ送ればいいです.
public class PublishClient {

	public static void main(String[] args) throws IOException {

		Socket socket = null;
		OutputStream out = null;

		try {

			socket = new Socket("localhost", 8080);
			out = socket.getOutputStream();
			out.write("Hello\r
".getBytes()); // out.flush(); } finally { // out.close(); socket.close(); } } }

サブスクリプションメッセージのクライアントがサーバに接続されると、受信サーバから送信されるパブリケーションメッセージがブロックされます.
public class SubscribeClient {

	public static void main(String[] args) throws IOException {

		Socket socket = null;
		BufferedReader in = null;

		try {

			socket = new Socket("localhost", 8080);
			in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

			while (true) {
				String line = in.readLine(); //             
				System.out.println(line);
			}

		} finally {
			//     
			in.close();
			socket.close();
		}
	}
}

MINA、Netty、Twistedサーバについてテストします.
1、テスト時にまずサーバーを起動する;2、サブスクリプションメッセージを実行するクライアントSubscribeClient、SubscribeClientは複数を開くことができる.3、最後にメッセージを発行するクライアントPublishClientを実行し、すべてのSubscribeClientの出力結果を複数回実行して表示することができる.
実行の結果、メッセージを発行するクライアントPublishClientがサーバにメッセージを発行すると、サーバは自発的にこのメッセージをすべてのTCP接続に転送し、すべてのサブスクリプションメッセージのクライアントSubscribeClientがこのメッセージを受信して印刷することが分かった.
作者:叉叉哥转载请注明出典:http://blog.csdn.net/xiao__gui/article/details/39396789