コード農業技術の株売買の道--任務管理器

6473 ワード

システムタスクと通常のタスクは、タスクマネージャによってスケジュールされます.これらの違いは、システムタスクはプログラムの実行後に変更されず、通常のタスクは変更されます.(転載breaksoftwareからのcsdnブログを明記してください)
どうしてこんなデザインがあるの?サービスを停止することなく、関連する構成を更新できるシステムであることを望んでいます.たとえば、通常のタスクを追加します.通常のタスクプロファイルを変更すればいいです.たとえば、データベース内のテーブル構造を変更する必要があります.サービス変更コードを停止してデータフォーマットの一貫性を保証する必要はありません.
プロファイルが変更されたかどうかを知る必要があります.どうやって?1つの方法は、いくつかのシステム方法を借りて、対応するプロファイルの変更を傍受し、ファイルが変更されると、すぐにメインプログラムに処理を通知することです.もう1つは、ポーリングチェックメカニズム、すなわち、定期的に差異結果を生成することです.このシステムをもっと複雑にしないために、後者を選びました.それは私のいわゆるシステムタスクです.
システムタスクでも通常タスクでも、実装されるクラスはjob_に継承されます.base
from abc import ABCMeta,abstractmethod
class job_base:
    __metaclass__ = ABCMeta
    @abstractmethod
    def run(self):
        pass

この制限は主に各タスクがrunメソッドであることを保証するためである.スケジュール・フレームワークは、タスクの実行を完了するためにこのメソッドを実行します.
「当社のテクノロジーの株式売買の道-アーキテクチャと設計」では、APSchedulerに基づいてタスクスケジューリング機能を実現することを紹介しています.まず、BackgroundSchedulerオブジェクトを起動する必要があります.
from apscheduler.schedulers.background import BackgroundScheduler

@singleton
class job_center():
    def __init__(self):
        self._sched = None
        self._job_conf_path = ""
        self._job_id_handle = {}
        self._static_job_id_handle = {}
    
    def start(self):
        self._sched = BackgroundScheduler()
        self._sched.start()

タスクに追加する必要がある場合は、次の方法を呼び出します.
    def add_jobs(self, jobs_info, is_static = False):
        if None == self._sched:
            LOG_WARNING("job center must call start() first")
            return

        for (job_name,job_info) in jobs_info.items():
            if is_static and job_name in self._static_job_id_handle.keys():
                continue
            job_type = job_info["type"]
            class_name = job_info["class"]
            job_handle = self._get_obj(class_name)
            if is_static:
                self._static_job_id_handle[job_name] = job_handle
            else:
                self._job_id_handle[job_name] = job_handle
            cmd = "self._sched.add_job(job_handle.run, job_type, id = job_name"
            params = self._join_params(job_info)
            if 0 != len(params):
                cmd += " , "
                cmd += params
            cmd += ")"
            #print cmd
            eval(cmd)

        jobs_infoはタスクプロファイルのタスク情報を保存しています.サンプルを見てみましょう.
[update_share_base_info]
type=cron
class=update_stock_base_info
day_of_week=1-5
hour=9
minute=30
second=10
timezone = Asia/Shanghai

この構成ではtypeとclassを除いてAPSchedulerフレームワークのadd_jobメソッドのパラメータ.上海時間で、月曜日から金曜日まで、朝9時30分10秒に1回実行します.
        add_jobsでclassフィールドを使用して、そのclassに対応するオブジェクトを取得します.
    def _get_obj(self, _cls_name):  
        _packet_name = _cls_name  
        _module_home = __import__(_packet_name,globals(),locals(),[_cls_name])
        obj =  getattr(_module_home,_cls_name)  
        class_obj = obj()
        return class_obj

