Loading Data Collected by Flume Directly into Solr

1. Background


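On the CDH platform, near-real-time (NRT) search is implemented by having Flume load the data it collects into Solr, which then serves external queries. After Flume collects the data (the test machine here is dn12.hadoop), Morphline is used to run ETL on it and convert it into Solr's document format. The configuration is split into three steps: Solr, Flume, and Morphlines.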

2. Solr configuration


Create the collection (or update an existing one):
solrctl instancedir --generate /home/data/collectionSignalling
solrctl instancedir --create collectionSignalling /home/data/collectionSignalling
solrctl collection --create collectionSignalling  -s 6 -m 15 -r 2 -c collectionSignalling -a

Edit the schema.xml configuration:
 <fields> 
   <field name="_version_" type="long" indexed="true" stored="true"/>
   <field name="_root_" type="string" indexed="true" stored="false"/>   
   <field name="timestamp" type="tdate" indexed="true" stored="true" default="NOW+8HOUR" multiValued="false"/>
   <field name="text" type="text_general" indexed="true" stored="false" multiValued="true"/>    
   <field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />    
   
  <field name="province_code" type="string" indexed="true" stored="true" multiValued="false"/>
  <field name="caller" type="string" indexed="true" stored="true" multiValued="false"/>
  <field name="called" type="string" indexed="true" stored="true" multiValued="false"/>
  <field name="call_status" type="string" indexed="true" stored="true" multiValued="false"/>
  <field name="call_time" type="tdate" indexed="true" stored="true" multiValued="false"/>
  <field name="length_time" type="long" indexed="true" stored="true" multiValued="false"/>

  <dynamicField name="ignored_*" type="ignored" multiValued="true"/>

 </fields>
 
 <uniqueKey>id</uniqueKey>

Push the update after the change:
solrctl instancedir --update collectionSignalling /home/data/collectionSignalling
solrctl collection --reload collectionSignalling

3. Flume configuration


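1. On the Flume configuration page, set Flume's dependency on Solr, i.e. choose Solr for the Solr Service option.
2. In CM, put the following into the Flume agent's configuration file. morphlineFile takes just the file name; no path needs to be added.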
tier1.sources=source1  
tier1.channels=channel1  
tier1.sinks=sink1  

tier1.sources.source1.type = avro  
tier1.sources.source1.bind = 0.0.0.0  
tier1.sources.source1.port = 44444  
tier1.sources.source1.channels=channel1  

tier1.channels.channel1.type=memory  
tier1.channels.channel1.capacity=10000  

tier1.sinks.sink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink  
tier1.sinks.sink1.channel = channel1  
tier1.sinks.sink1.morphlineFile = morphlines.conf  
tier1.sinks.sink1.morphlineId = collectionSignalling  
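
Before deploying, it can help to confirm that the source and the sink both point at a declared channel. The following is only a minimal sketch: parse_properties and check_wiring are hypothetical helpers written for this article, not part of Flume, and the parser handles plain key=value lines only.

```python
def parse_properties(text):
    """Parse simple key=value lines (no escapes or line continuations)."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent="tier1"):
    """Return a list of problems in the source/channel/sink wiring."""
    problems = []
    channels = set(props.get(f"{agent}.channels", "").split())
    for name in props.get(f"{agent}.sources", "").split():
        used = props.get(f"{agent}.sources.{name}.channels", "").split()
        if not used:
            problems.append(f"source {name} has no channels")
        for channel in used:
            if channel not in channels:
                problems.append(f"source {name} uses undeclared channel {channel}")
    for name in props.get(f"{agent}.sinks", "").split():
        channel = props.get(f"{agent}.sinks.{name}.channel")
        if channel is None:
            problems.append(f"sink {name} has no channel")
        elif channel not in channels:
            problems.append(f"sink {name} uses undeclared channel {channel}")
    return problems


# The wiring-related lines from the agent configuration above.
CONFIG = """
tier1.sources=source1
tier1.channels=channel1
tier1.sinks=sink1
tier1.sources.source1.channels=channel1
tier1.sinks.sink1.channel = channel1
"""

print(check_wiring(parse_properties(CONFIG)))  # an empty list means no problems found
```

Note that a source uses the plural key channels while a sink uses the singular channel, which is an easy thing to mix up when editing by hand.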

4. Morphlines configuration


Add the ETL configuration to the Morphlines File option of the CDH Flume agent:
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# Application configuration file in HOCON format (Human-Optimized Config Object Notation). 
# HOCON syntax is defined at http://github.com/typesafehub/config/blob/master/HOCON.md
# and also used by Akka (http://www.akka.io) and Play (http://www.playframework.org/).
# For more examples see http://doc.akka.io/docs/akka/2.1.2/general/configuration.html

# morphline.conf example file
# this is a comment

# Specify server locations in a SOLR_LOCATOR variable; used later in variable substitutions:
SOLR_LOCATOR : {
  # Name of solr collection
  collection : collectionSignalling

  # ZooKeeper ensemble
  zkHost : "nn1.hadoop:2181,nn2.hadoop:2181,dn7.hadoop:2181,dn5.hadoop:2181,dn3.hadoop:2181/solr"

  # Relative or absolute path to a directory containing conf/solrconfig.xml and conf/schema.xml
  # If this path is uncommented it takes precedence over the configuration stored in ZooKeeper.  
  # solrHomeDir : "example/solr/collection1"

  # The maximum number of documents to send to Solr per network batch (throughput knob)
  # batchSize : 100
}

# Specify an array of one or more morphlines, each of which defines an ETL 
# transformation chain. A morphline consists of one or more (potentially 
# nested) commands. A morphline is a way to consume records (e.g. Flume events, 
# HDFS files or blocks), turn them into a stream of records, and pipe the stream 
# of records through a set of easily configurable transformations on its way to 
# Solr (or a MapReduceIndexerTool RecordWriter that feeds via a Reducer into Solr).
morphlines : [
  {
    # Name used to identify a morphline. E.g. used if there are multiple morphlines in a 
    # morphline config file
    id : collectionSignalling 

    # Import all morphline commands in these java packages and their subpackages.
    # Other commands that may be present on the classpath are not visible to this morphline.
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]

    commands : [                    
      { 
        # Parse the body of each Flume event as JSON
        readJson{}
      } 
        # Map JSON fields to Solr fields
      { 
         extractJsonPaths {  
          flatten : false  
          paths : {   
            province_code : /province_code              
            caller : /caller  
            called : /called  
            call_status : /call_status  
            call_time : /call_time 
            length_time : /length_time     
          }  
        } 
      }

      # Consume the output record of the previous command and pipe another record downstream.
      #
      # convert timestamp field to native Solr timestamp format
      # e.g. 2012-09-06T07:14:34Z to 2012-09-06T07:14:34.000Z
      #{
      #  convertTimestamp {
      #    field : call_time
      #    inputFormats : ["yyyyMMdd HH:mm:ss"]
      #    inputTimezone : Asia/Shanghai
      #   outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSSZ"                                 
      #    outputTimezone : Asia/Shanghai
      #  }
      #}
      # Generate a UUID for the id field
      {generateUUID {
       field : id
      }}

      # Consume the output record of the previous command and pipe another record downstream.
      #
      # Command that sanitizes record fields that are unknown to Solr schema.xml by either 
      # deleting them (renameToPrefix is absent or a zero length string), or by moving them to a
      # field prefixed with the given renameToPrefix (e.g. renameToPrefix = "ignored_" to use 
      # typical dynamic Solr fields).
      #
      # Recall that Solr throws an exception on any attempt to load a document that contains a 
      # field that isn't specified in schema.xml.
      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : ${SOLR_LOCATOR}

          renameToPrefix : "ignored_"
        }
      }  

      # log the record at DEBUG level to SLF4J
      { logDebug { format : "output record: {}", args : ["@{}"] } }    

      # load the record into a SolrServer or MapReduce SolrOutputFormat.
      # Here the target is the Solr collection defined in SOLR_LOCATOR
      { 
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]
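
To make the command chain easier to follow, here is a rough Python sketch of what the morphline above does to one event. It is a simplified model for illustration only, not how Kite Morphlines is implemented: to_solr_doc is a hypothetical function, the headers argument stands in for Flume event headers, and the real readJson command also keeps the parsed tree as an attachment, which is omitted here.

```python
import json
import uuid

# Field names declared in schema.xml above.
SCHEMA_FIELDS = {
    "_version_", "_root_", "timestamp", "text", "id",
    "province_code", "caller", "called",
    "call_status", "call_time", "length_time",
}

# Top-level JSON keys picked out by extractJsonPaths.
EXTRACTED_FIELDS = [
    "province_code", "caller", "called",
    "call_status", "call_time", "length_time",
]


def to_solr_doc(event_body, headers=None):
    """Model readJson -> extractJsonPaths -> generateUUID -> sanitizeUnknownSolrFields."""
    record = dict(headers or {})                   # event headers travel with the record
    tree = json.loads(event_body.decode("utf-8"))  # readJson
    for field in EXTRACTED_FIELDS:                 # extractJsonPaths
        if field in tree:
            record[field] = tree[field]
    record["id"] = str(uuid.uuid4())               # generateUUID { field : id }
    # sanitizeUnknownSolrFields with renameToPrefix "ignored_":
    # fields unknown to the schema are renamed, not dropped.
    return {
        (name if name in SCHEMA_FIELDS else "ignored_" + name): value
        for name, value in record.items()
    }


body = b'{"province_code":"150000","caller":"18353662886","called":"15335586466"}'
print(to_solr_doc(body, headers={"host": "dn12.hadoop"}))
```

In this model the hypothetical host header comes out as ignored_host, which matches the ignored_* dynamicField declared in schema.xml, so Solr does not reject the document for containing an undeclared field.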

5. Testing


Create a file named file01, for example at /home/hadoop/test/zhenzhen/file01, and add the following content:
{"province_code":"150000","caller":"18353662886","called":"15335586466","call_status":"1","call_time":"20161221 08:51:40","length_time":"58526"}
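
The call_time value in this record uses the yyyyMMdd HH:mm:ss layout, while Solr date fields are normally given in UTC ISO-8601 form (for example 2012-09-06T07:14:34.000Z, as the comment in the morphline file shows). The convertTimestamp command that would do this conversion is commented out in the morphline above. As a worked example of the conversion itself, here is a small sketch; to_solr_timestamp is a hypothetical helper, and a fixed UTC+8 offset is assumed in place of the Asia/Shanghai zone.

```python
from datetime import datetime, timedelta, timezone

# Assumption: a fixed UTC+8 offset stands in for Asia/Shanghai.
CHINA_STANDARD_TIME = timezone(timedelta(hours=8))


def to_solr_timestamp(value, input_format="%Y%m%d %H:%M:%S",
                      input_tz=CHINA_STANDARD_TIME):
    """Convert a local timestamp string to Solr's UTC ISO-8601 form."""
    local = datetime.strptime(value, input_format).replace(tzinfo=input_tz)
    utc = local.astimezone(timezone.utc)
    millis = utc.microsecond // 1000
    return f"{utc:%Y-%m-%dT%H:%M:%S}.{millis:03d}Z"


print(to_solr_timestamp("20161221 08:51:40"))  # 2016-12-21T00:51:40.000Z
```

The eight-hour shift is the point to watch: 08:51:40 local time becomes 00:51:40 in UTC.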

Go to the Flume directory and run the following command:
[hadoop@db1 bin]$ cd /opt/cloudera/parcels/CDH/bin
[hadoop@db1 bin]$ flume-ng avro-client -H dn12.hadoop -p 44444 -F /home/hadoop/test/zhenzhen/file01

Check whether the Flume log looks normal:
1. From the command line:
[root@dn12 flume-ng]# pwd
/var/log/flume-ng
[root@dn12 flume-ng]# tail -f flume-cmf-flume-AGENT-dn12.hadoop.log

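2. Check in CM. If the log lacks detail, go to Flume agent -> Logs -> Agent Logging Threshold and lower the log level to TRACE. The logs can also be viewed from the Log Files menu of the CM interface.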
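
Once the log looks clean, the last check is whether the record can actually be found in Solr. The sketch below builds a query against Solr's standard select handler. The base URL http://dn12.hadoop:8983/solr is an assumption (replace it with a host that really runs a Solr server, and adjust the port if yours differs), and build_select_url and count_matches are hypothetical helper names.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def build_select_url(base_url, collection, field, value, rows=10):
    """Build a /select URL that searches one field for one value."""
    params = urlencode({"q": f"{field}:{value}", "rows": rows, "wt": "json"})
    return f"{base_url.rstrip('/')}/{collection}/select?{params}"


def count_matches(url):
    """Run the query and return numFound (needs network access to Solr)."""
    with urlopen(url, timeout=10) as response:
        return json.load(response)["response"]["numFound"]


# Assumed Solr base URL; the caller value comes from the test record.
url = build_select_url("http://dn12.hadoop:8983/solr",
                       "collectionSignalling", "caller", "18353662886")
print(url)
```

Calling count_matches(url) from a machine that can reach Solr should return at least 1 after the test event has been indexed. Because indexing is near real-time, the document may take a moment to become visible, depending on the commit settings.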

6. References

http://blog.csdn.net/xiao_jun_0820/article/details/40741997
http://www.cnblogs.com/arli/p/6158771.html