Flink: Consuming Kafka Data and Writing to Elasticsearch


@羲凡 - Just to live a better life
I. Prerequisites
1. Install Elasticsearch and Kibana. If you have not installed them yet, see the linked post on installing Elasticsearch and Kibana.
2. Add the dependency below to your pom.xml. The version used here is a bit old (Elasticsearch 5); if you are on a different version, look up the connector dependency that matches it.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
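
The job below also reads from Kafka with FlinkKafkaConsumer011, so you will likely need the matching Kafka connector as well. A minimal sketch, assuming the same Scala 2.12 build and the ${flink.version} property used above:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>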

II. The code
package test;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

import java.net.InetSocketAddress;
import java.util.*;

public class FlinkTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
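        // Checkpoint every 5 seconds and retain externalized checkpoints when the job is cancelled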
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setParallelism(3);

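        // Kafka consumer settings: brokers, consumer group, and fetch-size tuning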
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "cdh01.com:9092,cdh02.com:9092,cdh03.com:9092");
        properties.setProperty("group.id", "test227");
        properties.setProperty("max.poll.records","1000");
        properties.setProperty("max.partition.fetch.bytes","5242880");

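        // Read the testtopic topic as plain strings, starting from the latest offsets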
        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
                "testtopic",
                new SimpleStringSchema(),
                properties);
        consumer.setStartFromLatest();
        DataStreamSource<String> text = env.addSource(consumer, "Kafka").setParallelism(3);
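        // Word count: split each line on spaces and sum the counts per word over 3-second tumbling windows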
        DataStream<Tuple2<String, Integer>> sum = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String str, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] arr = str.split(" ");
                for (String s : arr) {
                    collector.collect(new Tuple2<>(s, 1));
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(3)).sum(1);
        sum.print();


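        // Elasticsearch sink settings: target cluster name and bulk-flush thresholds (actions, size, interval)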
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("cluster.name", "test-es");
        userConfig.put("bulk.flush.max.actions", "2000");
        userConfig.put("bulk.flush.max.size.mb","10");
        userConfig.put("bulk.flush.interval.ms","1000");
        userConfig.put("transport.tcp.compress", "true");

        String esServers = "cdh01.com:9300,cdh02.com:9300,cdh03.com:9300";

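        // Turn the host:port list into InetSocketAddress entries for the Elasticsearch transport client (port 9300)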
        List<InetSocketAddress> transportAddresses = new ArrayList<>();
        for (String serverAndPortStr : esServers.split(",")) {
            String[] serverAndPortArr = serverAndPortStr.split(":");
            transportAddresses.add(new InetSocketAddress(serverAndPortArr[0], Integer.valueOf(serverAndPortArr[1])));
        }

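        // Index each (word, count) tuple into my-index/my-type; the failure handler re-adds requests rejected
        // by a full bulk queue, drops malformed documents, and fails the sink on any other error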
        sum.addSink(new ElasticsearchSink<>(userConfig, transportAddresses,
                new ElasticsearchSinkFunction<Tuple2<String, Integer>>(){
                    public IndexRequest createIndexRequest(Tuple2<String, Integer> element) {
                        Map<String, String> json = new HashMap<>();
                        json.put("data", element.f0+":"+element.f1);

                        return Requests.indexRequest()
                                .index("my-index")
                                .type("my-type")
                                .source(json);
                    }
                    @Override
                    public void process(Tuple2<String, Integer> tuple2, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(tuple2));
                    }
                },
                new ActionRequestFailureHandler(){
                    @Override
                    public void onFailure(ActionRequest action,
                                          Throwable failure,
                                          int i,
                                          RequestIndexer indexer) throws Throwable {
                        IndexRequest indexRequest = (IndexRequest) action;
                        if (ExceptionUtils.findThrowable(failure,EsRejectedExecutionException.class).isPresent()) {
                            // full queue; re-add document for indexing
                            indexer.add(indexRequest);
                        } else if (ExceptionUtils.findThrowable(failure,ElasticsearchParseException.class).isPresent()) {
                            // malformed document; simply drop request without failing sink
                        } else {
                            // for all other failures, fail the sink
                            // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                            throw failure;
                        }
                    }
                })).name("toES");

        env.execute("Kafka2ES");
    }
}


III. Run and view the results
Run the following on one of the Kafka machines:
kafka-console-producer --topic testtopic \
--broker-list cdh01.com:9092,cdh02.com:9092,cdh03.com:9092
Enter aaa bbb aaa as input, then log in to Kibana at http://cdh03.com:5601 and run the following query to view the result:
GET my-index/_search
{
  "query": {
    "match_all": {}
  }
}

Result (only the contents under hits are shown):
"hits": [
  {
    "_index": "my-index",
    "_type": "my-type",
    "_id": "AW6gZVp_gjxciqwfvKPH",
    "_score": 1,
    "_source": {
      "data": "bbb:1"
    }
  },
  {
    "_index": "my-index",
    "_type": "my-type",
    "_id": "AW6gZVp_gjxciqwfvKPG",
    "_score": 1,
    "_source": {
      "data": "aaa:2"
    }
  }
]

====================================================================
@羲凡 - Just to live a better life
If you find any problems in this post, feel free to leave a comment.