MQTTクライアントの構築と応用 (Node.js)
要件: データ同期プログラムがパブリッシュしたデータを購読し、受信したデータをSolrに書き込みます。ここでは、MQTTクライアントを構築してSolrに書き込むNode.jsプログラムを例に挙げます。
1. 事前にnpmをインストールしておく必要があります。
2. 必要なライブラリをインストールします。
   (1) mqttライブラリ: npm install mqtt
   (2) Solrライブラリ: npm install solr-client (package.jsonのdependenciesに合わせています)
3. index.jsファイルを作成します。
ディレクトリ構造:
node
├── node_modules
│   ├── solr-client
│   └── mqtt
├── src
│   ├── user
│   │   ├── operation_information
│   │   │   └── conf.json
│   │   └── operation_information.js
│   └── util
├── package.json
└── index.js
var mqtt = require('mqtt');
var log = require('./src/util/log');
// TODO: select the environment to run against.
// Valid keys are the ones defined in the two tables below: 'dev', 'uat', 'prod'.
var env = 'dev';

// MQTT broker URL per environment.
// NOTE(review): the uat address "mqtt://27.0.0.1" looks like a typo for
// 127.0.0.1 (the uat Solr host below is 127.0.0.1) - confirm before use.
// The prod value is a placeholder.
var mqServerCfg = {
  'dev': 'mqtt://127.0.0.1',
  'uat': 'mqtt://27.0.0.1',
  'prod': ' ...'
};

// Default Solr connection settings per environment. A worker may override
// them with its own solrHost / solrPort / solrBaseUrl entries.
var defaultSolrHost = {
  'dev': {
    'host': '127.0.0.1',
    'port': '8983',
    'baseUrl': '/solr'
  },
  'uat': {
    'host': '127.0.0.1',
    'port': '8983',
    'baseUrl': '/solr'
  },
  'prod': {
    'host': ' ...',
    'port': '8983',
    'baseUrl': '/solr'
  }
};

// Topic -> list of workers. Each worker names the processor module to load
// and the Solr core its documents are written to.
var workers = {
  'user/operation_information': [
    {
      'processor': './src/user/operation_information',
      'solrCore': 'operation_information'
    }
  ]
};
// ,
var bind = function(topic, workerConfig, mqOption, defaultSolrOption) {
// log.info('=============================');
log.info('Start to check config for topic :' + topic);
// ,
if (!workerConfig.processor) {
log.error(
'e0001',
'Checking config failed, the path of the processor is not defined, please check the seeting of workers.{topic}[n].processor!');
return false;
}
if (!workerConfig.solrCore) {
log.error(
'e0002',
'Checking config failed, the solr core is not defined, please check the seeting of workers.{topic}[n].solrCore!');
return false;
}
var processor;
try {
processor = require(workerConfig.processor);
} catch (e) {
log.error('e0003',
'Load processor "' + workerConfig.processor + '" failed!');
return false;
}
var mqClient = mqtt.connect(mqOption);
// log.info('i0002', mqClient + ' connected to MQTT Agent ' + mqOption);
var solr = require('./src/util/solr');
defaultSolrOption.host =
workerConfig.solrHost ? workerConfig.solrHost : defaultSolrOption.host;
defaultSolrOption.port =
workerConfig.solrPort ? workerConfig.solrPort : defaultSolrOption.port;
defaultSolrOption.baseUrl = workerConfig.solrBaseUrl
? workerConfig.solrBaseUrl
: defaultSolrOption.baseUrl;
defaultSolrOption.core = workerConfig.solrCore;
processor.solrClient = solr.getClient(defaultSolrOption);
mqClient.on('message',
function(topic, message) { processor.process(topic, message); });
mqClient.subscribe(topic);
log.info('i0001', 'The topic ' + topic +
' has been subscribed by the processor ' +
workerConfig.processor);
};
/**
 * Binds every configured worker for the chosen environment.
 *
 * Looks up the MQTT and Solr settings of the environment and calls `bind`
 * once per worker entry of every topic in `workers`. Topics whose value is
 * not a non-empty array are skipped.
 *
 * @param {string} [environment] - Key into `mqServerCfg` and
 *     `defaultSolrHost`; falls back to 'dev' when omitted or empty.
 */
