MQTTの学習研究(二)moquete-mqttの使用のmqtt brook erの起動


MQTTで 公式サイト ( http://mqtt.org/software)には多くのMQTTの実現形態がある。具体的には公式サイトを参照してください。MoquetteはApache Minaのモデルに基づくJava MQTT brook erです。Minaを使ったことがあるクラスメートは実はbrookの起動過程はMinaアプリケーションの起動です。
 
 
 MQTT moquetteでは、MINAをベースのメッセージとして転送する方式を採用している。  本クラスの目的はMQTT moquette Brokerを起動する方式であり、本明細書のソースコードは  moquete-brook er-011-jar-with-dependencies.jarのserverクラスを直接起動したいならmoquett-brook er-011-jar-with-dependencies.jarのファイル方式 いくつかのコマンドを実行できます。         java-jar moquete-brook er-011-jar-with-dependencies.jar  google codeダウンロードMQTT moquette Brokerアドレス:    http://code.google.com/p/moquette-mqtt/    GITダウンロードMQTT moquette clientアドレス:    https://github.com/fusesource/mqtt-client
 
 
アプリケーションではMQTTのアプリケーションを使用します。
MQTT moquetteのbrookサービス開始コードは以下の通りです。
package com.etrip.mqtt;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;

import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoServiceStatistics;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.demux.DemuxingProtocolDecoder;
import org.apache.mina.filter.codec.demux.DemuxingProtocolEncoder;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging;
import org.dna.mqtt.moquette.proto.ConnAckEncoder;
import org.dna.mqtt.moquette.proto.ConnectDecoder;
import org.dna.mqtt.moquette.proto.DisconnectDecoder;
import org.dna.mqtt.moquette.proto.DisconnectEncoder;
import org.dna.mqtt.moquette.proto.MQTTLoggingFilter;
import org.dna.mqtt.moquette.proto.PingReqDecoder;
import org.dna.mqtt.moquette.proto.PingRespEncoder;
import org.dna.mqtt.moquette.proto.PubAckDecoder;
import org.dna.mqtt.moquette.proto.PubAckEncoder;
import org.dna.mqtt.moquette.proto.PubCompDecoder;
import org.dna.mqtt.moquette.proto.PubCompEncoder;
import org.dna.mqtt.moquette.proto.PubCompMessage;
import org.dna.mqtt.moquette.proto.PubRecDecoder;
import org.dna.mqtt.moquette.proto.PubRecEncoder;
import org.dna.mqtt.moquette.proto.PubRelDecoder;
import org.dna.mqtt.moquette.proto.PubRelEncoder;
import org.dna.mqtt.moquette.proto.PublishDecoder;
import org.dna.mqtt.moquette.proto.PublishEncoder;
import org.dna.mqtt.moquette.proto.SubAckEncoder;
import org.dna.mqtt.moquette.proto.SubscribeDecoder;
import org.dna.mqtt.moquette.proto.UnsubAckEncoder;
import org.dna.mqtt.moquette.proto.UnsubscribeDecoder;
import org.dna.mqtt.moquette.proto.messages.ConnAckMessage;
import org.dna.mqtt.moquette.proto.messages.DisconnectMessage;
import org.dna.mqtt.moquette.proto.messages.PingRespMessage;
import org.dna.mqtt.moquette.proto.messages.PubAckMessage;
import org.dna.mqtt.moquette.proto.messages.PubRecMessage;
import org.dna.mqtt.moquette.proto.messages.PubRelMessage;
import org.dna.mqtt.moquette.proto.messages.PublishMessage;
import org.dna.mqtt.moquette.proto.messages.SubAckMessage;
import org.dna.mqtt.moquette.proto.messages.UnsubAckMessage;
import org.dna.mqtt.moquette.server.MQTTHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 
 *  MQTT moquette    MINA            
 * 
 *        MQTT moquette Broker    ,
 *          moquette-broker-0.1-jar-with-dependencies.jar   server 
 *         moquette-broker-0.1-jar-with-dependencies.jar jar    
 *            
 *        java -jar moquette-broker-0.1-jar-with-dependencies.jar
 * 
 * 
 * google code   MQTT moquette Broker   :
 *    http://code.google.com/p/moquette-mqtt/
 *    
 * GIT   MQTT moquette client   :
 *  https://github.com/fusesource/mqtt-client 
 *    
 * @author longgangbai
 * 
 * 
 */
public class MQTTBrokerProxyServer {
	  private static final Logger LOG = LoggerFactory.getLogger(MQTTBrokerProxyServer.class);

	  public static final String STORAGE_FILE_PATH = System.getProperty("user.home") + File.separator + "moquette_store.hawtdb";
	  private IoAcceptor m_acceptor;
	  SimpleMessaging messaging;

	  public static void main(String[] args)
	    throws IOException
	  {
	    new MQTTBrokerProxyServer().startServer();
	  }

	  protected void startServer() throws IOException
	  {
		//        
	    DemuxingProtocolDecoder decoder = new DemuxingProtocolDecoder();
	    decoder.addMessageDecoder(new ConnectDecoder());//    
	    decoder.addMessageDecoder(new PublishDecoder());//    
	    decoder.addMessageDecoder(new PubAckDecoder());//      
	    decoder.addMessageDecoder(new PubRelDecoder());
	    decoder.addMessageDecoder(new PubRecDecoder());//    
	    decoder.addMessageDecoder(new PubCompDecoder());
	    decoder.addMessageDecoder(new SubscribeDecoder());//    
	    decoder.addMessageDecoder(new UnsubscribeDecoder());//      
	    decoder.addMessageDecoder(new DisconnectDecoder());//      
	    decoder.addMessageDecoder(new PingReqDecoder());//  ping    
	    
        //        
	    DemuxingProtocolEncoder encoder = new DemuxingProtocolEncoder();

	    encoder.addMessageEncoder(ConnAckMessage.class, new ConnAckEncoder());//    
	    encoder.addMessageEncoder(SubAckMessage.class, new SubAckEncoder());//      
	    encoder.addMessageEncoder(UnsubAckMessage.class, new UnsubAckEncoder());//      
	    encoder.addMessageEncoder(PubAckMessage.class, new PubAckEncoder());//      
	    encoder.addMessageEncoder(PubRecMessage.class, new PubRecEncoder());//    
	    encoder.addMessageEncoder(PubCompMessage.class, new PubCompEncoder());
	    encoder.addMessageEncoder(PubRelMessage.class, new PubRelEncoder());
	    encoder.addMessageEncoder(PublishMessage.class, new PublishEncoder());//    
	    encoder.addMessageEncoder(PingRespMessage.class, new PingRespEncoder());//  ping    
	    encoder.addMessageEncoder(DisconnectMessage.class,new DisconnectEncoder());//      
	    
	    this.m_acceptor = new NioSocketAcceptor();
        //        
	    this.m_acceptor.getFilterChain().addLast("logger", new MQTTLoggingFilter("SERVER LOG"));
	    //        
	    this.m_acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(encoder, decoder));
        //        
	    MQTTHandler handler = new MQTTHandler();
	    //            
	    this.messaging = SimpleMessaging.getInstance();
	    this.messaging.init();
        //     
	    handler.setMessaging(this.messaging);
	    //        
	    this.m_acceptor.setHandler(handler);
	    
	    ((NioSocketAcceptor)this.m_acceptor).setReuseAddress(true);
	    ((NioSocketAcceptor)this.m_acceptor).getSessionConfig().setReuseAddress(true);
	    this.m_acceptor.getSessionConfig().setReadBufferSize(2048);
	    this.m_acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
	    this.m_acceptor.getStatistics().setThroughputCalculationInterval(10);
	    this.m_acceptor.getStatistics().updateThroughput(System.currentTimeMillis());
	    //     
	    this.m_acceptor.bind(new InetSocketAddress(1883));
	    //        ip   
	    LOG.info("Server binded"+InetAddress.getLocalHost().getHostAddress());
	    try {
			Thread.sleep(100000000000000L);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		//  broker       
	    Runtime.getRuntime().addShutdownHook(new Thread()
	    {
	      public void run() {
	    	  MQTTBrokerProxyServer.this.stopServer();
	      }
	    });
	  }

	  protected void stopServer() {
	    LOG.info("Server stopping...");
        
	    this.messaging.stop();
        //Mina  IO    
	    IoServiceStatistics statistics = this.m_acceptor.getStatistics();
	    statistics.updateThroughput(System.currentTimeMillis());
	    System.out.println(String.format("Total read bytes: %d, read throughtput: %f (b/s)", new Object[] { Long.valueOf(statistics.getReadBytes()), Double.valueOf(statistics.getReadBytesThroughput()) }));
	    System.out.println(String.format("Total read msgs: %d, read msg throughtput: %f (msg/s)", new Object[] { Long.valueOf(statistics.getReadMessages()), Double.valueOf(statistics.getReadMessagesThroughput()) }));
        //       
	    for (IoSession session : this.m_acceptor.getManagedSessions().values()) {
	      if ((session.isConnected()) && (!session.isClosing())) {
	        session.close(false);
	      }
	    }
        //    IoAcceptor  
	    this.m_acceptor.unbind();
	    this.m_acceptor.dispose();
	    LOG.info("Server stopped");
	  }
	}
 
      以上のコードから分かるように、購読、心拍検出、接続が切断され、接続時には、関連するプロトコルエンコーダオブジェクトクラスに関連するエンコーダオブジェクトを追加する必要があります。
        MQTTHandlerクラスは、主なbrookとして、メッセージを発行し、購読するトラフィックプロセッサクラスである。
        IoServiceStatistics類情報統計類。主な統計はmina応用の中で情報の統計を読みます。
 
上のコードは主にMQTT moquetteの起動について説明します。サービスセグメントのリリースメッセージとクライアントの購読受信情報の実現について説明します。
 
MQTTの学習研究(三)moquete-mqttの利用mqttサービスはテーマ情報を発表します。
 
http://topmanopensource.iteye.com/blog/1699386
 
 
 
 
MQTTの学習研究(四)moquete-mqttの利用のmqttクライアントは購読して主題情報を受信します。 
永久リンク:http://topmanopensource.iteye.com/blog/1699408