FlinkによるMYSQLデータの移行

4753 ワード

環境
1 Flink 1.4.1
2 java 1.8+
3 mysql 5.7+
 
デモの要件
Flinkを使ってMySQLデータベースからデータを読み込み、加工したうえで別のMySQLデータベースへ移行する。
 
サンプルコード
package com.bigdata.flink.java.demo;

import com.bigdata.flink.scala.Constants;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.BatchTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;

/**
 * Flink batch demo that reads rows from MySQL over JDBC, exposes them to the Table API and
 * appends the result to the MySQL table {@code test}.
 *
 * <p>Both the JDBC source and the JDBC sink take their driver name, URL, user name and
 * password from {@code Constants}, so as written they use the same connection settings.
 * Every column handled here is declared as a string.
 *
 * <p>Created 2018-03-01.
 */
public class FromMysqlToMysqlDemo {

    /**
     * Builds the source table, attaches the JDBC sink and runs the batch job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {

        // Batch execution environment and the Table API environment layered on top of it.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tabEnv = BatchTableEnvironment.getTableEnvironment(env);

        // Alternative source that performs the join inside Flink instead of inside MySQL:
        // Table table = multiDataSet(env, tabEnv);
        Table table = singleDataSet(env, tabEnv);

        // The sink binds the two string columns of each row, by position, to the placeholders.
        JDBCAppendTableSink jdbcSink = JDBCAppendTableSink.builder()
                .setDrivername(Constants.MYSQL_DRIVER_NAME())
                .setDBUrl(Constants.MYSQL_URL())
                .setUsername(Constants.MYSQL_USERNAME())
                .setPassword(Constants.MYSQL_PASSWD())
                .setQuery("insert into test (login_name,role_name) values(?,?)")
                .setParameterTypes(Types.STRING(), Types.STRING())
                .build();

        table.writeToSink(jdbcSink);

        // Nothing is read or written until the job is executed.
        env.execute();
    }

    /**
     * Reads {@code sys_user} and {@code sys_role} as two separate JDBC inputs, registers them
     * as the tables {@code user} and {@code role}, and joins them with a Table API SQL query.
     *
     * @param env    batch execution environment used to create the JDBC inputs
     * @param tabEnv table environment in which the inputs are registered and queried
     * @return table with two string columns taken from the {@code name} field of each input
     */
    private static Table multiDataSet(ExecutionEnvironment env, BatchTableEnvironment tabEnv) {

        // Both inputs share one row layout: two string fields called "id" and "name".
        RowTypeInfo rowTypeInfo = twoStringColumns("id", "name");

        // The key columns are cast to char in SQL, presumably so that they match the
        // string type declared in the row layout above.
        registerJdbcQuery(env, tabEnv, "user",
                "select cast(id as char),login_name from sys_user", rowTypeInfo);
        registerJdbcQuery(env, tabEnv, "role",
                "select cast(user_id as char),role_name from sys_role", rowTypeInfo);

        return tabEnv.sqlQuery ("select u.name,r.name from `user` as u inner join `role` as r on(u.id=r.id)");
    }

    /**
     * Lets MySQL perform the join of {@code sys_user} and {@code sys_role}, registers the
     * joined result as the table {@code ds} and selects both columns from it.
     *
     * @param env    batch execution environment used to create the JDBC input
     * @param tabEnv table environment in which the input is registered and queried
     * @return table with the string columns {@code login_name} and {@code role_name}
     */
    private static Table singleDataSet(ExecutionEnvironment env, BatchTableEnvironment tabEnv) {

        RowTypeInfo rowTypeInfo = twoStringColumns("login_name", "role_name");

        registerJdbcQuery(env, tabEnv, "ds",
                "select u.login_name,r.role_name from sys_user u inner join sys_role r on (u.id=r.user_id)",
                rowTypeInfo);

        return tabEnv.sqlQuery ("select login_name,role_name from `ds`");
    }

    /**
     * Describes a row made of two string fields with the given names.
     *
     * @param first  name of the first field
     * @param second name of the second field
     * @return row type information for the two string fields
     */
    private static RowTypeInfo twoStringColumns(String first, String second) {
        TypeInformation<?>[] types = {Types.STRING(), Types.STRING()};
        String[] fieldNames = {first, second};
        return new RowTypeInfo(types, fieldNames);
    }

    /**
     * Creates a JDBC input for {@code query} using the connection settings from
     * {@code Constants} and registers it in the table environment under {@code tableName}.
     *
     * @param env         batch execution environment used to create the input
     * @param tabEnv      table environment that receives the registration
     * @param tableName   name under which the data set is registered
     * @param query       SQL statement executed against MySQL
     * @param rowTypeInfo field names and types of the rows produced by {@code query}
     */
    private static void registerJdbcQuery(ExecutionEnvironment env, BatchTableEnvironment tabEnv,
                                          String tableName, String query, RowTypeInfo rowTypeInfo) {
        // NOTE(review): registerDataSetInternal looks like an internal entry point; consider the
        // public registerDataSet of the Java BatchTableEnvironment - confirm before switching.
        tabEnv.registerDataSetInternal(tableName, env.createInput(JDBCInputFormat
                .buildJDBCInputFormat()
                .setDrivername(Constants.MYSQL_DRIVER_NAME())
                .setDBUrl(Constants.MYSQL_URL())
                .setUsername(Constants.MYSQL_USERNAME())
                .setPassword(Constants.MYSQL_PASSWD())
                .setQuery(query)
                .setRowTypeInfo(rowTypeInfo)
                .finish()));
    }

}