var start = function(environment) {
  var envName = environment ? environment : 'dev';

  // Refuse to continue with an unknown environment name; otherwise every
  // bind() call would receive undefined MQTT and Solr options.
  var hasOwn = Object.prototype.hasOwnProperty;
  if (!hasOwn.call(mqServerCfg, envName) ||
      !hasOwn.call(defaultSolrHost, envName)) {
    log.error('e0005', 'Unknown environment "' + envName +
                           '", no worker has been started!');
    return;
  }

  var mqttOptions = mqServerCfg[envName];
  var solrDefaultOptions = defaultSolrHost[envName];

  for (var topic in workers) {
    if (!hasOwn.call(workers, topic)) {
      continue;
    }
    var processors = workers[topic];
    if (!Array.isArray(processors) || processors.length === 0) {
      // Nothing is configured for this topic.
      continue;
    }
    // One bind() call (and therefore one MQTT client) per worker entry.
    // forEach visits array elements only, unlike for...in which enumerates
    // every enumerable property key.
    processors.forEach(function(workerConfig) {
      bind(topic, workerConfig, mqttOptions, solrDefaultOptions);
    });
  }
};
// Pass the configured environment; calling start() without an argument
// always runs 'dev' and silently ignores the `env` setting.
start(env);
4. package.jsonファイルを作成します。
{
"name": "mqtt_cloud_client",
"version": "1.0.0",
"description": "",
"main": "worker.js",
"dependencies": {
"mqtt": "^1.7.4",
"solr-client": "^0.6.0"
},
"devDependencies": {
"blanket": "^1.2.3",
"jshint": "^2.9.1",
"mocha": "^2.4.5"
},
"scripts": {
"test": "mocha",
"start": "node index.js"
},
"author": "Milo Liu",
"license": "ISC",
"config": {
"blanket": {
"pattern": "src",
"data-cover-never": [
"node_modules",
"test"
]
}
}
}
5. operation_information.jsの内容
//-------------------------
// New code
// ------------------------
var crypto = require('crypto');
var log = require('../util/log');
var date = require('../util/date');
/**
 * Processor that turns operation-information messages into Solr documents.
 *
 * `solrClient` starts as null and must be assigned by the code that loads
 * this processor before `process` is called.
 */
var operation_information = function() { this.solrClient = null; };

/**
 * Parses one message and adds its body entries to Solr.
 *
 * The payload must be JSON of the form
 * `{ head: { PARK_ID: ... }, body: [ {...}, ... ] }`. Every body entry is
 * extended with `ParkId` and a hashed `Id` and then sent to Solr in a single
 * `add` call.
 *
 * @param {string} topic - Topic the message arrived on (used for logging).
 * @param {Object} message - Payload; converted with `toString()` and parsed
 *     as JSON.
 * @returns {boolean} `false` when the message is rejected (invalid JSON,
 *     missing head.PARK_ID, empty body); `true` once the documents were
 *     handed to the Solr client. A later Solr failure is only logged.
 */
operation_information.prototype.process = function(topic, message) {
  var msgJson;
  try {
    msgJson = JSON.parse(message.toString());
  } catch (e) {
    log.error("e0101", "Invalid json string of the topic " + topic,
              message.toString());
    return false;
  }

  // JSON.parse may legally return null or a primitive, so guard msgJson
  // itself before looking at head.
  if (!msgJson || !msgJson.head || !msgJson.head.PARK_ID) {
    log.error("e0102",
              "Can't found the valid head.PARK_ID in the json of the topic " +
                  topic,
              msgJson);
    return false;
  }
  // Use the field that was just validated.
  var parkId = msgJson.head.PARK_ID;

  if (!Array.isArray(msgJson.body) || msgJson.body.length === 0) {
    log.error("e0103", "Empty body, do nothing for the topic " + topic,
              msgJson);
    return false;
  }

  var docs = [];
  for (var i = 0; i < msgJson.body.length; i++) {
    var doc = msgJson.body[i];
    doc.ParkId = parkId;
    // NOTE(review): `doc.xx` looks like a placeholder for the real key
    // fields. While both are undefined every document of a message gets the
    // same Id - replace them with the fields that identify a record.
    var id = [ parkId, doc.xx, doc.xx ].join('@@');
    // The MD5 hex digest of the joined key is used as the document Id.
    doc.Id = crypto.createHash("md5").update(id).digest("hex");
    docs.push(doc);
  }

  log.info("operation_information:", docs.length);
  this.solrClient.add(
      docs, {"boost" : 1.0, "overwrite" : true, "commitWithin" : 1000},
      function(err, obj) {
        if (err) {
          log.error("e0110", "[" + topic + "]" + JSON.stringify(err), docs);
        }
      });
  return true;
};
// Export one instance created at load time (not the constructor), so the
// module's consumers receive an object whose `solrClient` they can assign.
module.exports = new operation_information();