python multiprocessingモジュールマルチプロセスタスクでのマルチプログラムサブタスクの実行を実現

3618 ワード

python multiprocessingモジュールは、マルチプロセスタスクでマルチプログラムサブタスクを実行し、同時制御を実現します.原因はcelery+ansibleを使って任务の実行と回収をしたいので、コードが书いた后に、寝槽celeryが彼の実行する任务が更にサブプロセスを创立することを许さないことを発见して、これは比较的にばつが悪くて、カプセル化したansibleインタフェースは使えませんか??では、どのようにしてマルチタスクを自動的に実行しますか?
研究celeryは彼のデフォルトの実行するマルチプロセスのメカニズムがmultiprocessingモジュールのPoolであることを発見して、コードを通じてこのモジュールをテストして、彼も自分の任務が更にサブプロセスを創立することを許さないことを発見して、そこで大体どんなことを理解します.後でcelery workerのデフォルトの同時メカニズムをどのように修正するかを検討することができる.
しかし、今回は彼を迂回しようとmultiprocessingのProcessモジュールをテストしたところ、このモジュールはタスクでサブプロセスを確立することを許可していることが分かった.それから自分でタスクアクチュエータを書き始めて、まずmysqlを持って簡単にタスクキューを作って、測定してからkafkaあるいはその他を交換します.テストが完成したら、任務は並行して実行することができますが、リズムをコントロールしなければなりません.何万も同時に自分で切るのではないでしょうか.半日ドキュメントをめくってmultiprocessingのSemaphoreモジュールができることを発見し、テスト後、コードは以下の通りです.
#!/bin/python

from multiprocessing import Pool
from multiprocessing import Process,Semaphore,current_process
import sys, os, time, random, json

project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath('__file__'))))
sys.path.append(project_dir)
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "wk_api.settings")

import django

django.setup()

from wkexe.models import WorkTables
from wkexe.models import ExecuteTables
from utils.ansible_api import ANSRunner


def RunPlaybook(ip, ymldir):
    s.acquire()
    print(time.strftime('%H:%M:%S'), current_process().name + "      ");
    time.sleep(random.random() * 5)
    rbt = ANSRunner(ip)
    rbt.run_playbook(playbook_path='%s' % (ymldir))
    result = json.dumps(rbt.get_playbook_result(), indent=4)
    print(result)
    s.release()
    print(time.strftime('%H:%M:%S'), current_process().name + "      ");


def RunModel(ip, model, module_args):
    s.acquire()
    print(time.strftime('%H:%M:%S'), current_process().name + "      ");
    rbt = ANSRunner(ip)
    rbt.run_model(model, module_args)
    result = json.dumps(rbt.get_model_result(), indent=4)
    print(result)
    s.release()
    print(time.strftime('%H:%M:%S'), current_process().name + "      ");



if __name__ == '__main__':
    while True:
        p_list = []
        works = WorkTables.objects.all().filter(status=10)
        for work in works:
            concurrent = work.concurrent  #     
            executeInfo = work.executetables_set.all()  #        
            taskType = work.taskname.tasktype  #     
            p = Pool(1)
            for ip in executeInfo:
                ipAdd = ip.ip
                ipId = ip.id
                if taskType == 0:
                    ymlDir = work.taskname.taskymldir
                    print(ymlDir)
                    p = Process(target=RunPlaybook, args=(ipAdd, ymlDir))
                    # p.start()
                    # p.join()
                    # p.apply_async(RunPlaybook, args=(ipAdd, ymlDir))
                    p_list.append(p)
                elif taskType == 1:
                    model = work.taskname.taskmodel
                    modelArgs = work.taskname.taskargs
                    p = Process(target=RunModel, args=(ipAdd, model, modelArgs))
                    # p.start()
                    # p.join()
                    # p.apply_async(RunModel, args=(ipAdd, model, modelArgs))
                    p_list.append(p)
                else:
                    print("        ")

        s = Semaphore(concurrent) #              ,              
        for p in p_list:
            p.start()

        #
        for p in p_list:
            # p.close()
            p.join()
        print("    ")