記sql大ファイル切断転送及びpandasデータ前処理


データ準備:sqlファイルが大きすぎて、インポートが遅すぎて、本文はmysqlデータベースのインポートとエクスポートを迂回して、pythonを通じてsqlを直接分割して、そしてデータテーブル構造とデータテーブルデータファイルをそれぞれ相応のsqlファイルとcsvファイルに保存して、pandasデータ分析用に供します.
データ前処理:pandasは列全体のデータを処理するのが遅すぎて、マルチスレッドで処理して、一時ファイルに書き込んで、ソートを読み取った後に最終的な分析データを生成します
一、データ準備:sqlファイル切断転送csvファイル
mysqlファイルが大きすぎて、インポートが遅すぎて、本文はmysqlデータベースのインポートエクスポートを迂回して、直接sqlファイルを対応するテーブルのcsvファイルに転送します.
pythonはsqlを分割し、pandasデータ分析用にデータテーブル構造とデータテーブルデータファイルをそれぞれ対応するsqlファイルとcsvファイルに格納します.
本論文の実験データ,mysqldumpから導出したsqlファイルサイズ5 G,
  • headは、前のN行のデータを抽出し、抽出ルールを記述する.
  • head -100 sql > tmp.sql
    Windowsはgitをインストールしているのでgit bash端末でheadコマンドを使用できます
    この操作はmysqldumpからエクスポートされたデータファイルで、内容はテーブル構造とデータです.
    いくつかのID:
    Table structure for table `test1`
    Dumping data for table `test1`
    INSERT INTO `test1` VALUES 
    
  • 符号化(実装機能、未整備コード)
  • sqlfile = '111.sql'
    
    # sql    
    base_path = Path('D:/pd-data/test/')
    
    find_tpl_structure = '-- Table structure for table '
    find_tpl_structure_len = len(find_tpl_structure)
    
    find_tpl_data = '-- Dumping data for table '
    find_tpl_data_len = len('-- Dumping data for table ')
    
    curflag = None  #    start1 start2
    curtable = ''   # table name
    curtype = ''    # structure or data
    curcontent = [] #      
    
    time_s = time.time()
    
    #      structure sql, data csv
    def save_file(table, ftype, data):
        types = {'structure': '.sql', 'data': '.csv'}
        if table != '' and type != '' and table != []:
            nfile = base_path.joinpath('csv/{}_{}{}'.format(curtable, curtype, types[ftype]))
            if ftype == 'data':
                with open(nfile, 'a', newline='') as fs:
                    ff = csv.writer(fs)
                    ff.writerows(data)
            else:
                with open(nfile, 'w+', encoding='utf-8') as fs:
                    fs.writelines(data)
    
                print('{} ok'.format(nfile))
    
    
    with open(base_path.joinpath(sqlfile), 'r', encoding='utf-8') as f:
        for i, line in enumerate(f):
            #            
            if curflag == 'start2' or curflag == 'start1':
                if curflag == 'start2':
                    if line.startswith('INSERT INTO '):
                        x = eval('[{}]'.format(line.strip()[22 + len(curtable):-1]))
                        total += len(x)
                        # curcontent.extend(x)
                        save_file(curtable, curtype, x)
                        print(total)
                else:
                    curcontent.append(line)
    
            #         :       ,        
            if line.startswith(find_tpl_structure):
                #      
                # save_file(curtable, curtype, curcontent)
                curcontent = []
    
                #      
                curflag = 'start1'
                curtype = 'structure'
                curtable = line.strip()[find_tpl_structure_len + 1:-1]
    
            #         :       ,        
            if line.startswith(find_tpl_data):
                #      
                save_file(curtable, curtype, curcontent)
                curcontent = []
    
                #      
                curflag = 'start2'
                curtype = 'data'
                total = 0
                curtable = line.strip()[find_tpl_data_len + 1:-1]
    
    
    time_e = time.time()
    print('runtime - {}s'.format(time_e - time_s))
    

    二、データの前処理
  • 年分のデータのみを読み込む
  • data = pd.read_csv(
        p.joinpath('deal_all.csv'),
        header=0,
        names=['id','name','addtime','dtype'],
    )
    
    df = data[data['addtime'].dt.year.isin([2016, 2017, 2018]) & ~data['dtype'].isin([0,2,3])]
    
    df.to_csv(
        p.joinpath('deal_2016_2017_2018.csv'),
        index=False,
        header=False,
    )
    
  • addtimeはタイムスタンプフィールドであり、datetime時間に変換する必要があり、1000 wデータは比較的時間がかかり、大部分が計算であるため、非同期マルチプロセス処理を採用し、速度は
  • を著しく向上させる.
    import numpy as np
    import pandas as pd
    from concurrent.futures import ProcessPoolExecutor
    
    #        
    def timestampTostr(x):
        return pd.to_datetime(x, unit='s', utc=False).tz_localize('Asia/Shanghai')
    	
    #     
    def worker(chunk):
        chunk['addtime'] = chunk['addtime'].apply(timestampTostr)
        return chunk
    
    #   csv  
    def writecsv(r):
        global total
        total += r.result().shape[0]
        print('Total:', total)
        r.result().to_csv(
            p.joinpath('deal_2016_2017_2018.tmp.csv'),
            index=False,
            header=False,
            mode='a'	#     
        )
    	
    if __name__ == '__main__':
    
    	#     
    	data = pd.read_csv(
    		p.joinpath('deal_2016_2017_2018.csv'),
    		header=0,
    		names=['id','name','addtime','dtype'],
    		iterator=True,
    		chunksize=5000,
    		# nrows=1000000,
    		low_memory=False,
    		
    		#            ,        
    		# converters={
    		# 	'addtime': timestampTostr,
    		#}
    	)
    	
        futures = []
        total = 0
        with ProcessPoolExecutor(max_workers=50) as executer:
            for chunk in deal:
                future = executer.submit(worker, chunk)
                future.add_done_callback(writecsv)
    
        print('end~~~~~~~~~~~~~~~~~~~~')
    
  • idソート
  • deal = pd.read_csv(
        p.joinpath('deal_2016_2017_2018.csv'),
        header=0,
        names=['id','name','addtime','dtype'],
        low_memory=False,
    )
    
    deal.sort_values('did', inplace=True)
    
    deal.to_csv(
    	p.joinpath('deal_2016_2017_2018_end.csv'),
    	index=False,
    	header=False,
    )
    

    これでデータの前処理が完了し、その後、データ分析とレポートの生成ができます.