「0から1まで学ぶFlink」——FlinkがKafkaデータを読み上げてMySQLに一括書き込む


前言
以前は「0から1までFlinkを学ぶ」--Data Sinkをどのようにカスタマイズしましたか?文章の中で実はすでに少しデータをMySQLに书いて、しかしいくつかの配置化のものは当时书くのが死んで、通用することができなくて、最近知识の星の中で友达が私に叫びます:kafkaの中からデータを読み取って、Flinkを通じて予集积して、それからデータベースの接続プールを作成してmysqlにデータを大量に书く例を书きます.
そこでこの文章があって、もっと多くの質問と私が書きたい文章は知識の星の中で私のように質問することができて、私は質問に基づいてタイムリーに答えて、できるだけ文章の修正をします.
の準備を
この2つの依存をpomに追加する必要がありますxmlで

    mysql
    mysql-connector-java
    5.1.34

kafkaデータの読み出し
ここで私は依然として以前のstudentクラスを使って、自分でkafkaをローカルに起こしていくつかのテストデータを作って、ここで私たちは1本のデータをテストしてsleep 10 sを送って、kafkaの中で1分に6本のデータを送ることを意味します.
package com.zhisheng.connectors.mysql.utils;

import com.zhisheng.common.utils.GsonUtil;
import com.zhisheng.connectors.mysql.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Desc:  kafka    ,      main      
 * Created by zhisheng on 2019-02-17
 * Blog: http://www.54tianzhisheng.cn/tags/Flink/
 */
public class KafkaUtil {
    public static final String broker_list = "localhost:9092";
    public static final String topic = "student";  //kafka topic     flink        topic

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer(props);

        for (int i = 1; i <= 100; i++) {
            Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);
            ProducerRecord record = new ProducerRecord(topic, null, null, GsonUtil.toJson(student));
            producer.send(record);
            System.out.println("    : " + GsonUtil.toJson(student));
            Thread.sleep(10 * 1000); //       sleep 10s,    1    6  
        }
        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }
}

kafkaからデータを読み出し、studentオブジェクトにシーケンス化します.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");

SingleOutputStreamOperator student = env.addSource(new FlinkKafkaConsumer011<>(
        "student",   //   kafka topic            topic   
        new SimpleStringSchema(),
        props)).setParallelism(1)
        .map(string -> GsonUtil.fromJson(string, Student.class)); //,       student   

RichSinkFunctionではsinkがmysqlに1つのデータを渡すとinvokeメソッドが1回呼び出されるので、一括書き込みを実現するにはsinkの前にデータを集約したほうがいいです.では、ここでは1分間のウィンドウを開いてStudentデータを集約します.
student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction, TimeWindow>() {
    @Override
    public void apply(TimeWindow window, Iterable values, Collector> out) throws Exception {
        ArrayList students = Lists.newArrayList(values);
        if (students.size() > 0) {
            System.out.println("1        student       :" + students.size());
            out.collect(students);
        }
    }
});

データベースへの書き込み
ここではDBCP接続プールを使用してデータベースmysql,pomに接続する.xmlに依存を追加するには:

    org.apache.commons
    commons-dbcp2
    2.1.1

他のデータベース接続プールを使用する場合は、対応する依存関係を追加します.
ここではMySQLにデータを書き込み、以前の記事と同様にRichSinkFunctionクラスを継承し、書き換える方法です.
package com.zhisheng.connectors.mysql.sinks;

import com.zhisheng.connectors.mysql.model.Student;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;

/**
 * Desc:      sink     mysql
 * Created by zhisheng_tian on 2019-02-17
 * Blog: http://www.54tianzhisheng.cn/tags/Flink/
 */
public class SinkToMySQL extends RichSinkFunction> {
    PreparedStatement ps;
    BasicDataSource dataSource;
    private Connection connection;

