pythonを使用してhiveクエリー結果をmysqlに保存


Pythonスクリプトでhiveに接続し、クエリーの戻り値を取得するコード
#!/usr/bin/env python
# Connect to a HiveServer over Thrift, register the hive-contrib jar, run a
# count query against `apilog`, and print the first result row.
import sys

from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

# Build the client stack: raw socket -> buffered transport -> binary protocol.
transport = TSocket.TSocket('localhost', 10000)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = ThriftHive.Client(protocol)

try:
    transport.open()
    # The path must be separated from the `jar` keyword by a space, otherwise
    # Hive cannot parse the ADD command.
    client.execute('ADD jar /home/soft/hadoop/hive-0.7.0/lib/hive-contrib-0.7.0.jar')
    query = ''' select count(1) from apilog '''
    client.execute(query)
    row = client.fetchOne()
    print(row)
except Thrift.TException as tx:
    print('%s' % (tx.message))
finally:
    # Always release the connection, including when the query fails.
    transport.close()
 
このスクリプトはadd jar/fileをサポートします
 
 
Hiveクエリー結果の戻り値で、MySQLの指定テーブルの指定フィールドを更新する
def mysqlExe(sql, params=None):
    """Execute a single SQL statement against the MySQL database and commit it.

    Args:
        sql: The SQL statement to run. May contain ``%s`` placeholders when
            ``params`` is given.
        params: Optional sequence (or mapping) of values bound to the
            placeholders in ``sql`` by the driver. Defaults to ``None``, in
            which case ``sql`` is executed as-is.

    Raises:
        MySQLdb.Error: Propagated from the driver if connecting or executing
            fails; nothing is committed in that case.
    """
    conn = MySQLdb.connect(host="10.10.111.111",
                           user="user",
                           passwd="password",
                           db="database")
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql, params)
        finally:
            cursor.close()
        # MySQLdb does not autocommit; without this the statement's effect
        # is discarded when the connection closes on transactional tables.
        conn.commit()
    finally:
        # Release the connection even if execute() or commit() raised.
        conn.close()

def hiveExeUpdate(sql, db, tableName, column, date):
    """Run a Hive query and write each result row into a MySQL column.

    For every row returned by the Hive query, an ``update`` statement is
    built that sets ``column`` of ``tableName`` to that row's value for the
    record whose ``id`` equals ``date``, and it is executed via ``mysqlExe``.

    Args:
        sql: Hive query to execute.
        db: Accepted for interface compatibility; not used by this function.
        tableName: Name of the MySQL table to update.
        column: Name of the MySQL column to set.
        date: Value matched against the ``id`` column in the ``where`` clause.

    Thrift errors are caught and their message is printed; errors raised by
    ``mysqlExe`` propagate to the caller.
    """
    transport = TSocket.TSocket('10.20.134.199', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = ThriftHive.Client(protocol)

    try:
        transport.open()
        client.execute(sql)
        for item in client.fetchAll():
            # NOTE(review): the statement is assembled by string
            # concatenation, so tableName/column/date and the Hive result
            # must come from trusted sources.
            update_sql = (" update  " + tableName + " set " + column
                          + " = " + item + " where id = '" + date + "'")
            mysqlExe(update_sql)
    except Thrift.TException as tx:
        print('%s' % (tx.message))
    finally:
        # Close the Hive connection on both the success and the error path.
        transport.close()