python multiprocessingで注意すべき問題

14771 ワード

私たちはプログラムを書くときによくこのようにコードを書くのが好きです.
import MySQLdb
import time
from multiprocessing import Process

conn = MySQLdb.connect('localhost', 'vearne', 'xx', 'test')

def f(name):
    for i in xrange(10):
        cursor = conn.cursor()
        sql = "insert into car(name) values(%s)"
        param = [(name)]
        print param
        #time.sleep(1)
        n = cursor.execute(sql,param)
        cursor.close()
        conn.commit()

if __name__ == '__main__':
    for i in xrange(10):
        p = Process(target=f, args=('bob',))
        p.start()

上のプログラムに問題がありますか?以上のプログラムは単一プロセスの場合は問題ないはずですが、マルチプロセスの場合はエラーがあります.
まず次のソースを見てみましょう
class Process(object):
    ''' Process objects represent activity that is run in a separate process The class is analagous to `threading.Thread` '''
    _Popen = None

    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        assert group is None, 'group argument must be None for now'
        count = _current_process._counter.next()
        self._identity = _current_process._identity + (count,)
        self._authkey = _current_process._authkey
        self._daemonic = _current_process._daemonic
        self._tempdir = _current_process._tempdir
        self._parent_pid = os.getpid()
        self._popen = None
        self._target = target
        self._args = tuple(args)
        self._kwargs = dict(kwargs)
        self._name = name or type(self).__name__ + '-' + \
                     ':'.join(str(i) for i in self._identity)

    def run(self):
        ''' Method to be run in sub-process; can be overridden in sub-class '''
        if self._target:
            self._target(*self._args, **self._kwargs)

    def start(self):
        ''' Start child process '''
        assert self._popen is None, 'cannot start a process twice'
        assert self._parent_pid == os.getpid(), \
               'can only start a process object created by current process'
        assert not _current_process._daemonic, \
               'daemonic processes are not allowed to have children'
        _cleanup()
        if self._Popen is not None:
            Popen = self._Popen
        else:
            from .forking import Popen
        self._popen = Popen(self)   # --    Popen    --
        _current_process._children.add(self)
        #        ... ...
    def _bootstrap(self):    # -- _bootstrap    --
        from . import util
        global _current_process

        try:
            self._children = set()
            self._counter = itertools.count(1)
            try:
                sys.stdin.close()
                sys.stdin = open(os.devnull)
            except (OSError, ValueError):
                pass
            _current_process = self
            util._finalizer_registry.clear()
            util._run_after_forkers()
            util.info('child process calling self.run()')
            try:
                self.run()  # --   run   --
                exitcode = 0
            finally:
                util._exit_function()
        except SystemExit, e:
            if not e.args:
                exitcode = 1
            elif isinstance(e.args[0], int):
                exitcode = e.args[0]
            else:
                sys.stderr.write(str(e.args[0]) + '
'
) sys.stderr.flush() exitcode = 0 if isinstance(e.args[0], str) else 1 except: exitcode = 1 import traceback sys.stderr.write('Process %s:
'
% self.name) sys.stderr.flush() traceback.print_exc() util.info('process exiting with exitcode %d' % exitcode) return exitcode

from.forking import Popen定義
    class Popen(object):

        def __init__(self, process_obj):
            sys.stdout.flush()
            sys.stderr.flush()
            self.returncode = None

            self.pid = os.fork()     # -- fork    --
            # fork       ,      ,         ,   pid    0 
            #          ,   pid     0
            if self.pid == 0:        # pid     0                   
                if 'random' in sys.modules:
                    import random
                    random.seed()
                code = process_obj._bootstrap() # --   _bootstrap   --
                sys.stdout.flush()
                sys.stderr.flush()
                os._exit(code)

コードからpythonのmultiprocessingはforkを使用してサブプロセスを作成し、サブプロセスでrun関数を実行することがわかります.
man fork 

次の情報が得られます.
Fork() causes creation of a new process.  The new process (child process) is an exact copy of the calling process (parent process) except for the following:
           o   The child process has a unique process ID.
           o   The child process has a different parent process ID (i.e., the process ID of the parent process).
           o   The child process has its own copy of the parent's descriptors.  These descriptors reference the same underlying objects, so that, for instance, file pointers in file objects are shared between the child and the parent, so that an lseek(2) on a descriptor in the child process can affect a subsequent read or write by the parent.  This descriptor copying is also used by the shell to establish standard input and output for newly created processes as well as to set up pipes.
           o   The child processes resource utilizations are set to 0; see setrlimit(2).

fork関数で作成されたサブプロセスは、同じファイル記述子を持つ親プロセスの完全なコピーです.これにより、親プロセスで接続が作成されると、親プロセスと複数のサブプロセスが共通の接続を持ち、予期せぬエラーが発生します.(各接続には独立したリード・バッファとライト・バッファがあり、複数のプロセスのリード・バッファとライト・バッファの操作によりデータが混乱します)
したがって、サブプロセスで接続を作成する必要があります.これにより、問題の発生を回避できます.
import MySQLdb
import time
from multiprocessing import Process

class SLWorker(Process):
    def __init__(self):
        super(SLWorker, self).__init__()
        self.conn = None

    def run(self):
        # ***      ***       ,               
        if self.conn ==  None:  
            self.conn = MySQLdb.connect('localhost', 'vearne', 'xxx', 'test')
        for i in xrange(10):
            cursor = self.conn.cursor()
            sql = "insert into car(name) values(%s)"
            name = "bob"
            param = [(name)]
            print param
            #time.sleep(30)
            n = cursor.execute(sql,param)
            cursor.close()
            self.conn.commit()
    def __del__(self):
        if self.conn != None:
            self.conn.close()

if __name__ == '__main__':
    ll = []
    for i in xrange(10):
        p = SLWorker()
        p.start()
        ll.append(p)
    for p in ll:
        p.join()

答えは 、または でこの問題を解決するだけで、実際には現在多くの接続プールが遅延して接続を作成しており、よく見ていないので、研究の共有があります.
PS:celeryもrqもこのような問題がありますので、十分に重視してください