MQTTの学習研究(二)moquete-mqttの使用のmqtt brook erの起動
10459 ワード
MQTTで 公式サイト ( http://mqtt.org/software)には多くのMQTTの実現形態がある。具体的には公式サイトを参照してください。MoquetteはApache Minaのモデルに基づくJava MQTT brook erです。Minaを使ったことがあるクラスメートは実はbrookの起動過程はMinaアプリケーションの起動です。
MQTT moquetteでは、MINAをベースのメッセージとして転送する方式を採用している。 本クラスの目的はMQTT moquette Brokerを起動する方式であり、本明細書のソースコードは moquete-brook er-011-jar-with-dependencies.jarのserverクラスを直接起動したいならmoquett-brook er-011-jar-with-dependencies.jarのファイル方式 いくつかのコマンドを実行できます。 java-jar moquete-brook er-011-jar-with-dependencies.jar google codeダウンロードMQTT moquette Brokerアドレス: http://code.google.com/p/moquette-mqtt/ GITダウンロードMQTT moquette clientアドレス: https://github.com/fusesource/mqtt-client
アプリケーションではMQTTのアプリケーションを使用します。
MQTT moquetteのbrookサービス開始コードは以下の通りです。
以上のコードから分かるように、購読、心拍検出、接続が切断され、接続時には、関連するプロトコルエンコーダオブジェクトクラスに関連するエンコーダオブジェクトを追加する必要があります。
MQTTHandlerクラスは、主なbrookとして、メッセージを発行し、購読するトラフィックプロセッサクラスである。
IoServiceStatistics類情報統計類。主な統計はmina応用の中で情報の統計を読みます。
上のコードは主にMQTT moquetteの起動について説明します。サービスセグメントのリリースメッセージとクライアントの購読受信情報の実現について説明します。
MQTTの学習研究(三)moquete-mqttの利用mqttサービスはテーマ情報を発表します。
http://topmanopensource.iteye.com/blog/1699386
MQTTの学習研究(四)moquete-mqttの利用のmqttクライアントは購読して主題情報を受信します。
永久リンク:http://topmanopensource.iteye.com/blog/1699408
MQTT moquetteでは、MINAをベースのメッセージとして転送する方式を採用している。 本クラスの目的はMQTT moquette Brokerを起動する方式であり、本明細書のソースコードは moquete-brook er-011-jar-with-dependencies.jarのserverクラスを直接起動したいならmoquett-brook er-011-jar-with-dependencies.jarのファイル方式 いくつかのコマンドを実行できます。 java-jar moquete-brook er-011-jar-with-dependencies.jar google codeダウンロードMQTT moquette Brokerアドレス: http://code.google.com/p/moquette-mqtt/ GITダウンロードMQTT moquette clientアドレス: https://github.com/fusesource/mqtt-client
アプリケーションではMQTTのアプリケーションを使用します。
MQTT moquetteのbrookサービス開始コードは以下の通りです。
package com.etrip.mqtt;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoServiceStatistics;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.demux.DemuxingProtocolDecoder;
import org.apache.mina.filter.codec.demux.DemuxingProtocolEncoder;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging;
import org.dna.mqtt.moquette.proto.ConnAckEncoder;
import org.dna.mqtt.moquette.proto.ConnectDecoder;
import org.dna.mqtt.moquette.proto.DisconnectDecoder;
import org.dna.mqtt.moquette.proto.DisconnectEncoder;
import org.dna.mqtt.moquette.proto.MQTTLoggingFilter;
import org.dna.mqtt.moquette.proto.PingReqDecoder;
import org.dna.mqtt.moquette.proto.PingRespEncoder;
import org.dna.mqtt.moquette.proto.PubAckDecoder;
import org.dna.mqtt.moquette.proto.PubAckEncoder;
import org.dna.mqtt.moquette.proto.PubCompDecoder;
import org.dna.mqtt.moquette.proto.PubCompEncoder;
import org.dna.mqtt.moquette.proto.PubCompMessage;
import org.dna.mqtt.moquette.proto.PubRecDecoder;
import org.dna.mqtt.moquette.proto.PubRecEncoder;
import org.dna.mqtt.moquette.proto.PubRelDecoder;
import org.dna.mqtt.moquette.proto.PubRelEncoder;
import org.dna.mqtt.moquette.proto.PublishDecoder;
import org.dna.mqtt.moquette.proto.PublishEncoder;
import org.dna.mqtt.moquette.proto.SubAckEncoder;
import org.dna.mqtt.moquette.proto.SubscribeDecoder;
import org.dna.mqtt.moquette.proto.UnsubAckEncoder;
import org.dna.mqtt.moquette.proto.UnsubscribeDecoder;
import org.dna.mqtt.moquette.proto.messages.ConnAckMessage;
import org.dna.mqtt.moquette.proto.messages.DisconnectMessage;
import org.dna.mqtt.moquette.proto.messages.PingRespMessage;
import org.dna.mqtt.moquette.proto.messages.PubAckMessage;
import org.dna.mqtt.moquette.proto.messages.PubRecMessage;
import org.dna.mqtt.moquette.proto.messages.PubRelMessage;
import org.dna.mqtt.moquette.proto.messages.PublishMessage;
import org.dna.mqtt.moquette.proto.messages.SubAckMessage;
import org.dna.mqtt.moquette.proto.messages.UnsubAckMessage;
import org.dna.mqtt.moquette.server.MQTTHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* MQTT moquette MINA
*
* MQTT moquette Broker ,
* moquette-broker-0.1-jar-with-dependencies.jar server
* moquette-broker-0.1-jar-with-dependencies.jar jar
*
* java -jar moquette-broker-0.1-jar-with-dependencies.jar
*
*
* google code MQTT moquette Broker :
* http://code.google.com/p/moquette-mqtt/
*
* GIT MQTT moquette client :
* https://github.com/fusesource/mqtt-client
*
* @author longgangbai
*
*
*/
public class MQTTBrokerProxyServer {
private static final Logger LOG = LoggerFactory.getLogger(MQTTBrokerProxyServer.class);
public static final String STORAGE_FILE_PATH = System.getProperty("user.home") + File.separator + "moquette_store.hawtdb";
private IoAcceptor m_acceptor;
SimpleMessaging messaging;
public static void main(String[] args)
throws IOException
{
new MQTTBrokerProxyServer().startServer();
}
protected void startServer() throws IOException
{
//
DemuxingProtocolDecoder decoder = new DemuxingProtocolDecoder();
decoder.addMessageDecoder(new ConnectDecoder());//
decoder.addMessageDecoder(new PublishDecoder());//
decoder.addMessageDecoder(new PubAckDecoder());//
decoder.addMessageDecoder(new PubRelDecoder());
decoder.addMessageDecoder(new PubRecDecoder());//
decoder.addMessageDecoder(new PubCompDecoder());
decoder.addMessageDecoder(new SubscribeDecoder());//
decoder.addMessageDecoder(new UnsubscribeDecoder());//
decoder.addMessageDecoder(new DisconnectDecoder());//
decoder.addMessageDecoder(new PingReqDecoder());// ping
//
DemuxingProtocolEncoder encoder = new DemuxingProtocolEncoder();
encoder.addMessageEncoder(ConnAckMessage.class, new ConnAckEncoder());//
encoder.addMessageEncoder(SubAckMessage.class, new SubAckEncoder());//
encoder.addMessageEncoder(UnsubAckMessage.class, new UnsubAckEncoder());//
encoder.addMessageEncoder(PubAckMessage.class, new PubAckEncoder());//
encoder.addMessageEncoder(PubRecMessage.class, new PubRecEncoder());//
encoder.addMessageEncoder(PubCompMessage.class, new PubCompEncoder());
encoder.addMessageEncoder(PubRelMessage.class, new PubRelEncoder());
encoder.addMessageEncoder(PublishMessage.class, new PublishEncoder());//
encoder.addMessageEncoder(PingRespMessage.class, new PingRespEncoder());// ping
encoder.addMessageEncoder(DisconnectMessage.class,new DisconnectEncoder());//
this.m_acceptor = new NioSocketAcceptor();
//
this.m_acceptor.getFilterChain().addLast("logger", new MQTTLoggingFilter("SERVER LOG"));
//
this.m_acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(encoder, decoder));
//
MQTTHandler handler = new MQTTHandler();
//
this.messaging = SimpleMessaging.getInstance();
this.messaging.init();
//
handler.setMessaging(this.messaging);
//
this.m_acceptor.setHandler(handler);
((NioSocketAcceptor)this.m_acceptor).setReuseAddress(true);
((NioSocketAcceptor)this.m_acceptor).getSessionConfig().setReuseAddress(true);
this.m_acceptor.getSessionConfig().setReadBufferSize(2048);
this.m_acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
this.m_acceptor.getStatistics().setThroughputCalculationInterval(10);
this.m_acceptor.getStatistics().updateThroughput(System.currentTimeMillis());
//
this.m_acceptor.bind(new InetSocketAddress(1883));
// ip
LOG.info("Server binded"+InetAddress.getLocalHost().getHostAddress());
try {
Thread.sleep(100000000000000L);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// broker
Runtime.getRuntime().addShutdownHook(new Thread()
{
public void run() {
MQTTBrokerProxyServer.this.stopServer();
}
});
}
protected void stopServer() {
LOG.info("Server stopping...");
this.messaging.stop();
//Mina IO
IoServiceStatistics statistics = this.m_acceptor.getStatistics();
statistics.updateThroughput(System.currentTimeMillis());
System.out.println(String.format("Total read bytes: %d, read throughtput: %f (b/s)", new Object[] { Long.valueOf(statistics.getReadBytes()), Double.valueOf(statistics.getReadBytesThroughput()) }));
System.out.println(String.format("Total read msgs: %d, read msg throughtput: %f (msg/s)", new Object[] { Long.valueOf(statistics.getReadMessages()), Double.valueOf(statistics.getReadMessagesThroughput()) }));
//
for (IoSession session : this.m_acceptor.getManagedSessions().values()) {
if ((session.isConnected()) && (!session.isClosing())) {
session.close(false);
}
}
// IoAcceptor
this.m_acceptor.unbind();
this.m_acceptor.dispose();
LOG.info("Server stopped");
}
}
以上のコードから分かるように、購読、心拍検出、接続が切断され、接続時には、関連するプロトコルエンコーダオブジェクトクラスに関連するエンコーダオブジェクトを追加する必要があります。
MQTTHandlerクラスは、主なbrookとして、メッセージを発行し、購読するトラフィックプロセッサクラスである。
IoServiceStatistics類情報統計類。主な統計はmina応用の中で情報の統計を読みます。
上のコードは主にMQTT moquetteの起動について説明します。サービスセグメントのリリースメッセージとクライアントの購読受信情報の実現について説明します。
MQTTの学習研究(三)moquete-mqttの利用mqttサービスはテーマ情報を発表します。
http://topmanopensource.iteye.com/blog/1699386
MQTTの学習研究(四)moquete-mqttの利用のmqttクライアントは購読して主題情報を受信します。
永久リンク:http://topmanopensource.iteye.com/blog/1699408