MQTTクライアント構築及びアプリケーション(Nodejs)

13873 ワード

要件:データ同期プログラムから配信(パブリッシュ)されたデータを購読し、受信したデータをSolrに書き込みます。ここでは、MQTTクライアントを構築してSolrに書き込むNode.jsアプリケーションを例に説明します。
1. 事前にnpmをインストールしておく必要があります。
2. 必要なライブラリをインストールします。mqttライブラリ:npm install mqtt、Solrライブラリ:npm install solr-client
3. index.jsファイルを作成します。
var mqtt = require('mqtt');
var log = require('./src/util/log');
// TODO: (original note is garbled) presumably: set the target environment
// before deploying — confirm.
var env = 'dev'; // other environment keys configured in this file: uat, prod


// MQTT broker URL per environment, keyed by environment name.
// NOTE(review): the uat address "27.0.0.1" looks like a typo for
// "127.0.0.1" — confirm before use. The prod value is a placeholder.
var mqServerCfg = {
  dev: 'mqtt://127.0.0.1',
  uat: 'mqtt://27.0.0.1',
  prod: '  ...'
};


// Default Solr connection settings (host, port, base URL) per environment.
// The prod host is a placeholder.
var defaultSolrHost = {
  dev: {
    host: '127.0.0.1',
    port: '8983',
    baseUrl: '/solr'
  },
  uat: {
    host: '127.0.0.1',
    port: '8983',
    baseUrl: '/solr'
  },
  prod: {
    host: '  ...',
    port: '8983',
    baseUrl: '/solr'
  }
};


// Maps each MQTT topic to the list of workers that handle it. Every worker
// names the processor module to load and the Solr core it writes to.
var workers = {
  'user/operation_information': [{
    processor: './src/user/operation_information',
    solrCore: 'operation_information'
  }]
};


/**
 * Binds one processor to one MQTT topic: validates the worker configuration,
 * loads the processor module, hands it a Solr client and subscribes to the
 * topic on a newly created MQTT connection.
 *
 * @param {string} topic - MQTT topic to subscribe to.
 * @param {Object} workerConfig - Worker settings. `processor` (module path)
 *     and `solrCore` are required; `solrHost`, `solrPort` and `solrBaseUrl`
 *     optionally override the defaults.
 * @param {*} mqOption - Passed unchanged to `mqtt.connect`.
 * @param {Object} defaultSolrOption - Default Solr `host`, `port` and
 *     `baseUrl`. Only read here, never modified.
 * @returns {(boolean|undefined)} `false` when the configuration is invalid or
 *     the processor cannot be loaded; otherwise nothing is returned.
 */
var bind = function(topic, workerConfig, mqOption, defaultSolrOption) {
  log.info('Start to check config for topic :' + topic);

  // Both settings are mandatory; refuse to bind when either one is missing.
  if (!workerConfig.processor) {
    log.error(
        'e0001',
        'Checking config failed, the path of the processor is not defined, please check the seeting of workers.{topic}[n].processor!');
    return false;
  }

  if (!workerConfig.solrCore) {
    log.error(
        'e0002',
        'Checking config failed, the solr core is not defined, please check the seeting of workers.{topic}[n].solrCore!');
    return false;
  }

  var processor;
  try {
    processor = require(workerConfig.processor);
  } catch (e) {
    // Include the underlying reason so a bad path and a syntax error inside
    // the processor module can be told apart in the log.
    log.error('e0003',
              'Load processor "' + workerConfig.processor + '" failed!',
              e && e.message);
    return false;
  }

  var mqClient = mqtt.connect(mqOption);
  var solr = require('./src/util/solr');

  // Build the Solr options on a private copy. The defaults object is shared
  // by every worker, so writing the per-worker overrides into it would leak
  // one worker's host/port/baseUrl/core into all workers bound afterwards.
  var solrOption = {};
  for (var key in defaultSolrOption) {
    if (Object.prototype.hasOwnProperty.call(defaultSolrOption, key)) {
      solrOption[key] = defaultSolrOption[key];
    }
  }
  solrOption.host =
      workerConfig.solrHost ? workerConfig.solrHost : defaultSolrOption.host;
  solrOption.port =
      workerConfig.solrPort ? workerConfig.solrPort : defaultSolrOption.port;
  solrOption.baseUrl = workerConfig.solrBaseUrl ? workerConfig.solrBaseUrl
                                                : defaultSolrOption.baseUrl;
  solrOption.core = workerConfig.solrCore;

  // NOTE(review): `require` caches modules, so binding the same processor
  // path a second time would overwrite this solrClient on the same exported
  // object — confirm each processor is bound only once.
  processor.solrClient = solr.getClient(solrOption);

  mqClient.on('message', function(receivedTopic, message) {
    processor.process(receivedTopic, message);
  });

  mqClient.subscribe(topic);
  log.info('i0001', 'The topic ' + topic +
                        ' has been subscribed by the processor ' +
                        workerConfig.processor);
};


/**
 * Starts every configured worker for the given environment by binding each
 * processor listed in `workers` to its topic.
 *
 * @param {string} [environment='dev'] - Key into `mqServerCfg` and
 *     `defaultSolrHost`; a falsy value falls back to 'dev'.
 * @throws {Error} When the environment has no MQTT or Solr configuration.
 */