    /**
     * open()        ,       invoke               
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new BasicDataSource();
        connection = getConnection(dataSource);
        String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        //         
        if (connection != null) {
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    /**
     *               invoke()   
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(List value, Context context) throws Exception {
        //      
        for (Student student : value) {
            ps.setInt(1, student.getId());
            ps.setString(2, student.getName());
            ps.setString(3, student.getPassword());
            ps.setInt(4, student.getAge());
            ps.addBatch();
        }
        int[] count = ps.executeBatch();//     
        System.out.println("      " + count.length + "   ");
    }


    private static Connection getConnection(BasicDataSource dataSource) {
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        //  ,         mysql          、  
        dataSource.setUrl("jdbc:mysql://localhost:3306/test");
        dataSource.setUsername("root");
        dataSource.setPassword("root123456");
        //          
        dataSource.setInitialSize(10);
        dataSource.setMaxTotal(50);
        dataSource.setMinIdle(2);

        Connection con = null;
        try {
            con = dataSource.getConnection();
            System.out.println("     :" + con);
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}

コアクラスMain
コア・プログラムは次のとおりです.
public class Main {
    public static void main(String[] args) throws Exception{
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        SingleOutputStreamOperator student = env.addSource(new FlinkKafkaConsumer011<>(
                "student",   //   kafka topic            topic   
                new SimpleStringSchema(),
                props)).setParallelism(1)
                .map(string -> GsonUtil.fromJson(string, Student.class)); //
        student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable values, Collector> out) throws Exception {
                ArrayList students = Lists.newArrayList(values);
                if (students.size() > 0) {
                    System.out.println("1        student       :" + students.size());
                    out.collect(students);
                }
            }
        }).addSink(new SinkToMySQL());

        env.execute("flink learning connectors kafka");
    }
}

プロジェクトの実行
Mainクラスを実行してKafkaUtilsを実行する.JAvaクラス!
次の図は、Kafkaに送信されたデータです.
次の図は、Mainクラスを実行するログです.4つの接続プールが作成されるのは、デフォルトの4つの並列度のためです.addSinkという演算子で並列度を1に設定すると、接続プールが作成されます.
次の図は、一括データベース挿入の結果です.
まとめ
知識星の友人の疑問から書きました彼の条件(ロット/データベース接続プール/mysql書き込み)を満たしているはずですが、確かにネット上の多くの例は簡単なdemo形式で、すべて単一のデータでデータベース接続を作成してMySQLに挿入します.書くデータの量が多いと、MySQLの書き込みに大きなプレッシャーがかかります.これも私が以前『0から1までFlinkを勉強しました』で——FlinkはElasticSearchにデータを書き込み、ESが強調したように、性能を向上させるには必ず大量の書き込みが必要です.私たちの今の文章では、データ量が大きいと、1分間に1万件のデータが集約されると、一括書きは1件を書くよりも性能が向上するのではないでしょうか.
このオリジナルアドレスは次のとおりです.http://www.54tianzhisheng.cn/2019/01/15/Flink-MySQL-sink/、無断転載は禁止されています.
私に注目して
微信公衆番号:zhisheng
また、私は自分でFlinkの学習資料を整理して、現在すでにすべて微信の公衆番号に置いています.あなたは私の微信を加えることができます:zhisheng_Tian、キーワードに返信:Flinkは無条件に取得できます.
もっとプライベートな資料は知識の星に参加してください!
Githubコードウェアハウス
https://github.com/zhisheng17/flink-learning/
今後、このプロジェクトのすべてのコードはこの倉庫に置かれ、flinkを勉強するいくつかのdemoとブログが含まれています.
本明細書のプロジェクトコードはhttps://github.com/zhisheng17/flink-learning/tree/master/flink-learning-connectors/flink-learning-connectors-mysql
関連記事
1、『0から1までFlinkを学ぶ』——Apache Flink紹介
2、『0から1までFlinkを学ぶ』--MacにFlink 1.6を構築する.0環境の構築とシンプルなプログラムの実行
3、『0から1までFlinkを学ぶ』——Flinkプロファイルの詳細
4、『0から1までFlinkを学ぶ』——Data Source紹介
5、「0から1までFlinkを学ぶ」--Data Sourceをカスタマイズするにはどうすればいいですか?
6、『0から1までFlinkを学ぶ』——Data Sink紹介
7、「0から1までFlinkを学ぶ」--Data Sinkをどのようにカスタマイズしますか?
8、『0から1までFlinkを学ぶ』——Flink Data transformation(変換)
9、『0から1までFlinkを学ぶ』——Flinkの中のStream Windowsを紹介する
10、『0から1までFlinkを学ぶ』——Flinkの中のいくつかのTimeの詳細
11、『0から1までFlinkを学ぶ』——FlinkはデータをElasticSearchに書き込む
12、『0から1までFlinkを学ぶ』--Flinkプロジェクトはどのように実行しますか?
13、『0から1までFlinkを学ぶ』——FlinkはKafkaにデータを書き込む
14、『0から1までFlinkを学ぶ』——Flink JobManager高可用性構成
15、『0から1までFlinkを学ぶ』——Flink parallelismとSlot紹介
16、『0から1までFlinkを勉強する』――FlinkはKafkaデータを読み上げてMySQLに大量に書き込む