PythonタイミングタスクAPSchedulerのインストールと使用解析


1、概要
APSchedulerはPythonのタイムミッションフレームで、使いやすいです。日付、固定時間間隔、およびCrontabタイプに基づくタスクを提供し、タスクを永続化し、daemen方式でアプリケーションを実行することができます。
2、APSchedulerの四つのコンポーネント
APSchedulerの4つのコンポーネントはそれぞれ、トリガー、ジョブストア、アクチュエータ、スケジューラです。
トリガー
スケジュールロジックを含めて、各作業には自分の触発器があり、次のどの作業が実行されるかを決めるために使われます。彼ら自身の初期配置以外は、トリガーは完全に無状態です。
APSchedulerには三種類のビルドがあります。
  • date:特定の時点でトリガ
  • interval:固定時間間隔トリガ
  • cron:特定の時間に
  • を周期的にトリガする。
    ジョブストア(job store)
    スケジュールされたジョブを格納し、デフォルトのジョブストアは簡単にジョブをメモリに保存し、他のジョブストアはジョブをデータベースに保存する。一つの作業のデータは、耐久化された作業の記憶に保存されている間は、順序化され、ロードされている時は、逆順序化されます。スケジューラは同じ作業メモリを共有できません。
    APSchedulerはデフォルトではMemoryJobStoreを使用しています。DBメモリ方式を変更できます。
    アクチュエータ(exector)
    作業の実行は、通常、作業中に作成された呼び出し可能なオブジェクトをスレッドまたは池に提出することによって行われる。作業が完了すると、アクチュエータがスケジューラに通知します。
    最もよく使われるexectorには二つの種類があります。
  • ProcesssPool Exector
  • ThreadPool Exector
  • スケジューラ(scheduler)
    通常、アプリケーションには1つのスケジューラしかありません。アプリケーションの開発者は通常、作業メモリ、スケジューラ、触発器を直接処理しません。逆に、スケジューラはこれらを処理するための適切なインターフェースを提供します。ジョブの記憶と実行器の設定は、スケジューラにおいて完了してもよく、例えば、追加、修正、除去作業などがあります。
    2、据え付け$ pip install apscheduler次に簡単ないくつかの例を見ます。
    
    ===============interval:         ===============
    from apscheduler.schedulers.blocking import BlockingScheduler
    from datetime import datetime
    
    def job():
      print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    #   BlockingScheduler
    sched = BlockingScheduler()
    sched.add_job(job, 'interval', seconds=5) 
    sched.start()
    
    ===============cron:           ===============
    import time
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    def job(text):
      t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
      print('{} --- {}'.format(text, t))
    
    scheduler = BlockingScheduler()
    #    22 ,   1        job   
    scheduler.add_job(job, 'cron', hour=17, minute='*/1', args=['job1'])
    #    22 23  25 ,     job   
    scheduler.add_job(job, 'cron', hour='22-23', minute='25', args=['job2'])
    
    scheduler.start()
    装飾器でscheduled_job()添加方法
    
    import time
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    scheduler = BlockingScheduler()
    
    @scheduler.scheduled_job('interval', seconds=5)
    def job1():
      t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
      print('job1 --- {}'.format(t))
    
    @scheduler.scheduled_job('cron', second='*/7')
    def job2():
      t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
      print('job2 --- {}'.format(t))
    
    scheduler.start()
    以上が本文の全部です。皆さんの勉強に役に立つように、私たちを応援してください。