python multiprocessingで注意すべき問題
私たちはプログラムを書くときによくこのようにコードを書くのが好きです.
上のプログラムに問題がありますか?以上のプログラムは単一プロセスの場合は問題ないはずですが、マルチプロセスの場合はエラーがあります.
まず次のソースを見てみましょう
from.forking import Popen定義
コードからpythonのmultiprocessingはforkを使用してサブプロセスを作成し、サブプロセスでrun関数を実行することがわかります.
次の情報が得られます.
fork関数で作成されたサブプロセスは、同じファイル記述子を持つ親プロセスの完全なコピーです.これにより、親プロセスで接続が作成されると、親プロセスと複数のサブプロセスが共通の接続を持ち、予期せぬエラーが発生します.(各接続には独立したリード・バッファとライト・バッファがあり、複数のプロセスのリード・バッファとライト・バッファの操作によりデータが混乱します)
したがって、サブプロセスで接続を作成する必要があります.これにより、問題の発生を回避できます.
答えは
PS:celeryもrqもこのような問題がありますので、十分に重視してください
import MySQLdb
import time
from multiprocessing import Process
conn = MySQLdb.connect('localhost', 'vearne', 'xx', 'test')
def f(name):
for i in xrange(10):
cursor = conn.cursor()
sql = "insert into car(name) values(%s)"
param = [(name)]
print param
#time.sleep(1)
n = cursor.execute(sql,param)
cursor.close()
conn.commit()
if __name__ == '__main__':
for i in xrange(10):
p = Process(target=f, args=('bob',))
p.start()
上のプログラムに問題がありますか?以上のプログラムは単一プロセスの場合は問題ないはずですが、マルチプロセスの場合はエラーがあります.
まず次のソースを見てみましょう
class Process(object):
''' Process objects represent activity that is run in a separate process The class is analagous to `threading.Thread` '''
_Popen = None
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
assert group is None, 'group argument must be None for now'
count = _current_process._counter.next()
self._identity = _current_process._identity + (count,)
self._authkey = _current_process._authkey
self._daemonic = _current_process._daemonic
self._tempdir = _current_process._tempdir
self._parent_pid = os.getpid()
self._popen = None
self._target = target
self._args = tuple(args)
self._kwargs = dict(kwargs)
self._name = name or type(self).__name__ + '-' + \
':'.join(str(i) for i in self._identity)
def run(self):
''' Method to be run in sub-process; can be overridden in sub-class '''
if self._target:
self._target(*self._args, **self._kwargs)
def start(self):
''' Start child process '''
assert self._popen is None, 'cannot start a process twice'
assert self._parent_pid == os.getpid(), \
'can only start a process object created by current process'
assert not _current_process._daemonic, \
'daemonic processes are not allowed to have children'
_cleanup()
if self._Popen is not None:
Popen = self._Popen
else:
from .forking import Popen
self._popen = Popen(self) # -- Popen --
_current_process._children.add(self)
# ... ...
def _bootstrap(self): # -- _bootstrap --
from . import util
global _current_process
try:
self._children = set()
self._counter = itertools.count(1)
try:
sys.stdin.close()
sys.stdin = open(os.devnull)
except (OSError, ValueError):
pass
_current_process = self
util._finalizer_registry.clear()
util._run_after_forkers()
util.info('child process calling self.run()')
try:
self.run() # -- run --
exitcode = 0
finally:
util._exit_function()
except SystemExit, e:
if not e.args:
exitcode = 1
elif isinstance(e.args[0], int):
exitcode = e.args[0]
else:
sys.stderr.write(str(e.args[0]) + '
')
sys.stderr.flush()
exitcode = 0 if isinstance(e.args[0], str) else 1
except:
exitcode = 1
import traceback
sys.stderr.write('Process %s:
' % self.name)
sys.stderr.flush()
traceback.print_exc()
util.info('process exiting with exitcode %d' % exitcode)
return exitcode
from.forking import Popen定義
class Popen(object):
def __init__(self, process_obj):
sys.stdout.flush()
sys.stderr.flush()
self.returncode = None
self.pid = os.fork() # -- fork --
# fork , , , pid 0
# , pid 0
if self.pid == 0: # pid 0
if 'random' in sys.modules:
import random
random.seed()
code = process_obj._bootstrap() # -- _bootstrap --
sys.stdout.flush()
sys.stderr.flush()
os._exit(code)
コードからpythonのmultiprocessingはforkを使用してサブプロセスを作成し、サブプロセスでrun関数を実行することがわかります.
man fork
次の情報が得られます.
Fork() causes creation of a new process. The new process (child process) is an exact copy of the calling process (parent process) except for the following:
o The child process has a unique process ID.
o The child process has a different parent process ID (i.e., the process ID of the parent process).
o The child process has its own copy of the parent's descriptors. These descriptors reference the same underlying objects, so that, for instance, file pointers in file objects are shared between the child and the parent, so that an lseek(2) on a descriptor in the child process can affect a subsequent read or write by the parent. This descriptor copying is also used by the shell to establish standard input and output for newly created processes as well as to set up pipes.
o The child processes resource utilizations are set to 0; see setrlimit(2).
fork関数で作成されたサブプロセスは、同じファイル記述子を持つ親プロセスの完全なコピーです.これにより、親プロセスで接続が作成されると、親プロセスと複数のサブプロセスが共通の接続を持ち、予期せぬエラーが発生します.(各接続には独立したリード・バッファとライト・バッファがあり、複数のプロセスのリード・バッファとライト・バッファの操作によりデータが混乱します)
したがって、サブプロセスで接続を作成する必要があります.これにより、問題の発生を回避できます.
import MySQLdb
import time
from multiprocessing import Process
class SLWorker(Process):
def __init__(self):
super(SLWorker, self).__init__()
self.conn = None
def run(self):
# *** *** ,
if self.conn == None:
self.conn = MySQLdb.connect('localhost', 'vearne', 'xxx', 'test')
for i in xrange(10):
cursor = self.conn.cursor()
sql = "insert into car(name) values(%s)"
name = "bob"
param = [(name)]
print param
#time.sleep(30)
n = cursor.execute(sql,param)
cursor.close()
self.conn.commit()
def __del__(self):
if self.conn != None:
self.conn.close()
if __name__ == '__main__':
ll = []
for i in xrange(10):
p = SLWorker()
p.start()
ll.append(p)
for p in ll:
p.join()
答えは
、または
でこの問題を解決するだけで、実際には現在多くの接続プールが遅延して接続を作成しており、よく見ていないので、研究の共有があります.PS:celeryもrqもこのような問題がありますので、十分に重視してください