コード農業技術の株売買の道--任務管理器
6473 ワード
システムタスクと通常のタスクは、タスクマネージャによってスケジュールされます.これらの違いは、システムタスクはプログラムの実行後に変更されず、通常のタスクは変更されます.(転載breaksoftwareからのcsdnブログを明記してください)
どうしてこんなデザインがあるの?サービスを停止することなく、関連する構成を更新できるシステムであることを望んでいます.たとえば、通常のタスクを追加します.通常のタスクプロファイルを変更すればいいです.たとえば、データベース内のテーブル構造を変更する必要があります.サービス変更コードを停止してデータフォーマットの一貫性を保証する必要はありません.
プロファイルが変更されたかどうかを知る必要があります.どうやって?1つの方法は、いくつかのシステム方法を借りて、対応するプロファイルの変更を傍受し、ファイルが変更されると、すぐにメインプログラムに処理を通知することです.もう1つは、ポーリングチェックメカニズム、すなわち、定期的に差異結果を生成することです.このシステムをもっと複雑にしないために、後者を選びました.それは私のいわゆるシステムタスクです.
システムタスクでも通常タスクでも、実装されるクラスはjob_に継承されます.base
この制限は主に各タスクがrunメソッドであることを保証するためである.スケジュール・フレームワークは、タスクの実行を完了するためにこのメソッドを実行します.
「当社のテクノロジーの株式売買の道-アーキテクチャと設計」では、APSchedulerに基づいてタスクスケジューリング機能を実現することを紹介しています.まず、BackgroundSchedulerオブジェクトを起動する必要があります.
タスクに追加する必要がある場合は、次の方法を呼び出します.
jobs_infoはタスクプロファイルのタスク情報を保存しています.サンプルを見てみましょう.
この構成ではtypeとclassを除いてAPSchedulerフレームワークのadd_jobメソッドのパラメータ.上海時間で、月曜日から金曜日まで、朝9時30分10秒に1回実行します.
add_jobsでclassフィールドを使用して、そのclassに対応するオブジェクトを取得します.
ここでは、私が以前特に強調した単例の使い方についても言及します.テストを経て、「私たちの技術が株を売る道--配置マネージャ、ログマネージャ」の単例の実現は、上記の方法が同じオブジェクトを取得することを保証することができ、ネット上の他の単例モデルはだめです.オブジェクトを取得したら、実行するコマンドを組み立てます.cmd="self._sched.add_job(job_handle.run,job_type,id=job_name"ではjob_handleが上で取得したオブジェクトであり、runは各jobに必要な方法である.これは、各タスククラスがjob_baseに継承される理由でもある.
を選択して、join_paramsプロファイル内の他の情報をパラメータに組み立てて完全なコマンドをつなぐ
これにより、構成中のタスクがAPSchedulerスケジューリングキューに追加されます.
タスクを削除する方法を見てみましょう
7行目はタスク名でAPSchedulerでタスクを削除します.8行目は、タスクに対応するオブジェクトをリストから削除します.なぜ_を使うのかjob_id_handleはこれらのタスクオブジェクトを保存しますか?より大きなライフサイクルで保存しないと、ローカル変数として認識され、解放され、APSchedulerが呼び出されなくなるためです.一般的なタスクを管理するシステムタスクコードを見てみましょう
runメソッドは定期的に実行されます.通常のタスクプロファイル情報は、固定ディレクトリから読み込まれます.そして_execute_jobsメソッドでは,前回読み出したタスク情報と比較して,削除が必要なタスク,新規のタスク,修正が必要なタスクの3つの辞書を生成する.修正が必要なタスクは、削除後に追加される方法になります.したがって、最後に操作されるのは2つのフィールド情報です.通常のタスクについては説明しませんが、後で紹介する各キャプチャおよびオフラインコンピューティングビジネスは通常のタスクです.
どうしてこんなデザインがあるの?サービスを停止することなく、関連する構成を更新できるシステムであることを望んでいます.たとえば、通常のタスクを追加します.通常のタスクプロファイルを変更すればいいです.たとえば、データベース内のテーブル構造を変更する必要があります.サービス変更コードを停止してデータフォーマットの一貫性を保証する必要はありません.
プロファイルが変更されたかどうかを知る必要があります.どうやって?1つの方法は、いくつかのシステム方法を借りて、対応するプロファイルの変更を傍受し、ファイルが変更されると、すぐにメインプログラムに処理を通知することです.もう1つは、ポーリングチェックメカニズム、すなわち、定期的に差異結果を生成することです.このシステムをもっと複雑にしないために、後者を選びました.それは私のいわゆるシステムタスクです.
システムタスクでも通常タスクでも、実装されるクラスはjob_に継承されます.base
from abc import ABCMeta,abstractmethod
class job_base:
__metaclass__ = ABCMeta
@abstractmethod
def run(self):
pass
この制限は主に各タスクがrunメソッドであることを保証するためである.スケジュール・フレームワークは、タスクの実行を完了するためにこのメソッドを実行します.
「当社のテクノロジーの株式売買の道-アーキテクチャと設計」では、APSchedulerに基づいてタスクスケジューリング機能を実現することを紹介しています.まず、BackgroundSchedulerオブジェクトを起動する必要があります.
from apscheduler.schedulers.background import BackgroundScheduler
@singleton
class job_center():
def __init__(self):
self._sched = None
self._job_conf_path = ""
self._job_id_handle = {}
self._static_job_id_handle = {}
def start(self):
self._sched = BackgroundScheduler()
self._sched.start()
タスクに追加する必要がある場合は、次の方法を呼び出します.
def add_jobs(self, jobs_info, is_static = False):
if None == self._sched:
LOG_WARNING("job center must call start() first")
return
for (job_name,job_info) in jobs_info.items():
if is_static and job_name in self._static_job_id_handle.keys():
continue
job_type = job_info["type"]
class_name = job_info["class"]
job_handle = self._get_obj(class_name)
if is_static:
self._static_job_id_handle[job_name] = job_handle
else:
self._job_id_handle[job_name] = job_handle
cmd = "self._sched.add_job(job_handle.run, job_type, id = job_name"
params = self._join_params(job_info)
if 0 != len(params):
cmd += " , "
cmd += params
cmd += ")"
#print cmd
eval(cmd)
jobs_infoはタスクプロファイルのタスク情報を保存しています.サンプルを見てみましょう.
[update_share_base_info]
type=cron
class=update_stock_base_info
day_of_week=1-5
hour=9
minute=30
second=10
timezone = Asia/Shanghai
この構成ではtypeとclassを除いてAPSchedulerフレームワークのadd_jobメソッドのパラメータ.上海時間で、月曜日から金曜日まで、朝9時30分10秒に1回実行します.
add_jobsでclassフィールドを使用して、そのclassに対応するオブジェクトを取得します.
def _get_obj(self, _cls_name):
_packet_name = _cls_name
_module_home = __import__(_packet_name,globals(),locals(),[_cls_name])
obj = getattr(_module_home,_cls_name)
class_obj = obj()
return class_obj
ここでは、私が以前特に強調した単例の使い方についても言及します.テストを経て、「私たちの技術が株を売る道--配置マネージャ、ログマネージャ」の単例の実現は、上記の方法が同じオブジェクトを取得することを保証することができ、ネット上の他の単例モデルはだめです.オブジェクトを取得したら、実行するコマンドを組み立てます.cmd="self._sched.add_job(job_handle.run,job_type,id=job_name"ではjob_handleが上で取得したオブジェクトであり、runは各jobに必要な方法である.これは、各タスククラスがjob_baseに継承される理由でもある.
を選択して、join_paramsプロファイル内の他の情報をパラメータに組み立てて完全なコマンドをつなぐ
def _join_params(self, job_info):
params = ""
param = ""
job_type = job_info["type"]
for key in job_info.keys():
if key in conf_keys.job_conf_info_dict[job_type]:
if 0 != len(params):
params += ' , '
value = job_info[key]
if value.isdigit():
param = key + " = " + value
else:
param = key + " = '" + value + "'"
if 0 != len(param):
params += param
return params
これにより、構成中のタスクがAPSchedulerスケジューリングキューに追加されます.
タスクを削除する方法を見てみましょう
def remove_jobs(self, jobs_info):
if None == self._sched:
LOG_WARNING("job center must call start() first")
return
for job_name in jobs_info.keys():
self._sched.remove_job(job_name)
self._job_id_handle.pop(job_name)
7行目はタスク名でAPSchedulerでタスクを削除します.8行目は、タスクに対応するオブジェクトをリストから削除します.なぜ_を使うのかjob_id_handleはこれらのタスクオブジェクトを保存しますか?より大きなライフサイクルで保存しないと、ローカル変数として認識され、解放され、APSchedulerが呼び出されなくなるためです.一般的なタスクを管理するシステムタスクコードを見てみましょう
@singleton
class j_load_job_conf(job_base):
def __init__(self):
self._pre_jobs_info = {}
self._frame_conf_inst = scheduler_frame_conf_inst()
self._job_center = job_center()
def run(self):
section_name = "strategy_job"
option_name = "conf_path"
if False == self._frame_conf_inst.has_option(section_name, option_name):
LOG_WARNING("no %s %s" % (section_name, option_name))
return
conf_path = self._frame_conf_inst.get(section_name, option_name)
LOG_DEBUG("Load %s %s %s" % (section_name, option_name, conf_path))
job_conf_parser_obj = job_conf_parser()
jobs_info = job_conf_parser_obj.parse(conf_path)
self._execute_jobs(jobs_info)
def _execute_jobs(self, jobs_info):
add_dict = {}
remove_dict = {}
modify_dict = {}
frame_tools.dict_diff(jobs_info, self._pre_jobs_info, add_dict, remove_dict, modify_dict)
add_jobs_info = dict(add_dict, **modify_dict)
remove_jobs_info = {}
for item in modify_dict.keys():
remove_jobs_info[item] = self._pre_jobs_info[item]
LOG_INFO("add jobs %s" % (json.dumps(add_jobs_info)))
LOG_INFO("remove jobs %s" % (json.dumps(remove_jobs_info)))
if 0 == len(add_jobs_info) and 0 == len(remove_jobs_info):
return
self._pre_jobs_info = jobs_info
self._job_center.remove_jobs(remove_jobs_info)
self._job_center.add_jobs(add_jobs_info)
runメソッドは定期的に実行されます.通常のタスクプロファイル情報は、固定ディレクトリから読み込まれます.そして_execute_jobsメソッドでは,前回読み出したタスク情報と比較して,削除が必要なタスク,新規のタスク,修正が必要なタスクの3つの辞書を生成する.修正が必要なタスクは、削除後に追加される方法になります.したがって、最後に操作されるのは2つのフィールド情報です.通常のタスクについては説明しませんが、後で紹介する各キャプチャおよびオフラインコンピューティングビジネスは通常のタスクです.