記sql大ファイル切断転送及びpandasデータ前処理
データ準備:sqlファイルが大きすぎて、インポートが遅すぎて、本文はmysqlデータベースのインポートとエクスポートを迂回して、pythonを通じてsqlを直接分割して、そしてデータテーブル構造とデータテーブルデータファイルをそれぞれ相応のsqlファイルとcsvファイルに保存して、pandasデータ分析用に供します.
データ前処理:pandasは列全体のデータを処理するのが遅すぎて、マルチスレッドで処理して、一時ファイルに書き込んで、ソートを読み取った後に最終的な分析データを生成します
一、データ準備:sqlファイル切断転送csvファイル
mysqlファイルが大きすぎて、インポートが遅すぎて、本文はmysqlデータベースのインポートエクスポートを迂回して、直接sqlファイルを対応するテーブルのcsvファイルに転送します.
pythonはsqlを分割し、pandasデータ分析用にデータテーブル構造とデータテーブルデータファイルをそれぞれ対応するsqlファイルとcsvファイルに格納します.
本論文の実験データ,mysqldumpから導出したsqlファイルサイズ5 G, headは、前のN行のデータを抽出し、抽出ルールを記述する.
Windowsはgitをインストールしているのでgit bash端末でheadコマンドを使用できます
この操作はmysqldumpからエクスポートされたデータファイルで、内容はテーブル構造とデータです.
いくつかのID:符号化(実装機能、未整備コード)
二、データの前処理年分のデータのみを読み込む addtimeはタイムスタンプフィールドであり、datetime時間に変換する必要があり、1000 wデータは比較的時間がかかり、大部分が計算であるため、非同期マルチプロセス処理を採用し、速度は を著しく向上させる. idソート
これでデータの前処理が完了し、その後、データ分析とレポートの生成ができます.
データ前処理:pandasは列全体のデータを処理するのが遅すぎて、マルチスレッドで処理して、一時ファイルに書き込んで、ソートを読み取った後に最終的な分析データを生成します
一、データ準備:sqlファイル切断転送csvファイル
mysqlファイルが大きすぎて、インポートが遅すぎて、本文はmysqlデータベースのインポートエクスポートを迂回して、直接sqlファイルを対応するテーブルのcsvファイルに転送します.
pythonはsqlを分割し、pandasデータ分析用にデータテーブル構造とデータテーブルデータファイルをそれぞれ対応するsqlファイルとcsvファイルに格納します.
本論文の実験データ,mysqldumpから導出したsqlファイルサイズ5 G,
head -100 sql
> tmp.sql Windowsはgitをインストールしているのでgit bash端末でheadコマンドを使用できます
この操作はmysqldumpからエクスポートされたデータファイルで、内容はテーブル構造とデータです.
いくつかのID:
Table structure for table `test1`
Dumping data for table `test1`
INSERT INTO `test1` VALUES
sqlfile = '111.sql'
# sql
base_path = Path('D:/pd-data/test/')
find_tpl_structure = '-- Table structure for table '
find_tpl_structure_len = len(find_tpl_structure)
find_tpl_data = '-- Dumping data for table '
find_tpl_data_len = len('-- Dumping data for table ')
curflag = None # start1 start2
curtable = '' # table name
curtype = '' # structure or data
curcontent = [] #
time_s = time.time()
# structure sql, data csv
def save_file(table, ftype, data):
types = {'structure': '.sql', 'data': '.csv'}
if table != '' and type != '' and table != []:
nfile = base_path.joinpath('csv/{}_{}{}'.format(curtable, curtype, types[ftype]))
if ftype == 'data':
with open(nfile, 'a', newline='') as fs:
ff = csv.writer(fs)
ff.writerows(data)
else:
with open(nfile, 'w+', encoding='utf-8') as fs:
fs.writelines(data)
print('{} ok'.format(nfile))
with open(base_path.joinpath(sqlfile), 'r', encoding='utf-8') as f:
for i, line in enumerate(f):
#
if curflag == 'start2' or curflag == 'start1':
if curflag == 'start2':
if line.startswith('INSERT INTO '):
x = eval('[{}]'.format(line.strip()[22 + len(curtable):-1]))
total += len(x)
# curcontent.extend(x)
save_file(curtable, curtype, x)
print(total)
else:
curcontent.append(line)
# : ,
if line.startswith(find_tpl_structure):
#
# save_file(curtable, curtype, curcontent)
curcontent = []
#
curflag = 'start1'
curtype = 'structure'
curtable = line.strip()[find_tpl_structure_len + 1:-1]
# : ,
if line.startswith(find_tpl_data):
#
save_file(curtable, curtype, curcontent)
curcontent = []
#
curflag = 'start2'
curtype = 'data'
total = 0
curtable = line.strip()[find_tpl_data_len + 1:-1]
time_e = time.time()
print('runtime - {}s'.format(time_e - time_s))
二、データの前処理
data = pd.read_csv(
p.joinpath('deal_all.csv'),
header=0,
names=['id','name','addtime','dtype'],
)
df = data[data['addtime'].dt.year.isin([2016, 2017, 2018]) & ~data['dtype'].isin([0,2,3])]
df.to_csv(
p.joinpath('deal_2016_2017_2018.csv'),
index=False,
header=False,
)
import numpy as np
import pandas as pd
from concurrent.futures import ProcessPoolExecutor
#
def timestampTostr(x):
return pd.to_datetime(x, unit='s', utc=False).tz_localize('Asia/Shanghai')
#
def worker(chunk):
chunk['addtime'] = chunk['addtime'].apply(timestampTostr)
return chunk
# csv
def writecsv(r):
global total
total += r.result().shape[0]
print('Total:', total)
r.result().to_csv(
p.joinpath('deal_2016_2017_2018.tmp.csv'),
index=False,
header=False,
mode='a' #
)
if __name__ == '__main__':
#
data = pd.read_csv(
p.joinpath('deal_2016_2017_2018.csv'),
header=0,
names=['id','name','addtime','dtype'],
iterator=True,
chunksize=5000,
# nrows=1000000,
low_memory=False,
# ,
# converters={
# 'addtime': timestampTostr,
#}
)
futures = []
total = 0
with ProcessPoolExecutor(max_workers=50) as executer:
for chunk in deal:
future = executer.submit(worker, chunk)
future.add_done_callback(writecsv)
print('end~~~~~~~~~~~~~~~~~~~~')
deal = pd.read_csv(
p.joinpath('deal_2016_2017_2018.csv'),
header=0,
names=['id','name','addtime','dtype'],
low_memory=False,
)
deal.sort_values('did', inplace=True)
deal.to_csv(
p.joinpath('deal_2016_2017_2018_end.csv'),
index=False,
header=False,
)
これでデータの前処理が完了し、その後、データ分析とレポートの生成ができます.