ここでは、私が以前特に強調した単例の使い方についても言及します.テストを経て、「私たちの技術が株を売る道--配置マネージャ、ログマネージャ」の単例の実現は、上記の方法が同じオブジェクトを取得することを保証することができ、ネット上の他の単例モデルはだめです.オブジェクトを取得したら、実行するコマンドを組み立てます.cmd="self._sched.add_job(job_handle.run,job_type,id=job_name"ではjob_handleが上で取得したオブジェクトであり、runは各jobに必要な方法である.これは、各タスククラスがjob_baseに継承される理由でもある.
を選択して、join_paramsプロファイル内の他の情報をパラメータに組み立てて完全なコマンドをつなぐ
    def _join_params(self, job_info):
        params = ""
        param = ""
        job_type = job_info["type"]
        for key in job_info.keys():
            if key in conf_keys.job_conf_info_dict[job_type]:
                if  0 != len(params):
                    params += ' , '
                value = job_info[key]
                if value.isdigit():
                    param = key + " = " + value
                else:
                    param = key + " = '" + value + "'"
                if 0 != len(param):
                    params += param
        return params

これにより、構成中のタスクがAPSchedulerスケジューリングキューに追加されます.
タスクを削除する方法を見てみましょう
    def remove_jobs(self, jobs_info):
        if None == self._sched:
            LOG_WARNING("job center must call start() first")
            return

        for job_name in jobs_info.keys():
            self._sched.remove_job(job_name)
            self._job_id_handle.pop(job_name)

7行目はタスク名でAPSchedulerでタスクを削除します.8行目は、タスクに対応するオブジェクトをリストから削除します.なぜ_を使うのかjob_id_handleはこれらのタスクオブジェクトを保存しますか?より大きなライフサイクルで保存しないと、ローカル変数として認識され、解放され、APSchedulerが呼び出されなくなるためです.一般的なタスクを管理するシステムタスクコードを見てみましょう
@singleton
class j_load_job_conf(job_base):
    def __init__(self):
        self._pre_jobs_info = {}
        self._frame_conf_inst = scheduler_frame_conf_inst()
        self._job_center = job_center()

    def run(self):
        section_name = "strategy_job"
        option_name = "conf_path"
        if False == self._frame_conf_inst.has_option(section_name, option_name):
            LOG_WARNING("no %s %s" % (section_name, option_name))
            return
        conf_path = self._frame_conf_inst.get(section_name, option_name)
        LOG_DEBUG("Load %s %s %s" % (section_name, option_name, conf_path))
        
        job_conf_parser_obj = job_conf_parser()
        jobs_info = job_conf_parser_obj.parse(conf_path)
        self._execute_jobs(jobs_info)

    def _execute_jobs(self, jobs_info):
        add_dict = {}
        remove_dict = {}
        modify_dict = {}

        frame_tools.dict_diff(jobs_info, self._pre_jobs_info, add_dict, remove_dict, modify_dict)

        add_jobs_info = dict(add_dict, **modify_dict)

        remove_jobs_info = {}
        for item in modify_dict.keys():
            remove_jobs_info[item] = self._pre_jobs_info[item]

        LOG_INFO("add jobs %s" % (json.dumps(add_jobs_info)))
        LOG_INFO("remove jobs %s" % (json.dumps(remove_jobs_info)))
        
        if 0 == len(add_jobs_info) and 0 == len(remove_jobs_info):
            return

        self._pre_jobs_info = jobs_info

        self._job_center.remove_jobs(remove_jobs_info)
        self._job_center.add_jobs(add_jobs_info)

runメソッドは定期的に実行されます.通常のタスクプロファイル情報は、固定ディレクトリから読み込まれます.そして_execute_jobsメソッドでは,前回読み出したタスク情報と比較して,削除が必要なタスク,新規のタスク,修正が必要なタスクの3つの辞書を生成する.修正が必要なタスクは、削除後に追加される方法になります.したがって、最後に操作されるのは2つのフィールド情報です.通常のタスクについては説明しませんが、後で紹介する各キャプチャおよびオフラインコンピューティングビジネスは通常のタスクです.