var start = function(environment) {
  // Fall back to the development environment when none is given.
  environment = environment ? environment : 'dev';

  var mqttOptions = mqServerCfg[environment];
  var solrDefaultOptions = defaultSolrHost[environment];

  // Fail fast with a clear message. Without this check an unknown environment
  // only surfaces later, inside bind, as a TypeError on the undefined Solr
  // defaults, after an MQTT connection has already been opened.
  if (!mqttOptions || !solrDefaultOptions) {
    throw new Error('Unknown environment "' + environment +
                    '": no MQTT/Solr configuration is defined for it');
  }

  // Walk every topic and bind each of its processors.
  for (var topic in workers) {
    if (!Object.prototype.hasOwnProperty.call(workers, topic)) {
      continue;
    }
    var processors = workers[topic];
    if (!Array.isArray(processors) || processors.length === 0) {
      // Nothing is configured for this topic.
      continue;
    }
    // Each processor gets its own bind call (and so its own MQTT client).
    // forEach visits array elements only, unlike for...in, which would also
    // visit any enumerable non-index property on the array.
    processors.forEach(function(workerConfig) {
      bind(topic, workerConfig, mqttOptions, solrDefaultOptions);
    });
  }
};


// Pass the configured environment; calling start() with no argument always
// fell back to 'dev', so changing `env` had no effect.
start(env);
4. package.jsonファイルを作成します。
{
  "name": "mqtt_cloud_client",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "dependencies": {
    "mqtt": "^1.7.4",
    "solr-client": "^0.6.0"
  },
  "devDependencies": {
    "blanket": "^1.2.3",
    "jshint": "^2.9.1",
    "mocha": "^2.4.5"
  },
  "scripts": {
    "test": "mocha",
    "start": "node index.js"
  },
  "author": "Milo Liu",
  "license": "ISC",
  "config": {
    "blanket": {
      "pattern": "src",
      "data-cover-never": [
        "node_modules",
        "test"
      ]
    }
  }
}
5. operation_information.jsの内容
//-------------------------
// New code
// ------------------------
var crypto = require('crypto');
var log = require('../util/log');
var date = require('../util/date');


/**
 * Processor for operation-information messages. `solrClient` starts as null
 * and must be assigned by the caller before `process` is invoked.
 */
var operation_information = function() { this.solrClient = null; };


/**
 * Parses one MQTT message as JSON, stamps every body entry with the park id
 * and an MD5-based `Id`, and adds the resulting documents to Solr.
 *
 * @param {string} topic - Topic the message arrived on (used in log output).
 * @param {*} message - Raw payload; `message.toString()` must yield JSON with
 *     a `head.PARK_ID` value and a non-empty `body` array.
 * @returns {(boolean|undefined)} `false` when the message is rejected;
 *     otherwise nothing is returned (the Solr result is only logged on error).
 */
operation_information.prototype.process = function(topic, message) {
  // Parse the payload; anything that is not valid JSON is rejected.
  var msgJson;
  try {
    msgJson = JSON.parse(message.toString());
  } catch (e) {
    log.error("e0101", "Invalid json string of the topic " + topic,
              message.toString());
    return false;
  }

  // head.PARK_ID is mandatory. The extra !msgJson check covers payloads such
  // as `null`, which parse successfully but would throw on `.head`.
  if (!msgJson || !msgJson.head || !msgJson.head.PARK_ID) {
    log.error("e0102",
              "Can't found the valid head.PARK_ID in the json of the topic " +
                  topic,
              msgJson);
    return false;
  }

  // Read the field that was just validated (the original read `head.xxx`).
  var parkId = msgJson.head.PARK_ID;

  if (!msgJson.body || !Array.isArray(msgJson.body) ||
      msgJson.body.length === 0) {
    log.error("e0103", "Empty body, do nothing for the topic " + topic,
              msgJson);
    return false;
  }

  var docs = [];

  // Index loop instead of for...in, which is not meant for arrays.
  for (var docIdx = 0; docIdx < msgJson.body.length; docIdx++) {
    var doc = msgJson.body[docIdx];
    doc.ParkId = parkId;

    // NOTE(review): `doc.xx` appears twice and looks like a placeholder for
    // the real key fields — replace with the fields that identify a document.
    var id = [ parkId, doc.xx, doc.xx ].join('@@');
    // digest("hex") already returns a string.
    doc.Id = crypto.createHash("md5").update(id).digest("hex");

    docs.push(doc);
  }
  log.info("operation_information:", docs.length);

  this.solrClient.add(
      docs, {"boost" : 1.0, "overwrite" : true, "commitWithin" : 1000},
      function(err, obj) {
        if (err) {
          log.error("e0110", "[" + topic + "]" + JSON.stringify(err), docs);
        }
      });
};


// A single shared instance is exported.
module.exports = new operation_information();
  • ディレクトリ構造

    node
    ├── node_modules
    │   ├── solr
    │   └── mqtt
    ├── src
    │   ├── user
    │   │   ├── operation_information
    │   │   │   └── conf.json
    │   │   └── operation_information.js
    │   └── util
    ├── package.json
    └── index.js