pythonマルチプロセス共有変数
2316 ワード
複数のデータファイルを読み込み、そこからデータを抽出して結果をマージ(word countに相当)
方法1つは、従来の分割マルチスレッドがデータ出力をそれぞれ処理し、最後に辞書遍歴出力ファイルを定義して結果セットをマージすることです.
方法2は,まず結果セット辞書を定義し,その後マルチスレッドでデータを処理し,辞書に入れるときに同期ロックを加える.
方法2は方法よりずっと速いと思っていたが、大量のIOを省いたので、合併結果の時だけロックをかけた.
しかし、テストの結果、同期ロックはかえって最大のボトルネックであることが分かった.
20 Wのデータをテストして、あるフィールドを抽出して16スレッドを走って、方法は1は10 sを使って、方法は2は55 sを使いました
この差は大きすぎて・・・
だから正直にマルチスレッド同期共有変数を放棄して性能が悪い
pythonマルチスレッドロック同期共有変数のコードは次のとおりです.
方法1つは、従来の分割マルチスレッドがデータ出力をそれぞれ処理し、最後に辞書遍歴出力ファイルを定義して結果セットをマージすることです.
方法2は,まず結果セット辞書を定義し,その後マルチスレッドでデータを処理し,辞書に入れるときに同期ロックを加える.
方法2は方法よりずっと速いと思っていたが、大量のIOを省いたので、合併結果の時だけロックをかけた.
しかし、テストの結果、同期ロックはかえって最大のボトルネックであることが分かった.
20 Wのデータをテストして、あるフィールドを抽出して16スレッドを走って、方法は1は10 sを使って、方法は2は55 sを使いました
この差は大きすぎて・・・
だから正直にマルチスレッド同期共有変数を放棄して性能が悪い
pythonマルチスレッドロック同期共有変数のコードは次のとおりです.
#!/usr/bin/env python
#-*- coding: utf-8 -*-
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import MySQLdb
import re
import multiprocessing
import os
import traceback
import jieba
import ConfigParser
import datetime
import json
import logging
def myprocess(num,indir,titles,lock):
try:
fin = open(indir + os.sep + str(num),'r')
cnt0 = 0
for line in fin:
cnt0 += 1
if cnt0 %100 == 0:
print num,cnt0
if cnt0 == 10000:
break
data = eval(line.strip())
us_id = int(data[21])
title = data[8]
if title.strip() == '':
continue
with lock:
if title in titles:
titles[title].append(us_id)
else:
titles[title]=[us_id]
print num,'END'
except Exception,e:
traceback.print_exc()
def func():
indir = '/root/data/bids/data_all'
outdir = './titles1119'
#if not os.path.exists(outdir):
# os.mkdir(outdir)
start,end,add = 0,20,1
manager = multiprocessing.Manager()
lock = manager.Lock()
titles = manager.dict()
#
pool = multiprocessing.Pool(processes = 16)
for num in range(start,end,add):
pool.apply_async(myprocess,[num,indir,titles,lock])
pool.close()
pool.join()
titles = dict(titles)
with open(outdir,'w') as fout:
for title in titles:
print >> fout,'%s\t%s'%(title,titles[title])
if len(titles[title])>1:
print titles[title]
if __name__ == '__main__':
program = os.path.basename(sys.argv[0])
logger = logging.getLogger(program)
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(message)s',filename = program+'.log',filemode='a')
logging.root.setLevel(level=logging.INFO)
logger.info("start running %s" % ' '.join(sys.argv))
func()
logger.info("end\t running %s" % ' '.join(sys.argv))