Flume NGのInterceptor概要
Flumeで読み込んだファイルをさらに細分化して保存したい場合があります.例えば、sourceのデータはビジネスタイプによって別々に保存されます.具体的には、sourceのweb、wap、mediaなどの内容を別々に保存します.例えば、データを破棄したり変更したりします.この場合,インターセプタを用いることが考えられる.
flumeは、ブロッキングによってイベントを変更および破棄する機能を実現します.ブロッキングは、クラス継承org.apache.flume.interceptor.Interceptorインタフェースを定義することによって実現されます.ユーザーは、ノード定義ルールによってイベントを変更または破棄できます.Flumeはチェーンブロックをサポートし、構成で構築されたブロッククラスの名前を指定します.ソースの構成では、ブロッカーはスペース間隔のリストとして指定されます.ブロッキングは指定した順序で呼び出されます.ブロックが返すイベントのリストは、チェーン内の次のブロックに渡されます.ブロッキングがイベントを破棄する場合、ブロッキングはイベントリストを返すときにイベントを返さないだけです.ブロッカーがすべてのイベントを破棄する場合は、空のイベントリストを返します.
まず重要なオブジェクトEvent:eventはflume伝送の最小オブジェクトであり、sourceからデータを取得するとeventにカプセル化され、eventをchannelに送信し、sinkはchannelからeventを消費する.eventは、ヘッド(Map headers)とボディ(body)の2つの部分から構成されています.Headers部分はmapであり、body部分はStringまたはbyte[]などであってもよい.ここでbody部分は本当にデータを格納する場所であり、headers部分は本節で説明するinterceptorに使用されます.
Flume-NGにはいくつかのブロッカーがあります.
1、HostInterceptor:IP或いはhostnameでブロックする;
2、TimestampInterceptor:タイムスタンプでブロックする;
3.RegexExtractorInterceptor:このブロッカーは正規表現マッチンググループを抽出し、指定した正規表現を使用してマッチンググループをイベントのheaderとして追加する.また、イベントヘッダとしてマッチンググループを追加する前にマッチンググループをフォーマットするためのプラグイン可能なserializersもサポートされています.
4、RegexFilteringInterceptor:このブロッカーはイベントを選択的にフィルタします.イベントボディをテキストで解析し、構成されたルール式でテキストをマッチングします.指定された正規表現は、イベントを含むか除外するために使用できます.これと上の違いは、event.bodyが正則に合致する内容をheadersのvalueとして抽出する正規表現に従ってeventを選択的に通過させることです.
5、StaticInterceptor:eventのheaderのvalueをカスタマイズできます.
これらのクラスはorg.apache.flume.interceptorパッケージの下にあります.
これらのinterceptorは比較的簡単です.私たちはHostInterceptorを選んでinterceptorの原理と、どのように自分でinterceptorをカスタマイズするかを説明します.
これらのinterceptorはorg.apache.flume.interceptor.Interceptorインタフェースを実現しています.このインタフェースには4つの方法と1つの内部インタフェースがあります.
1、public void initialize()実行前の初期化は、一般的に実現する必要はありません(上記のいくつかはこの方法を実現していません).
2、public Event intercept(Event event)は単一eventを処理する.
3、public List intercept(List events)はeventを一括処理し、実際に上の2を上場サイクルで呼び出す.
4、public void close()はいくつかの掃除をすることができて、上のいくつかもこの方法を実現していません;
5、 public interface Builder extends ConfigurableはInterceptorオブジェクトを構築し、外部ではこのBuilderを使用してInterceptorオブジェクトを取得します.
自分でカスタマイズするには、上の2,3,5を完成しなければなりません.
次にorg.apache.flume.interceptor.HostInterceptorを見てみましょう.すべてのコードは次のとおりです.
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flume.interceptor;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.flume.interceptor.HostInterceptor.Constants.*;
/**
* Simple Interceptor class that sets the host name or IP on all events
* that are intercepted.
* The host header is named host
and its format is either the FQDN
* or IP of the host on which this interceptor is run.
*
*
* Properties:
*
* preserveExisting: Whether to preserve an existing value for 'host'
* (default is false)
*
* useIP: Whether to use IP address or fully-qualified hostname for 'host'
* header value (default is true)
*
* hostHeader: Specify the key to be used in the event header map for the
* host name. (default is "host")
*
* Sample config:
*
*
* agent.sources.r1.channels = c1
* agent.sources.r1.type = SEQ
* agent.sources.r1.interceptors = i1
* agent.sources.r1.interceptors.i1.type = host
* agent.sources.r1.interceptors.i1.preserveExisting = true
* agent.sources.r1.interceptors.i1.useIP = false
* agent.sources.r1.interceptors.i1.hostHeader = hostname
*
*
*/
public class HostInterceptor implements Interceptor {
private static final Logger logger = LoggerFactory
.getLogger(HostInterceptor.class);
private final boolean preserveExisting;
private final String header;
private String host = null;
/**
* Only {@link HostInterceptor.Builder} can build me
*/
private HostInterceptor(boolean preserveExisting,
boolean useIP, String header) {
this.preserveExisting = preserveExisting;
this.header = header;
InetAddress addr;
try {
addr = InetAddress.getLocalHost();
if (useIP) {
host = addr.getHostAddress();
} else {
host = addr.getCanonicalHostName();
}
} catch (UnknownHostException e) {
logger.warn("Could not get local host address. Exception follows.", e);
}
}
@Override
public void initialize() {
// no-op
}
/**
* Modifies events in-place.
*/
@Override
public Event intercept(Event event) {
Map headers = event.getHeaders();
if (preserveExisting && headers.containsKey(header)) {
return event;
}
if(host != null) {
headers.put(header, host);
}
return event;
}
/**
* Delegates to {@link #intercept(Event)} in a loop.
* @param events
* @return
*/
@Override
public List intercept(List events) {
for (Event event : events) {
intercept(event);
}
return events;
}
@Override
public void close() {
// no-op
}
/**
* Builder which builds new instances of the HostInterceptor.
*/
public static class Builder implements Interceptor.Builder {
private boolean preserveExisting = PRESERVE_DFLT;
private boolean useIP = USE_IP_DFLT;
private String header = HOST;
@Override
public Interceptor build() {
return new HostInterceptor(preserveExisting, useIP, header);
}
@Override
public void configure(Context context) {
preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
useIP = context.getBoolean(USE_IP, USE_IP_DFLT);
header = context.getString(HOST_HEADER, HOST);
}
}
public static class Constants {
public static String HOST = "host";
public static String PRESERVE = "preserveExisting";
public static boolean PRESERVE_DFLT = false;
public static String USE_IP = "useIP";
public static boolean USE_IP_DFLT = true;
public static String HOST_HEADER = "hostHeader";
}
}
Constantsクラスは、パラメータクラスおよびデフォルトのパラメータです.
BuilderクラスはHostInterceptorオブジェクトを構築し、まずconfigure(Context context)メソッドを使用してプロファイル内のinterceptorのパラメータを取得し、次にメソッドbuild()を使用してHostInterceptorオブジェクトを返します.
1、preserveExistingはeventのヘッダに本interceptorで指定したヘッダが含まれている場合、このヘッダを保持するかどうか、trueは保持するかどうかを示す.
2、useIPは自機のIPアドレスをheaderのvalueとして使用するかどうかを表し、trueはIPを使用し、デフォルトはtrueである.
3、headerはeventのheadersのkeyで、デフォルトはhostです.
HostInterceptor:
1、コンストラクション関数は付与のほか、useIPに基づいてIPまたはhostnameを取得する.
2、intercept(Event event)メソッドはeventのヘッダを設定する場所で、まずheadersオブジェクトを取得し、preserveExisting=trueを同時に満たし、headers.containsKey(header)がeventに直接戻り、そうでなければheaders:headers.put(header,host)を設定します.
3、intercept(List events)メソッドは、上記2をループ呼び出しするメソッドである.
他のいくつかのInterceptorも似ていることは明らかです.プロファイルでsourceのinterceptorを構成する場合、自分でカスタマイズしたinterceptorの場合は、com.MyInterceptor$Builderなどの完全なクラス名+¥Builderをtypeパラメータに割り当てる必要があります.
これによりheadersを設定すると、後続のフローでselectorによってサブディビジョンストレージを実現できます.