HIVEテーブルMySQLテーブルへの同期
33803 ワード
#!/bin/bash
source ~/.bash_profile
directory='/data/X. T. Xiao/HiveToMySQL/'
if [ -d $directory ]
then
echo " !"
else
echo “ !”
mkdir $directory
fi
cd $directory
rm -rf ./*
hive -e "select distinct cnt_date, click_cnt, play_cnt, user_cnt, recommended_cnt, click_percent, play_percent, per_capita_use, type from tag_model.t_live_on_demand" >> /data/X. T. Xiao/HiveToMySQL/JHTJ_tongbu.txt
hive -e "select distinct cnt_date, recommended_cnt, play_cnt, boot_cnt, play_percent, recommended_percent, type from tag_model.t_boot_live" >> /data/X. T. Xiao/HiveToMySQL/KJTJ_tongbu.txt
hive -e "select distinct cnt_date, recommended_cnt, click_cnt, user_cnt, click_percent, per_capita_use, type from tag_model.t_vip_film" >> /data/X. T. Xiao/HiveToMySQL/VIPTS_tongbu.txt
hive -e "select distinct cnt_date, recommended_cnt, click_cnt, user_cnt, click_percent, per_capita_use, type from tag_model.t_binge_watching" >> /data/X. T. Xiao/HiveToMySQL/ZJTS_tongbu.txt
hive -e "select distinct cnt_date, click_cnt, play_cnt, user_cnt, recommended_cnt, click_percent, play_percent, per_capita_use, type from tag_model.t_search_individuation" >> /data/X. T. Xiao/HiveToMySQL/QJTJ_tongbu.txt
echo 'Loading hive data to Local is OK !'
cd /data/X. T. Xiao/
rm -rf HiveToMySQL.tar.gz
cd /data/X. T. Xiao/
tar zcvf HiveToMySQL.tar.gz HiveToMySQL
echo 'Data compressed OK !'
* Step two: ftp , ftp_tran.py
#-*-coding:utf-8-*-
import sys
__author__ = 'X. T. Xiao'
#encoding=utf8
from ftplib import FTP # ftp
import sys
IP = '5.210.10.12'
user = 'X. T. Xiao'
password = 'NjExTllZGFmOTQ5'
def put_file(filename,path):
ftp=FTP() #
ftp.set_debuglevel(2) # 2,
ftp.connect(IP) # ftp sever
ftp.login(user,password)# ,
#print ftp.getwelcome() #
ftp.storbinary('STOR %s'%filename, open(path, 'rb',8192))
print('success')
ftp.close()
if __name__ == '__main__':
put_file('HiveToMySQL.tar.gz', '/data/X. T. Xiao/HiveToMySQL.tar.gz')
* Step three: ftp , tarToLocal.sh
#!/bin/bash
directory='/home/ftpdata/ftp_test/'
if [ -d $directory ]
then
echo " !"
else
echo “ !”
mkdir $directory
fi
cd $directory
rm -rf ./HiveToMySQL
wget ftp://5.210.10.12:21/HiveToMySQL.tar.gz --ftp-user=X. T. Xiao --ftp-password=NjExTllZGFmOTQ5 -r
cd $directory
tar -xzf ./5.210.10.12/HiveToMySQL.tar.gz
rm -rf /home/ftpdata/ftp_test/5.210.10.12
* Step four: jdbc MySQL , txtToMySQL.java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Connection;
import java.sql.Statement;
public class TxtToMySql {
/**
* HIVE -e txt , , MySQL recommend t_live_on_demand
*
* @param fileName
*
* @throws SQLException
*/
public static void insertIntoJHTJTable(String fileName) throws SQLException {
Connection conn = null;
String sql;
try {
Class.forName("com.mysql.jdbc.Driver").newInstance();// mysql
System.out.println("MySQL driver is prepared OK !");
// Connection
conn = DriverManager.getConnection(
"jdbc:mysql://17.7.113.133:8096/recommend", "recommend",
"rcmnd20151027"); // MYSQL
// Statement , executeUpdate ,
Statement stmt = conn.createStatement();
sql = "delete from t_live_on_demand";
stmt.executeUpdate(sql);
File file = new File(fileName);
BufferedReader reader = null;
String[] arrs = null;
try {
reader = new BufferedReader(new FileReader(file));
String tempString = null;
// ,
while ((tempString = reader.readLine()) != null) {
if (tempString.indexOf("[GC") != -1)
continue;
arrs = tempString.split("\t");
sql = "insert into t_live_on_demand"
+ "(cnt_date, click_cnt, play_cnt, user_cnt, recommended_cnt, click_percent, play_percent, per_capita_use, type) "
+ "values " + "(" + arrs[0].replace("-", "") + ","
+ arrs[1] + "," + arrs[2] + "," + arrs[3] + ","
+ arrs[4] + "," + arrs[5] + "," + arrs[6] + ","
+ arrs[7] + "," + arrs[8] + ")";
// System.out.println(sql);
stmt.executeUpdate(sql);
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e1) {
}
}
}
} catch (SQLException e) {
System.out.println("MySQL ");
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
conn.close();
}
}
/**
* vip HIVE -e txt , , MySQL recommend t_vip_film
*
* @param fileName
*
* @throws SQLException
*/
public static void insertIntoVIPTSTable(String fileName)
throws SQLException {
Connection conn = null;
String sql;
try {
Class.forName("com.mysql.jdbc.Driver").newInstance();// mysql
System.out.println("MySQL driver is prepared OK !");
// Connection
conn = DriverManager.getConnection(
"jdbc:mysql://17.7.113.133:8096/recommend", "recommend",
"rcmnd20151027"); // MYSQL
// Statement , executeUpdate ,
Statement stmt = conn.createStatement();
sql = "delete from t_vip_film";
stmt.executeUpdate(sql);
File file = new File(fileName);
BufferedReader reader = null;
String[] arrs = null;
try {
reader = new BufferedReader(new FileReader(file));
String tempString = null;
// ,
while ((tempString = reader.readLine()) != null) {
if (tempString.indexOf("[GC") != -1)
continue;
arrs = tempString.split("\t");
sql = "insert into t_vip_film"
+ "(cnt_date, recommended_cnt, click_cnt, user_cnt, click_percent, per_capita_use, type) "
+ "values " + "(" + arrs[0].replace("-", "") + ","
+ arrs[1] + "," + arrs[2] + "," + arrs[3] + ","
+ arrs[4] + "," + arrs[5] +"," + arrs[6] + ")";
// System.out.println(sql);
stmt.executeUpdate(sql);
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e1) {
}
}
}
} catch (SQLException e) {
System.out.println("MySQL ");
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
conn.close();
}
}
/**
* HIVE -e txt , , MySQL recommend t_binge_watching
*
* @param fileName
*
* @throws SQLException
*/
public static void insertIntoZJTSTable(String fileName) throws SQLException {
Connection conn = null;
String sql;
try {
Class.forName("com.mysql.jdbc.Driver").newInstance();// mysql
System.out.println("MySQL driver is prepared OK !");
// Connection
conn = DriverManager.getConnection(
"jdbc:mysql://17.7.113.133:8096/recommend", "recommend",
"rcmnd20151027"); // MYSQL
// Statement , executeUpdate ,
Statement stmt = conn.createStatement();
sql = "delete from t_binge_watching";
stmt.executeUpdate(sql);
File file = new File(fileName);
BufferedReader reader = null;
String[] arrs = null;
try {
reader = new BufferedReader(new FileReader(file));
String tempString = null;
// ,
while ((tempString = reader.readLine()) != null) {
if (tempString.indexOf("[GC") != -1)
continue;
arrs = tempString.split("\t");
sql = "insert into t_binge_watching"
+ "(cnt_date, recommended_cnt, click_cnt, user_cnt, click_percent, per_capita_use, type) "
+ "values " + "(" + arrs[0].replace("-", "") + ","
+ arrs[1] + "," + arrs[2] + "," + arrs[3] + ","
+ arrs[4] + "," + arrs[5] + "," + arrs[6] + ")";
// System.out.println(sql);
stmt.executeUpdate(sql);
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e1) {
}
}
}
} catch (SQLException e) {
System.out.println("MySQL ");
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
conn.close();
}
}
/**
* HIVE -e txt , , MySQL recommend t_boot_live
*
* @param fileName
*
* @throws SQLException
*/
public static void insertIntoKJTJTable(String fileName) throws SQLException {
Connection conn = null;
String sql;
try {
Class.forName("com.mysql.jdbc.Driver").newInstance();// mysql
System.out.println("MySQL driver is prepared OK !");
// Connection
conn = DriverManager.getConnection(
"jdbc:mysql://17.7.113.133:8096/recommend", "recommend",
"rcmnd20151027"); // MYSQL
// Statement , executeUpdate ,
Statement stmt = conn.createStatement();
sql = "delete from t_boot_live";
stmt.executeUpdate(sql);
File file = new File(fileName);
BufferedReader reader = null;
String[] arrs = null;
try {
reader = new BufferedReader(new FileReader(file));
String tempString = null;
// ,
while ((tempString = reader.readLine()) != null) {
if (tempString.indexOf("[GC") != -1)
continue;
arrs = tempString.split("\t");
sql = "insert into t_boot_live"
+ "(cnt_date, recommended_cnt, play_cnt, boot_cnt, play_percent, recommended_percent, type) "
+ "values " + "(" + arrs[0].replace("-", "") + ","
+ arrs[1] + "," + arrs[2] + "," + arrs[3] + ","
+ arrs[4] + "," + arrs[5] + "," + arrs[6] + ")";
// System.out.println(sql);
stmt.executeUpdate(sql);
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e1) {
}
}
}
} catch (SQLException e) {
System.out.println("MySQL ");
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
conn.close();
}
}
/**
* HIVE -e txt , , MySQL recommend t_search_individuation
*
* @param fileName
*
* @throws SQLException
*/
public static void insertIntoQJTJTable(String fileName) throws SQLException {
Connection conn = null;
String sql;
try {
Class.forName("com.mysql.jdbc.Driver").newInstance();// mysql
System.out.println("MySQL driver is prepared OK !");
// Connection
conn = DriverManager.getConnection(
"jdbc:mysql://17.7.113.133:8096/recommend", "recommend",
"rcmnd20151027"); // MYSQL
// Statement , executeUpdate ,
Statement stmt = conn.createStatement();
sql = "delete from t_search_individuation";
stmt.executeUpdate(sql);
File file = new File(fileName);
BufferedReader reader = null;
String[] arrs = null;
try {
reader = new BufferedReader(new FileReader(file));
String tempString = null;
// ,
while ((tempString = reader.readLine()) != null) {
if (tempString.indexOf("[GC") != -1)
continue;
arrs = tempString.split("\t");
sql = "insert into t_search_individuation"
+ "(cnt_date, click_cnt, play_cnt, user_cnt, recommended_cnt, click_percent, play_percent, per_capita_use, type) "
+ "values " + "(" + arrs[0].replace("-", "") + ","
+ arrs[1] + "," + arrs[2] + "," + arrs[3] + ","
+ arrs[4] + "," + arrs[5] + "," + arrs[6] + ","
+ arrs[7] + "," + arrs[8] + ")";
// System.out.println(sql);
stmt.executeUpdate(sql);
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e1) {
}
}
}
} catch (SQLException e) {
System.out.println("MySQL ");
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
conn.close();
}
}
public static void main(String[] args) throws SQLException {
String fileName1 = "/home/ftpdata/ftp_test/data/xinting.xiao/HiveToMySQL/JHTJ_tongbu.txt";
TxtToMySql.insertIntoJHTJTable(fileName1);
System.out.println("JHTJ is OK!");
String fileName2 = "/home/ftpdata/ftp_test/data/xinting.xiao/HiveToMySQL/KJTJ_tongbu.txt";
TxtToMySql.insertIntoKJTJTable(fileName2);
System.out.println("KJTJ is OK!");
String fileName3 = "/home/ftpdata/ftp_test/data/xinting.xiao/HiveToMySQL/QJTJ_tongbu.txt";
TxtToMySql.insertIntoQJTJTable(fileName3);
System.out.println("QJTJ is OK!");
String fileName4 = "/home/ftpdata/ftp_test/data/xinting.xiao/HiveToMySQL/VIPTS_tongbu.txt";
TxtToMySql.insertIntoVIPTSTable(fileName4);
System.out.println("VIPTS is OK!");
String fileName5 = "/home/ftpdata/ftp_test/data/xinting.xiao/HiveToMySQL/ZJTS_tongbu.txt";
TxtToMySql.insertIntoZJTSTable(fileName5);
System.out.println("ZJTS is OK!");
}
}
* Step five: ,
crontab -e 5 1 * * */bin/bash ./HiveToMySQL.sh 5 3 * * * python ./ftp_tran.py
crontab -e 15 3 * * */bin/bash ./tarToLocal.sh 35 3 * * * java -jar txtToMySQL.jar