ビッグデータリアルタイム監視アラート

4324 ワード

文書ディレクトリ
  • 1. flumeで収集したデータはKafka
  • に送信される.
  • 1)カスタムソース
  • 2)カスタムブロックInterceptor
  • 3)カスタムsink
  • 2.kafka->spark->es
  • 3.kafka->spark->hbase
  • 4.kafka->spark->hdfs
  • 5.kafka->spark->redis(warnアラート)
  • 1.flumeで収集したデータをKafkaに送信
    カスタムソース->監視ディレクトリのファイルデータを絶えず収集し、収集に成功した後、ファイルを新しいファイルディレクトリに移動し、ディレクトリ名は日付フォーマットをカスタマイズし、channelまたはブロッキングカスタムブロッキング->sourceがプッシュしたデータを解析し、フィールドと内容が一致するかどうかを検証します.また、フィールド内のフォーマット効果(携帯電話番号、経緯度など)は、データが間違っている場合はjsonのフォーマットに変換し、postのリクエストでESに更新します.カスタムsink->手動でトランザクションをコミットし、kafkaに送信し、ロールバックに失敗し、送信データ量のしきい値を制限し、バッチ処理します.
    1)カスタムソース
    固定書き方、カスタムSourceはAbstractSourceを直接継承し、Configurableを実現し、PollableSourceインタフェースは公式サイトのカスタム例を参照することができる.http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#source
    プロジェクトの主な実現方法configure():flumeを取得する.confのパラメータ
     flume.conf        ,  
    tier1.sources.source1.successfile=/usr/chl/data/filedir_successful/
      
    successfile = context.getString(FlumeConfConstant.SUCCESSFILE)
    FlumeConfConstant.SUCCESSFILE  successfile,     
    

    process():主な業務コードは、監視ディレクトリのファイルデータを絶えず収集し、収集に成功した後、ファイルを新しいファイルディレクトリに移動し、ディレクトリ名は日付フォーマットをカスタマイズし、channelまたはブロックにデータをプッシュします.
    2)カスタムブロッキングインターセプタ
    カスタムブロッカーによるInterceptorインタフェースの直接実装
    public Event intercept(Event event):          
    public List<Event> intercept(List<Event> events):   
    
         
    public static class Builder implements Interceptor.Builder {
            @Override
            public void configure(Context context) {
            }
            @Override
            public Interceptor build() {
                return new DataCleanInterceptor();
            }
        }
    

    3)カスタムsink
    書き方:extends AbstractSink implements Configurable
    2.kafka->spark->es
    3.kafka->spark->hbase
    4.kafka->spark->hdfs
    5.kafka->spark->redis(warnアラート)