pythonタイミングイニシエータメソッド

63297 ワード

第一の方法は最も簡単で暴力的だ.それは,デッドサイクルにおいてスレッド睡眠関数sleep()を用いることである.
短所:CPUメモリ、デッドサイクル+ブロックスレッド
def doSth():
    #            
    print(u'             ')


#       1:00     ,          

def main(h=1, m=0):
    while True:
        now = datetime.datetime.now()
        print(now.hour, now.minute)
        if now.hour == h and now.minute == m:
            break
        #   60     
        time.sleep(60)
    doSth()

Python標準ライブラリthreadingにはTimerクラスがあります.タイミングタスクを実行するためにスレッドが新しく起動されるので、非ブロック関数です.
2秒ごとのループ呼び出し
from threading import Timer


def printHello():
    print('TimeNow:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    t = Timer(2, printHello)
    t.start()

if __name__ == "__main__":
    printHello()

2秒後にコール終了
from threading import Timer


def task():
    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
def printHello():
    t = Timer(2, task)
    t.start()


if __name__ == "__main__":
    printHello()

schedulerクラスによってイベントをスケジューリングし、タスクをタイミングよく実行する効果を達成します.
まずパッケージをインストールします:pip install schedule各イベントは同じスレッドで実行されるため、1つのイベントが(関数にtime.sleep(3))より長い時間かかる場合、遅延イベントが重なります.イベントを失わないように、遅延イベントは前のイベントが実行されてから実行されますが、一部の遅延イベントは元の計画されたイベントよりも遅くなる可能性があります.
コード#コード#
import schedule


def job1():
    print('Job1:  10        ,    2 ')
    print('Job1-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    time.sleep(2)
    print('Job1-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('------------------------------------------------------------------------')


def job2():
    print('Job2:  30     ,    5 ')
    print('Job2-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    time.sleep(5)
    print('Job2-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('------------------------------------------------------------------------')


def job3():
    print('Job3:  1      ,    10 ')
    print('Job3-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    time.sleep(10)
    print('Job3-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('------------------------------------------------------------------------')


def job4():
    print('Job4:    17:49    ,    20 ')
    print('Job4-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    time.sleep(20)
    print('Job4-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('------------------------------------------------------------------------')


def job5():
    print('Job5:  5  10     ,    3 ')
    print('Job5-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    time.sleep(3)
    print('Job5-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('------------------------------------------------------------------------')


def job6():
    exit()


if __name__ == '__main__':
    # schedule.every(10).seconds.do(job1)
    # schedule.every(30).seconds.do(job2)
    # schedule.every(1).minutes.do(job3)
    # schedule.every().day.at('09:29').do(job4)
    schedule.every().day.at('16:54').do(job6)
    schedule.every(5).to(10).seconds.do(job5)
    while True:
        schedule.run_pending()

結果:
------------------------------------------------------------------------
Job1:  102 
Job1-startTime:2019-11-12 14:36:01
Job1-endTime:2019-11-12 14:36:03
------------------------------------------------------------------------
Job2:  305 
Job2-startTime:2019-11-12 14:36:09
Job2-endTime:2019-11-12 14:36:14
------------------------------------------------------------------------
Job1:  102 
Job1-startTime:2019-11-12 14:36:14
Job1-endTime:2019-11-12 14:36:16
------------------------------------------------------------------------

時間的な障害を加えないでマルチスレッド実行に相当
コード#コード#
import time

import schedule


def job():
    print("I'm working")
    worker_main()

def worker_main():
    print('Job5-startTime:%s' % (time.time()))
# datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')


schedule.every(10).seconds.do(job)
schedule.every(10).seconds.do(job)
schedule.every(10).seconds.do(job)
schedule.every(10).seconds.do(job)
schedule.every(10).seconds.do(job)


while 1:
    schedule.run_pending()

結果:
I'm working
Job5-startTime:1573541137.3658931
I'm working
Job5-startTime:1573541137.3658931
I'm working
Job5-startTime:1573541137.3658931
I'm working
Job5-startTime:1573541137.3658931
I'm working
Job5-startTime:1573541137.3658931

sched-汎用時間スケジューラ
schedモジュールは、スケジューラクラスで遅延関数を使用して特定の時間を待機し、タスクを実行する汎用イベントスケジューラを実現します.同時にマルチスレッドアプリケーションをサポートし、各タスクが実行されるとすぐに遅延関数を呼び出し、他のスレッドも実行できるようにします.
コード#コード#
import sched
import time

#     
scheduler = sched.scheduler(time.time, time.sleep)


def print_event(name):
    print ('EVENT:', time.time(), name)

print ('START:', time.time())

#        2 、3         
scheduler.enter(2, 1, print_event, ('first',))
scheduler.enter(3, 1, print_event, ('second',))

#     
scheduler.run()

+++++++++++++++++++++++++++++++++++++++++++++++
以上がタイムタイマーですが、タイミングタスクをループ実行することはできません.そのため、この重任を担うライブラリが必要です.APSchedulerです
APSchedulerのフルネームはAdvanced Python Schedulerです.軽量レベルのPythonタイミングタスクスケジューリングフレームワークです.APSchedulerスケジューラ(scheduler)タスクコントローラ:executor、jobstore、triggerを構成することにより、スレッドプール(ThreadPoolExecutorデフォルト20)またはプロセスプール(ProcessPoolExecutorデフォルト5)を使用し、デフォルトで最大3つ(max_instances)のタスクインスタンスを同時に実行し、jobの削除・変更などのスケジューリング制御を実現
アプリケーション環境とAPSchedulerを使用する目的に応じて、適切なスケジューラを選択する必要があります.通常最もよく使われる2つ:
BlockingScheduler:                    。 

BackgroundScheduler:              ,               。

ジョブストア
  4     ,   :MemoryJobStore(      )、sqlalchemy(      )、mongodb(     )、redis(         )

トリガモード
date:       :       ,        ;         ,       
interval:       ,            。
cron:cron       。

シンプルで実用的:
コード:
def tick(s):
    print("Tick! The time is: %s" % datetime.now(), s)


scheduler = BlockingScheduler()
# ----------------------------------interval  :           -------------------------
#      interval,   3      
# jitter    ,               ,         ,            。
scheduler.add_job(tick, 'interval', seconds=3, jitter=10, id="test", args=['text'])
#   2019-04-15 17:00:00 ~ 2019-12-31 24:00:00   ,           job_func   
a = scheduler.add_job(tick, 'interval', seconds=3, id="test1", start_date='2019-11-12 11:13:00',
                      end_date='2019-11-12 11:14:00')
#  2    
scheduler.add_job(tick, 'interval', hours=2)
jitter    ,               ,         ,            。
#    (    120    )  `job_function`
sched.add_job(job_function, 'interval', hours=1, jitter=120)
# -----------------------------------cron  :         --------------------------------

# hour =19 ,minute =23        19:23      
scheduler.add_job(tick, 'cron', hour=11, minute=50)
#     6 、7 、8 、11  12       ,00:00、01:00、02:00 03:00  
scheduler.add_job(tick, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
#  2014-05-30 00:00:00 ,        5:30  
scheduler.add_job(tick, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')

# ---------------------------------date  :           ------------------------------
#   run_date     date  、datetime       ,  2010 - 2 - 25 19: 05:06    ,args    text  。
scheduler.add_job(tick, 'date', run_date=datetime(2019, 11, 12, 13, 56, 25), args=['text1'], id='job1')


scheduler.start()

もう一つは装飾器の形で
コード:
from datetime import datetime

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()


@scheduler.scheduled_job('cron', id='my_job_id', day='last sun')
def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")
    
#       1 30 50     scheduled_job()     
@scheduler.scheduled_job('cron', day_of_week='*', hour=1, minute='30', second='00')
def tick():
    print("Tick! The time is: %s" % datetime.now())

try:
    scheduler.start()
    print('        ')
except Exception as e:
    scheduler.shutdown()
    print('        ')
finally:
    exit()

タスクの削除
  • 呼び出しremove_job()パラメータは、タスクID、タスクレジスタ名
  • add_を通過job()が作成したタスクインスタンスでremove()メソッドを呼び出す2つ目の方法は便利ですが、タスクインスタンスを作成するときにインスタンスが変数に保存される必要があることを前提としています.scheduled_を通過する場合job()が作成したタスクは、最初の方法しか選択できません.タスクのスケジュールが終了すると(たとえば、タスクのトリガが次回の実行時間を生成しなくなる)、タスクは自動的に
  • を削除します.
    job = scheduler.add_job(myfunc, 'interval', minutes=2)
    job.remove()
    #   id
    scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
    scheduler.remove_job('my_job_id')
    

    タスクの一時停止と再開
    タスクインスタンスまたはスケジューラを使用すると、タスクを一時停止およびリカバリできます.タスクが一時停止されると、タスクの次の実行時間が削除されます.タスクをリカバリする前に、実行回数のカウントも統計されません.タスクを一時停止するには、apschedulerという2つの方法があります.job.Job.pause() apscheduler.schedulers.base.BaseScheduler.pause_ジョブ()リカバリタスク:apscheduler.job.Job.resume() apscheduler.schedulers.base.BaseScheduler.resume_job()
    スケジューラを閉じる
    閉じる方法は次のとおりです.
    scheduler.shutdown()
    #      ,                ,            。  ,        ,       :
    scheduler.shutdown(wait=False)
    

    タスクプロセススケジューラを一時停止、リカバリして、実行中のタスクを一時停止します.
    scheduler.pause()
    

    タスクをリカバリすることもできます.
    scheduler.resume()
    

    また、スケジューラの起動時にデフォルトのすべてのタスクを一時停止状態に設定することもできます.
    scheduler.start(paused=True)
    

    サマータイム問題
    一部のtimezoneタイムゾーンでは、サマータイムの問題がある可能性があります.これにより、コマンドの切り替え時にタスクが実行されないか、タスクが2回実行される可能性があります.この問題を回避するには、UTC時間を使用するか、実行の問題を事前に予知して計画することができます.
    #  Europe/Helsinki  ,               ;              
    sched.add_job(job_function, 'cron', hour=3, minute=30)
    

    プログラムの実行が完了すると表示されるログを追加
    from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
    import logging
    
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        filename='log1.log',
                        filemode='a',
                        )
    logging.debug("    ")
    logging.info("    ")
    logging.warning("    ")
    logging.error("    ")
    logging.critical("      ")
    
    
    def aps_test(x):
        print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
        print(1 / 0)
    
    
    def date_test(x):
        print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
    
    
    def my_listener(event):
        if event.exception:
            a = '     '
            print(a)
    
    
        else:
            a = '      '
            print(a)
    
    
    
    scheduler = BlockingScheduler()
    scheduler.add_job(func=aps_test, args=('     ,   ',), next_run_time=datetime.now() + timedelta(seconds=13),
                      id='date_task')
    scheduler.add_job(func=date_test, args=('    ',), trigger='interval', seconds=5, id='interval_task')
    scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
    scheduler._logger = logging
    
    scheduler.start()
    

    閲覧ありがとうございます!!!