pythonの下でPoolとtargetの方法は同じクラスの中で注意する穴を書きます

9913 ワード

仕事の中で开発したインタフェースに対して圧力テストをすることに出会って、以前はどんな圧力测定ツールが使いやすいことを明らかにしたことがなくて、ちょうどインタフェースは复雑なインタフェースではありませんて、curl-X post“インタフェースアドレス”--data-binary@バイナリ认证ファイルOK!(@は検証データがファイルタイプであることを示す)
それならスクリプトを書きます.スクリプトの内容は以下の通りです.
======================================================================#!/usr/bin/evn python#_coding:utf8_from multiprocessing import Pool,Queueimport time,subprocess,osclass YaCe(object):def init(self,api,binfile,maxpool,qu,maxrequest=100000,status="success"):self.api = apiself.binfile = binfileself.status = statusself.maxpool = maxpoolself.maxrequest = maxrequestself.qu = qudef prorequest(self):for i in range(self.maxrequest):self.qu.put(i)print(i)for i in range(int(self.maxpool)):self.qu.put(None)print("None")
def conumers(self,i):
    while True:
        data = self.qu.get(True)
        if data == None:
            print("  %s    ..."%i)
            break
        else:
            command = subprocess.getoutput("time curl -X POST --connect-timeout 10 '%s'  --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
            if self.status == "success":
                logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time
                if "CgoyMDAwMDAwMDAw" in command:
                    print("  %s__%s..."%(str(i),str(data)))
                    with open(logfile,"a") as f:
                        f.write(command+"
") f.close() else: print(" %s__%s..."%(str(i),str(data))) with open(logfile,"a") as f: f.write("Faild
") f.write(command+"
") f.close() else: logfile = os.getcwd()+"/"+"roomlist.log"+"_%s"%date_time #print("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile)) command = subprocess.getoutput("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile)) if "CAES+" in command: print(" %s__%s..."%(str(i),str(data))) info = command.split('
')[-3:] info1 = "
".join(info) with open(logfile,"a") as f: f.write(info1+"
") f.close() else: print(" %s__%s..."%(str(i),str(data))) with open(logfile,"a") as f: f.write("Faild
") f.write(command+"
") f.close() def multirun(self): ps = int(int(self.maxpool) - 1) p = Pool(ps) for i in range(self.maxpool): print(" %s"%i) p.apply_async(self.conumers,args=(self,i)) print(' 。。。') p.close() p.join() endtime = time.strftime("%Y%m%d_%X",time.localtime()) if self.status == "success": logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time else: logfile = os.getcwd() + "/" + "roomlist.log"+"_%s"%date_time with open(logfile,"a") as f: f.write("============[%s]============
"%endtime) f.close() print('End!!,PID:%s'% os.getpid())

if name="main":q=Queue()Yc=YaCe('圧着インタフェース','バイナリ認証ファイル',何個のプロセスを開くか,queue(キュー),maxrequest=100(シミュレーションテストに何回アクセスするか),status="faild"(ここではテストの2つのインタフェースのため,テストのインタフェースを異なるパラメータで区別する戻り値処理を返す))Yc.prorequest()print("++++++")global date_timedatetime = time.strftime("%Y%m%d%X",time.localtime())Yc.multirun()
====================================================================================================================================
0123456789 NoneNone+++++++オープンサブプロセス0オープンサブプロセス1追加されたすべてのプロセスの実行が完了するのを待つ.End!!,PID:4819
原因サブプロセスconumersメソッドは完全に実行されず、エラーも報告されていないので気まずいです.大量の文書資料を調べた.このpoolメソッドはqueueを使用していることが分かった.Queueはtaskを作業プロセスに渡します.Multiprocessingは、プロセス間で転送するためにデータをシーケンス化する必要があります.メソッドはモジュールの最上位にのみシーケンス化され、クラスにバインドされたメソッドはシーケンス化されず、上記の異常が発生します.あの腫れはどうするのか、私は簡単にあきらめた人ではありません.やっと私に方法を見つけられました.
注意解決者の著者はpython 3でテストし、python 2でスクリプトのsubprocessをvalue、command=commandsに変換する.getstatusoutput
解決方法1(親測定)1.まず、エラーを報告するには、YaCeクラスのmultirunメソッドでfor i in range(self.maxpool):print(「サブプロセス%sを開く」%i)p.apply_を変更するスクリプトを修正する必要があります.async(self.conumers,args=(self,i))はfor i in range(self.maxpool):print(「サブプロセス%sを開く」%i)res=p.apply_async(self.conumers,args=(self,i))print(res.get)これにより、エラー:cPickleが表示されます.PicklingError: Can't pickle : attribute lookup builtin.instancemethod failed
2.解決方法は以下のようにスクリプトに新しい関数(1)を加える.def conumers_wrapper(cls_instance,i):return cls_instance.conumers(i)
(2).YaCeでのmultirunメソッドの変更for i in range(self.maxpool):print(「サブプロセス%sを開く」%i)res=p.apply_async(self.conumers,args=(self,i))print(res.get()for i in range(self.maxpool):print(「サブプロセス%sを開く」%i)res=p.apply_async(conumers_wrapper,args=(self,i))print(res.get)
問題が解決しました.スクリプトを実行した結果、エラーが発生しました.RuntimeError:Queue objects should only be shared between processes through inheritance
理由ここでQueueは使えないので、Managerに変更します.Queue;プロセス前の同共離用Queueは問題を使うからです.
終了最終コードは次のとおりです.
==================================================================================#!/usr/bin/evn python#_coding:utf8_from multiprocessing import Pool,Queue,Managerimport time,subprocess,osclass YaCe(object):def init(self,api,binfile,maxpool,qu,maxrequest=100000,status="success"):self.api = apiself.binfile = binfileself.status = statusself.maxpool = maxpoolself.maxrequest = maxrequestself.qu = qudef prorequest(self):for i in range(self.maxrequest):self.qu.put(i)print(i)for i in range(int(self.maxpool)):self.qu.put(None)print("None")
def conumers(self,i):
    while True:
        data = self.qu.get(True)
        if data == None:
            print("  %s    ..."%i)
            break
        else:
            #print("time curl -X POST '%s'  --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
            command = subprocess.getoutput("time curl -X POST --connect-timeout 10 '%s'  --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
            #command = subprocess.getoutput("time curl -X POST '%s'  --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
            if self.status == "success":
                logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time
                if "CgoyMDAwMDAwMDAw" in command:
                    print("  %s__%s..."%(str(i),str(data)))
                    with open(logfile,"a") as f:
                        f.write(command+"
") f.close() else: with open(logfile,"a") as f: f.write("Faild
") f.write(command+"
") f.close() else: logfile = os.getcwd()+"/"+"roomlist.log"+"_%s"%date_time #print("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile)) command = subprocess.getoutput("time curl -X POST --connect-timeout 10 '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile)) #command = subprocess.getoutput("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile)) if "CAES+" in command: print(" %s__%s..."%(str(i),str(data))) info = command.split('
')[-3:] info1 = "
".join(info) with open(logfile,"a") as f: f.write(info1+"
") f.close() else: print(" %s__%s..."%(str(i),str(data))) with open(logfile,"a") as f: f.write("Faild
") f.write(command+"
") f.close() def multirun(self): ps = int(int(self.maxpool) - 1) p = Pool(ps) for i in range(self.maxpool): print(" %s"%i) p.apply_async(conumers_wrapper,args=(self,i)) #print(res.get) print(' 。。。') p.close() p.join() endtime = time.strftime("%Y%m%d_%X",time.localtime()) if self.status == "success": logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time else: logfile = os.getcwd() + "/" + "roomlist.log"+"_%s"%date_time with open(logfile,"a") as f: f.write("============[%s]============
"%endtime) f.close() print('End!!,PID:%s'% os.getpid())

def conumers_wrapper(cls_instance,i):return cls_instance.conumers(i)
if name == "main":q = Manager().Queue()Yc=YaCe('圧測インタフェース','バイナリ認証ファイル',何個のプロセスを開くか,queue(キュー),maxrequest=100(シミュレーションテストで何回アクセスするか),status="faild"(ここではテストの2つのインタフェースのため,異なるstatusパラメータでテストのインタフェースを区別する戻り値処理を返す).Yc.prorequest()print("++++++")global date_timedatetime = time.strftime("%Y%m%d%X",time.localtime())Yc.multirun()
転載先:https://blog.51cto.com/wx22765652/2384679