Synchronizing a Hive Table to a MySQL Table


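  • Problem: a company needs to synchronize the data of a Hive table on a machine in one cluster to a MySQL table on a machine in another cluster. The network segments of the two machines are not connected to each other. How can this be done?
  • Analysis: because the two machines cannot reach each other, Sqoop on the Hive cluster cannot write the data directly into the MySQL table. However, the Hive cluster has an FTP service, and the MySQL cluster has a machine that can access the external network.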
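  • Approach:
  • Step one: export the Hive table data to the local disk of the Hive machine.
  • Step two: on the Hive machine, compress the exported files into an archive and upload it to the FTP site.
  • Step three: on the machine in the MySQL cluster that can access the external network, download the archive from the FTP site and extract it locally.
  • Step four: load the data from the local files into the MySQL database with Java JDBC.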
  • Programs:
  • Step one: export the Hive table data locally and compress it, HiveToMySQL.sh

    #!/bin/bash
    source ~/.bash_profile
    # The paths contain spaces, so they are quoted wherever they are used
    base_dir='/data/X. T. Xiao'
    directory="$base_dir/HiveToMySQL"
    if [ -d "$directory" ]
    then
      echo "Directory exists!"
    else
      echo "Directory does not exist, creating it!"
      mkdir -p "$directory"
    fi
    
    # Stop if cd fails, so that rm -rf never runs in the wrong directory
    cd "$directory" || exit 1
    rm -rf ./*
    
    # Export each Hive table as tab-separated text
    hive -e "select distinct cnt_date, click_cnt, play_cnt, user_cnt, recommended_cnt, click_percent, play_percent, per_capita_use, type from tag_model.t_live_on_demand" >> "$directory/JHTJ_tongbu.txt"
    hive -e "select distinct cnt_date, recommended_cnt, play_cnt, boot_cnt, play_percent, recommended_percent, type from tag_model.t_boot_live" >> "$directory/KJTJ_tongbu.txt"
    hive -e "select distinct cnt_date, recommended_cnt, click_cnt, user_cnt, click_percent, per_capita_use, type from tag_model.t_vip_film" >> "$directory/VIPTS_tongbu.txt"
    hive -e "select distinct cnt_date, recommended_cnt, click_cnt, user_cnt, click_percent, per_capita_use, type from tag_model.t_binge_watching" >> "$directory/ZJTS_tongbu.txt"
    hive -e "select distinct cnt_date, click_cnt, play_cnt, user_cnt, recommended_cnt, click_percent, play_percent, per_capita_use, type from tag_model.t_search_individuation" >> "$directory/QJTJ_tongbu.txt"
    
    echo 'Loading hive data to Local is OK !'
    
    # Replace the previous archive with a fresh one
    cd "$base_dir" || exit 1
    rm -f HiveToMySQL.tar.gz
    tar zcvf HiveToMySQL.tar.gz HiveToMySQL
    
    echo 'Data compressed OK !'
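The way this archive is built determines where the files appear after extraction in Step three. The script runs `tar` from `/data/X. T. Xiao/` with the relative name `HiveToMySQL`, so every entry is stored as `HiveToMySQL/...` with no `/data/...` prefix. Below is a minimal Python sketch of the same packaging step using the standard `tarfile` module. It is useful for checking that layout; the function name `pack_export_dir` is hypothetical.

```python
import os
import tarfile


def pack_export_dir(export_dir, archive_path):
    """Compress export_dir into a gzip tarball whose entries are stored
    relative to its parent, like `tar zcvf X.tar.gz HiveToMySQL` run from
    the parent directory. Returns the member names stored in the archive."""
    if os.path.exists(archive_path):
        os.remove(archive_path)  # drop the previous archive
    name = os.path.basename(os.path.normpath(export_dir))  # e.g. "HiveToMySQL"
    with tarfile.open(archive_path, "w:gz") as tar:
        tar.add(export_dir, arcname=name)  # adds the directory recursively
    with tarfile.open(archive_path, "r:gz") as tar:
        return tar.getnames()
```

Extracting such an archive in `/home/ftpdata/ftp_test/` therefore produces `/home/ftpdata/ftp_test/HiveToMySQL/JHTJ_tongbu.txt` and the other files, which is where the loader has to look for them.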
  • Step two: upload the compressed archive to the FTP site, ftp_tran.py
    
    # -*- coding: utf-8 -*-
    from ftplib import FTP  # FTP client from the standard library
    
    __author__ = 'X. T. Xiao'
    
    IP = '5.210.10.12'
    user = 'X. T. Xiao'
    password = 'NjExTllZGFmOTQ5'
    
    
    def put_file(filename, path):
        ftp = FTP()                # create the FTP client
        ftp.set_debuglevel(2)      # level 2: log every line sent and received
        ftp.connect(IP)            # connect to the FTP server (default port 21)
        ftp.login(user, password)  # log in with user name and password
        # print(ftp.getwelcome())  # print the server's welcome message
        with open(path, 'rb') as f:  # the with block closes the local file
            ftp.storbinary('STOR %s' % filename, f, 8192)  # upload in 8 KB blocks
        print('success')
        ftp.quit()
    
    
    if __name__ == '__main__':
        put_file('HiveToMySQL.tar.gz', '/data/X. T. Xiao/HiveToMySQL.tar.gz')
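Step three is triggered by cron, so the downloader could fetch the archive while it is still being uploaded. One common mitigation is to upload under a temporary name and rename the file only after `storbinary` returns. This is an assumption, not part of the original setup. The sketch below shows the idea; `upload_then_rename` and the `.part` suffix are hypothetical, and `ftp` can be any connected, logged-in `ftplib.FTP` object.

```python
from ftplib import FTP, error_perm


def upload_then_rename(ftp: FTP, local_path: str, remote_name: str,
                       blocksize: int = 8192) -> str:
    """Upload local_path as remote_name + '.part', then rename it, so a
    client downloading remote_name never sees a half-written file."""
    tmp_name = remote_name + ".part"
    with open(local_path, "rb") as f:
        ftp.storbinary("STOR %s" % tmp_name, f, blocksize)
    try:
        ftp.delete(remote_name)  # some servers refuse to rename onto an existing file
    except error_perm:
        pass  # nothing to delete, e.g. on the first run
    ftp.rename(tmp_name, remote_name)
    return remote_name
```

With the settings from the script above, usage could look like `with FTP(IP) as ftp: ftp.login(user, password); upload_then_rename(ftp, '/data/X. T. Xiao/HiveToMySQL.tar.gz', 'HiveToMySQL.tar.gz')`.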
  • Step three: download the archive from the FTP site and extract it locally, tarToLocal.sh
    
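    #!/bin/bash
    directory='/home/ftpdata/ftp_test/'
    if [ -d "$directory" ]
    then
      echo "Directory exists!"
    else
      echo "Directory does not exist, creating it!"
      mkdir -p "$directory"
    fi
    
    # Stop if cd fails, so that rm -rf never runs in the wrong directory
    cd "$directory" || exit 1
    rm -rf ./HiveToMySQL
    
    # -r saves the file under ./5.210.10.12/; the user name is quoted because it contains spaces
    wget -r "ftp://5.210.10.12:21/HiveToMySQL.tar.gz" --ftp-user='X. T. Xiao' --ftp-password='NjExTllZGFmOTQ5'
    
    # Extract into ./HiveToMySQL, then remove the downloaded copy
    tar -xzf ./5.210.10.12/HiveToMySQL.tar.gz
    rm -rf /home/ftpdata/ftp_test/5.210.10.12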
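Before Step four runs, it can help to confirm that extraction produced all five export files. The sketch below assumes the archive layout created in Step one (`HiveToMySQL/<NAME>_tongbu.txt`). `extract_and_check` is a hypothetical helper. The check that rejects absolute or `..` member paths, and the use of `tarfile`'s `"data"` extraction filter where it exists, are added safety measures that the original script does not have.

```python
import os
import tarfile

# The five files written by HiveToMySQL.sh
EXPECTED = ["JHTJ_tongbu.txt", "KJTJ_tongbu.txt", "VIPTS_tongbu.txt",
            "ZJTS_tongbu.txt", "QJTJ_tongbu.txt"]


def extract_and_check(archive_path, dest_dir, subdir="HiveToMySQL"):
    """Extract archive_path into dest_dir and return the expected file names
    that are missing afterwards (an empty list means all five arrived)."""
    with tarfile.open(archive_path, "r:gz") as tar:
        for member in tar.getmembers():
            # Refuse entries that would be written outside dest_dir
            if os.path.isabs(member.name) or ".." in member.name.split("/"):
                raise ValueError("unsafe path in archive: %s" % member.name)
        if hasattr(tarfile, "data_filter"):
            tar.extractall(dest_dir, filter="data")  # Pythons with extraction filters
        else:
            tar.extractall(dest_dir)
    target = os.path.join(dest_dir, subdir)
    return [name for name in EXPECTED
            if not os.path.isfile(os.path.join(target, name))]
```

A non-empty return value is a signal to skip the MySQL load for that run rather than load partial data.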
  • Step four: load the local files into MySQL with JDBC, TxtToMySql.java
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.sql.Types;
    import java.util.Collections;
    
    public class TxtToMySql {
        // Connection settings for the MySQL database "recommend"
        private static final String URL = "jdbc:mysql://17.7.113.133:8096/recommend";
        private static final String USER = "recommend";
        private static final String PASSWORD = "rcmnd20151027";
    
        /**
         * Reads a tab-separated text file exported with HIVE -e, deletes the current
         * contents of the MySQL table, and inserts every data line into it.
         *
         * @param fileName   path of the exported text file
         * @param table      target table in the database recommend
         * @param columnList target columns, in the same order as the Hive SELECT
         */
        public static void syncTable(String fileName, String table, String columnList)
                throws SQLException, IOException {
            int n = columnList.split(",").length;
            String insertSql = "insert into " + table + " (" + columnList + ") values ("
                    + String.join(", ", Collections.nCopies(n, "?")) + ")";
            // try-with-resources closes the reader and the connection even when an error occurs
            try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileName), StandardCharsets.UTF_8);
                 Connection conn = DriverManager.getConnection(URL, USER, PASSWORD)) {
                conn.setAutoCommit(false); // the delete and the inserts form one transaction
                try (Statement stmt = conn.createStatement();
                     PreparedStatement ps = conn.prepareStatement(insertSql)) {
                    stmt.executeUpdate("delete from " + table);
                    String line;
                    while ((line = reader.readLine()) != null) {
                        // hive -e output may contain GC log lines such as "[GC ...]"; skip them
                        if (line.isEmpty() || line.contains("[GC")) {
                            continue;
                        }
                        String[] arrs = line.split("\t", -1);
                        for (int i = 0; i < n; i++) {
                            // cnt_date (first column) is stored as yyyyMMdd
                            String value = (i == 0) ? arrs[i].replace("-", "") : arrs[i];
                            if ("NULL".equals(value)) {
                                ps.setNull(i + 1, Types.VARCHAR); // Hive prints SQL NULL as "NULL"
                            } else {
                                ps.setString(i + 1, value); // bound as a parameter, not concatenated
                            }
                        }
                        ps.addBatch();
                    }
                    ps.executeBatch();
                    conn.commit();
                } catch (SQLException | IOException | RuntimeException e) {
                    conn.rollback(); // keep the previous data if the load fails
                    throw e;
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            Class.forName("com.mysql.jdbc.Driver"); // load the MySQL driver (optional for JDBC 4 drivers)
            String dir = "/home/ftpdata/ftp_test/HiveToMySQL/"; // where tarToLocal.sh extracts the archive
            // {file prefix, MySQL table, columns in Hive SELECT order}
            String[][] jobs = {
                {"JHTJ", "t_live_on_demand", "cnt_date, click_cnt, play_cnt, user_cnt, recommended_cnt, click_percent, play_percent, per_capita_use, type"},
                {"KJTJ", "t_boot_live", "cnt_date, recommended_cnt, play_cnt, boot_cnt, play_percent, recommended_percent, type"},
                {"QJTJ", "t_search_individuation", "cnt_date, click_cnt, play_cnt, user_cnt, recommended_cnt, click_percent, play_percent, per_capita_use, type"},
                {"VIPTS", "t_vip_film", "cnt_date, recommended_cnt, click_cnt, user_cnt, click_percent, per_capita_use, type"},
                {"ZJTS", "t_binge_watching", "cnt_date, recommended_cnt, click_cnt, user_cnt, click_percent, per_capita_use, type"},
            };
            for (String[] job : jobs) {
                try {
                    syncTable(dir + job[0] + "_tongbu.txt", job[1], job[2]);
                    System.out.println(job[0] + " is OK!");
                } catch (Exception e) {
                    // report the failure and continue with the next table
                    System.out.println(job[0] + " failed");
                    e.printStackTrace();
                }
            }
        }
    }
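The load in Step four is a full refresh: it empties the table and then inserts every exported row. The sketch below shows that pattern with parameterized statements inside a single transaction, so a failed load leaves the previous data in place. It uses Python's built-in `sqlite3` purely as a stand-in for MySQL; this is an assumption for illustration. A MySQL DB-API driver supports the same idea, although its placeholder style may be `%s` rather than `?`. `reload_table` is a hypothetical name.

```python
import sqlite3


def reload_table(conn, table, columns, rows):
    """Replace the contents of `table` with `rows` in one transaction.
    `table` and `columns` must come from trusted code, never from the data
    file; row values are bound as parameters instead of being concatenated."""
    sql = "insert into %s (%s) values (%s)" % (
        table, ", ".join(columns), ", ".join("?" * len(columns)))
    with conn:  # commits on success, rolls back if an exception escapes
        conn.execute("delete from %s" % table)
        conn.executemany(sql, rows)
```

If any row fails to insert, the rollback also undoes the `delete`, so the table is never left empty or half-filled.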
    
  • Step five: schedule the scripts with crontab
    

    # On the Hive machine (crontab -e):
    5 1 * * * /bin/bash ./HiveToMySQL.sh
    5 3 * * * python ./ftp_tran.py
    # On the MySQL-side machine with external network access (crontab -e):
    15 3 * * * /bin/bash ./tarToLocal.sh
    35 3 * * * java -jar txtToMySQL.jar
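This schedule relies on fixed gaps between jobs that run on two different machines. The sketch below computes those gaps from the minute and hour fields of the entries. It assumes that both machines' clocks agree and that every entry is a daily job with literal numeric minute and hour values, as above. `minute_of_day` and `gaps_in_minutes` are hypothetical helpers, and they do not handle cron ranges, steps, or lists.

```python
def minute_of_day(cron_line):
    """Start time of a daily 'M H * * *' cron entry, in minutes after midnight."""
    minute, hour = cron_line.split()[:2]
    return int(hour) * 60 + int(minute)


def gaps_in_minutes(cron_lines):
    """Minutes between consecutive jobs, in the order given."""
    starts = [minute_of_day(line) for line in cron_lines]
    return [later - earlier for earlier, later in zip(starts, starts[1:])]


schedule = [
    "5 1 * * * /bin/bash ./HiveToMySQL.sh",
    "5 3 * * * python ./ftp_tran.py",
    "15 3 * * * /bin/bash ./tarToLocal.sh",
    "35 3 * * * java -jar txtToMySQL.jar",
]
print(gaps_in_minutes(schedule))  # [120, 10, 20]
```

Comparing each gap with the observed run time of the preceding job shows how much headroom the schedule has. For example, a slow upload has only 10 minutes before the download starts.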
  • Notes:
  • The text output of hive -e may contain lines such as [GC....]; these lines must be filtered out before loading.
  • Test every path carefully.
  • When scheduling the jobs, allow for processing delays between the tasks.
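The first note can be handled while each line is parsed. The sketch below applies the per-line rules used in Step four: skip lines containing `[GC`, split on tabs, and turn the `yyyy-MM-dd` date in the first column into `yyyyMMdd`. Mapping Hive's `NULL` text to `None` and checking the field count are added safeguards that the original program does not have. `parse_hive_line` is a hypothetical name.

```python
def parse_hive_line(line, expected_fields):
    """Parse one line of `hive -e` output into a tuple of field values.
    Returns None for lines that should be skipped (GC log lines, blank lines)."""
    line = line.rstrip("\n")
    if "[GC" in line or not line.strip():
        return None
    fields = line.split("\t")
    if len(fields) != expected_fields:
        raise ValueError("expected %d fields, got %d: %r"
                         % (expected_fields, len(fields), line))
    fields[0] = fields[0].replace("-", "")  # 2015-10-27 -> 20151027
    return tuple(None if f == "NULL" else f for f in fields)
```

Raising an error on an unexpected field count, instead of inserting a shifted row, makes a malformed export fail loudly.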