ビッグデータリアルタイム監視アラート
4324 ワード
文書ディレクトリ 1. flumeで収集したデータはKafka に送信される.1)カスタムソース 2)カスタムブロックInterceptor 3)カスタムsink 2.kafka->spark->es 3.kafka->spark->hbase 4.kafka->spark->hdfs 5.kafka->spark->redis(warnアラート) 1.flumeで収集したデータをKafkaに送信
カスタムソース->監視ディレクトリのファイルデータを絶えず収集し、収集に成功した後、ファイルを新しいファイルディレクトリに移動し、ディレクトリ名は日付フォーマットをカスタマイズし、channelまたはブロッキングカスタムブロッキング->sourceがプッシュしたデータを解析し、フィールドと内容が一致するかどうかを検証します.また、フィールド内のフォーマット効果(携帯電話番号、経緯度など)は、データが間違っている場合はjsonのフォーマットに変換し、postのリクエストでESに更新します.カスタムsink->手動でトランザクションをコミットし、kafkaに送信し、ロールバックに失敗し、送信データ量のしきい値を制限し、バッチ処理します.
1)カスタムソース
固定書き方、カスタムSourceはAbstractSourceを直接継承し、Configurableを実現し、PollableSourceインタフェースは公式サイトのカスタム例を参照することができる.http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#source
プロジェクトの主な実現方法configure():flumeを取得する.confのパラメータ
process():主な業務コードは、監視ディレクトリのファイルデータを絶えず収集し、収集に成功した後、ファイルを新しいファイルディレクトリに移動し、ディレクトリ名は日付フォーマットをカスタマイズし、channelまたはブロックにデータをプッシュします.
2)カスタムブロッキングインターセプタ
カスタムブロッカーによるInterceptorインタフェースの直接実装
3)カスタムsink
書き方:extends AbstractSink implements Configurable
2.kafka->spark->es
3.kafka->spark->hbase
4.kafka->spark->hdfs
5.kafka->spark->redis(warnアラート)
カスタムソース->監視ディレクトリのファイルデータを絶えず収集し、収集に成功した後、ファイルを新しいファイルディレクトリに移動し、ディレクトリ名は日付フォーマットをカスタマイズし、channelまたはブロッキングカスタムブロッキング->sourceがプッシュしたデータを解析し、フィールドと内容が一致するかどうかを検証します.また、フィールド内のフォーマット効果(携帯電話番号、経緯度など)は、データが間違っている場合はjsonのフォーマットに変換し、postのリクエストでESに更新します.カスタムsink->手動でトランザクションをコミットし、kafkaに送信し、ロールバックに失敗し、送信データ量のしきい値を制限し、バッチ処理します.
1)カスタムソース
固定書き方、カスタムSourceはAbstractSourceを直接継承し、Configurableを実現し、PollableSourceインタフェースは公式サイトのカスタム例を参照することができる.http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#source
プロジェクトの主な実現方法configure():flumeを取得する.confのパラメータ
flume.conf ,
tier1.sources.source1.successfile=/usr/chl/data/filedir_successful/
successfile = context.getString(FlumeConfConstant.SUCCESSFILE)
FlumeConfConstant.SUCCESSFILE successfile,
process():主な業務コードは、監視ディレクトリのファイルデータを絶えず収集し、収集に成功した後、ファイルを新しいファイルディレクトリに移動し、ディレクトリ名は日付フォーマットをカスタマイズし、channelまたはブロックにデータをプッシュします.
2)カスタムブロッキングインターセプタ
カスタムブロッカーによるInterceptorインタフェースの直接実装
public Event intercept(Event event):
public List<Event> intercept(List<Event> events):
public static class Builder implements Interceptor.Builder {
@Override
public void configure(Context context) {
}
@Override
public Interceptor build() {
return new DataCleanInterceptor();
}
}
3)カスタムsink
書き方:extends AbstractSink implements Configurable
2.kafka->spark->es
3.kafka->spark->hbase
4.kafka->spark->hdfs
5.kafka->spark->redis(warnアラート)