pythonタイミングイニシエータメソッド
63297 ワード
第一の方法は最も簡単で暴力的だ.それは,デッドサイクルにおいてスレッド睡眠関数sleep()を用いることである.
短所:CPUメモリ、デッドサイクル+ブロックスレッド
Python標準ライブラリthreadingにはTimerクラスがあります.タイミングタスクを実行するためにスレッドが新しく起動されるので、非ブロック関数です.
2秒ごとのループ呼び出し
2秒後にコール終了
schedulerクラスによってイベントをスケジューリングし、タスクをタイミングよく実行する効果を達成します.
まずパッケージをインストールします:pip install schedule各イベントは同じスレッドで実行されるため、1つのイベントが(関数にtime.sleep(3))より長い時間かかる場合、遅延イベントが重なります.イベントを失わないように、遅延イベントは前のイベントが実行されてから実行されますが、一部の遅延イベントは元の計画されたイベントよりも遅くなる可能性があります.
コード#コード#
結果:
時間的な障害を加えないでマルチスレッド実行に相当
コード#コード#
結果:
sched-汎用時間スケジューラ
schedモジュールは、スケジューラクラスで遅延関数を使用して特定の時間を待機し、タスクを実行する汎用イベントスケジューラを実現します.同時にマルチスレッドアプリケーションをサポートし、各タスクが実行されるとすぐに遅延関数を呼び出し、他のスレッドも実行できるようにします.
コード#コード#
+++++++++++++++++++++++++++++++++++++++++++++++
以上がタイムタイマーですが、タイミングタスクをループ実行することはできません.そのため、この重任を担うライブラリが必要です.APSchedulerです
APSchedulerのフルネームはAdvanced Python Schedulerです.軽量レベルのPythonタイミングタスクスケジューリングフレームワークです.APSchedulerスケジューラ(scheduler)タスクコントローラ:executor、jobstore、triggerを構成することにより、スレッドプール(ThreadPoolExecutorデフォルト20)またはプロセスプール(ProcessPoolExecutorデフォルト5)を使用し、デフォルトで最大3つ(max_instances)のタスクインスタンスを同時に実行し、jobの削除・変更などのスケジューリング制御を実現
アプリケーション環境とAPSchedulerを使用する目的に応じて、適切なスケジューラを選択する必要があります.通常最もよく使われる2つ:
ジョブストア
トリガモード
シンプルで実用的:
コード:
もう一つは装飾器の形で
コード:
タスクの削除呼び出しremove_job()パラメータは、タスクID、タスクレジスタ名 add_を通過job()が作成したタスクインスタンスでremove()メソッドを呼び出す2つ目の方法は便利ですが、タスクインスタンスを作成するときにインスタンスが変数に保存される必要があることを前提としています.scheduled_を通過する場合job()が作成したタスクは、最初の方法しか選択できません.タスクのスケジュールが終了すると(たとえば、タスクのトリガが次回の実行時間を生成しなくなる)、タスクは自動的に を削除します.
タスクの一時停止と再開
タスクインスタンスまたはスケジューラを使用すると、タスクを一時停止およびリカバリできます.タスクが一時停止されると、タスクの次の実行時間が削除されます.タスクをリカバリする前に、実行回数のカウントも統計されません.タスクを一時停止するには、apschedulerという2つの方法があります.job.Job.pause() apscheduler.schedulers.base.BaseScheduler.pause_ジョブ()リカバリタスク:apscheduler.job.Job.resume() apscheduler.schedulers.base.BaseScheduler.resume_job()
スケジューラを閉じる
閉じる方法は次のとおりです.
タスクプロセススケジューラを一時停止、リカバリして、実行中のタスクを一時停止します.
タスクをリカバリすることもできます.
また、スケジューラの起動時にデフォルトのすべてのタスクを一時停止状態に設定することもできます.
サマータイム問題
一部のtimezoneタイムゾーンでは、サマータイムの問題がある可能性があります.これにより、コマンドの切り替え時にタスクが実行されないか、タスクが2回実行される可能性があります.この問題を回避するには、UTC時間を使用するか、実行の問題を事前に予知して計画することができます.
プログラムの実行が完了すると表示されるログを追加
閲覧ありがとうございます!!!
短所:CPUメモリ、デッドサイクル+ブロックスレッド
def doSth():
#
print(u' ')
# 1:00 ,
def main(h=1, m=0):
while True:
now = datetime.datetime.now()
print(now.hour, now.minute)
if now.hour == h and now.minute == m:
break
# 60
time.sleep(60)
doSth()
Python標準ライブラリthreadingにはTimerクラスがあります.タイミングタスクを実行するためにスレッドが新しく起動されるので、非ブロック関数です.
2秒ごとのループ呼び出し
from threading import Timer
def printHello():
print('TimeNow:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
t = Timer(2, printHello)
t.start()
if __name__ == "__main__":
printHello()
2秒後にコール終了
from threading import Timer
def task():
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
def printHello():
t = Timer(2, task)
t.start()
if __name__ == "__main__":
printHello()
schedulerクラスによってイベントをスケジューリングし、タスクをタイミングよく実行する効果を達成します.
まずパッケージをインストールします:pip install schedule各イベントは同じスレッドで実行されるため、1つのイベントが(関数にtime.sleep(3))より長い時間かかる場合、遅延イベントが重なります.イベントを失わないように、遅延イベントは前のイベントが実行されてから実行されますが、一部の遅延イベントは元の計画されたイベントよりも遅くなる可能性があります.
コード#コード#
import schedule
def job1():
print('Job1: 10 , 2 ')
print('Job1-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
time.sleep(2)
print('Job1-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
print('------------------------------------------------------------------------')
def job2():
print('Job2: 30 , 5 ')
print('Job2-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
time.sleep(5)
print('Job2-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
print('------------------------------------------------------------------------')
def job3():
print('Job3: 1 , 10 ')
print('Job3-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
time.sleep(10)
print('Job3-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
print('------------------------------------------------------------------------')
def job4():
print('Job4: 17:49 , 20 ')
print('Job4-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
time.sleep(20)
print('Job4-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
print('------------------------------------------------------------------------')
def job5():
print('Job5: 5 10 , 3 ')
print('Job5-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
time.sleep(3)
print('Job5-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
print('------------------------------------------------------------------------')
def job6():
exit()
if __name__ == '__main__':
# schedule.every(10).seconds.do(job1)
# schedule.every(30).seconds.do(job2)
# schedule.every(1).minutes.do(job3)
# schedule.every().day.at('09:29').do(job4)
schedule.every().day.at('16:54').do(job6)
schedule.every(5).to(10).seconds.do(job5)
while True:
schedule.run_pending()
結果:
------------------------------------------------------------------------
Job1: 10 , 2
Job1-startTime:2019-11-12 14:36:01
Job1-endTime:2019-11-12 14:36:03
------------------------------------------------------------------------
Job2: 30 , 5
Job2-startTime:2019-11-12 14:36:09
Job2-endTime:2019-11-12 14:36:14
------------------------------------------------------------------------
Job1: 10 , 2
Job1-startTime:2019-11-12 14:36:14
Job1-endTime:2019-11-12 14:36:16
------------------------------------------------------------------------
時間的な障害を加えないでマルチスレッド実行に相当
コード#コード#
import time
import schedule
def job():
print("I'm working")
worker_main()
def worker_main():
print('Job5-startTime:%s' % (time.time()))
# datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
schedule.every(10).seconds.do(job)
schedule.every(10).seconds.do(job)
schedule.every(10).seconds.do(job)
schedule.every(10).seconds.do(job)
schedule.every(10).seconds.do(job)
while 1:
schedule.run_pending()
結果:
I'm working
Job5-startTime:1573541137.3658931
I'm working
Job5-startTime:1573541137.3658931
I'm working
Job5-startTime:1573541137.3658931
I'm working
Job5-startTime:1573541137.3658931
I'm working
Job5-startTime:1573541137.3658931
sched-汎用時間スケジューラ
schedモジュールは、スケジューラクラスで遅延関数を使用して特定の時間を待機し、タスクを実行する汎用イベントスケジューラを実現します.同時にマルチスレッドアプリケーションをサポートし、各タスクが実行されるとすぐに遅延関数を呼び出し、他のスレッドも実行できるようにします.
コード#コード#
import sched
import time
#
scheduler = sched.scheduler(time.time, time.sleep)
def print_event(name):
print ('EVENT:', time.time(), name)
print ('START:', time.time())
# 2 、3
scheduler.enter(2, 1, print_event, ('first',))
scheduler.enter(3, 1, print_event, ('second',))
#
scheduler.run()
+++++++++++++++++++++++++++++++++++++++++++++++
以上がタイムタイマーですが、タイミングタスクをループ実行することはできません.そのため、この重任を担うライブラリが必要です.APSchedulerです
APSchedulerのフルネームはAdvanced Python Schedulerです.軽量レベルのPythonタイミングタスクスケジューリングフレームワークです.APSchedulerスケジューラ(scheduler)タスクコントローラ:executor、jobstore、triggerを構成することにより、スレッドプール(ThreadPoolExecutorデフォルト20)またはプロセスプール(ProcessPoolExecutorデフォルト5)を使用し、デフォルトで最大3つ(max_instances)のタスクインスタンスを同時に実行し、jobの削除・変更などのスケジューリング制御を実現
アプリケーション環境とAPSchedulerを使用する目的に応じて、適切なスケジューラを選択する必要があります.通常最もよく使われる2つ:
BlockingScheduler: 。
BackgroundScheduler: , 。
ジョブストア
4 , :MemoryJobStore( )、sqlalchemy( )、mongodb( )、redis( )
トリガモード
date: : , ; ,
interval: , 。
cron:cron 。
シンプルで実用的:
コード:
def tick(s):
print("Tick! The time is: %s" % datetime.now(), s)
scheduler = BlockingScheduler()
# ----------------------------------interval : -------------------------
# interval, 3
# jitter , , , 。
scheduler.add_job(tick, 'interval', seconds=3, jitter=10, id="test", args=['text'])
# 2019-04-15 17:00:00 ~ 2019-12-31 24:00:00 , job_func
a = scheduler.add_job(tick, 'interval', seconds=3, id="test1", start_date='2019-11-12 11:13:00',
end_date='2019-11-12 11:14:00')
# 2
scheduler.add_job(tick, 'interval', hours=2)
jitter , , , 。
# ( 120 ) `job_function`
sched.add_job(job_function, 'interval', hours=1, jitter=120)
# -----------------------------------cron : --------------------------------
# hour =19 ,minute =23 19:23
scheduler.add_job(tick, 'cron', hour=11, minute=50)
# 6 、7 、8 、11 12 ,00:00、01:00、02:00 03:00
scheduler.add_job(tick, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 2014-05-30 00:00:00 , 5:30
scheduler.add_job(tick, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
# ---------------------------------date : ------------------------------
# run_date date 、datetime , 2010 - 2 - 25 19: 05:06 ,args text 。
scheduler.add_job(tick, 'date', run_date=datetime(2019, 11, 12, 13, 56, 25), args=['text1'], id='job1')
scheduler.start()
もう一つは装飾器の形で
コード:
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
@scheduler.scheduled_job('cron', id='my_job_id', day='last sun')
def some_decorated_task():
print("I am printed at 00:00:00 on the last Sunday of every month!")
# 1 30 50 scheduled_job()
@scheduler.scheduled_job('cron', day_of_week='*', hour=1, minute='30', second='00')
def tick():
print("Tick! The time is: %s" % datetime.now())
try:
scheduler.start()
print(' ')
except Exception as e:
scheduler.shutdown()
print(' ')
finally:
exit()
タスクの削除
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
# id
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
タスクの一時停止と再開
タスクインスタンスまたはスケジューラを使用すると、タスクを一時停止およびリカバリできます.タスクが一時停止されると、タスクの次の実行時間が削除されます.タスクをリカバリする前に、実行回数のカウントも統計されません.タスクを一時停止するには、apschedulerという2つの方法があります.job.Job.pause() apscheduler.schedulers.base.BaseScheduler.pause_ジョブ()リカバリタスク:apscheduler.job.Job.resume() apscheduler.schedulers.base.BaseScheduler.resume_job()
スケジューラを閉じる
閉じる方法は次のとおりです.
scheduler.shutdown()
# , , 。 , , :
scheduler.shutdown(wait=False)
タスクプロセススケジューラを一時停止、リカバリして、実行中のタスクを一時停止します.
scheduler.pause()
タスクをリカバリすることもできます.
scheduler.resume()
また、スケジューラの起動時にデフォルトのすべてのタスクを一時停止状態に設定することもできます.
scheduler.start(paused=True)
サマータイム問題
一部のtimezoneタイムゾーンでは、サマータイムの問題がある可能性があります.これにより、コマンドの切り替え時にタスクが実行されないか、タスクが2回実行される可能性があります.この問題を回避するには、UTC時間を使用するか、実行の問題を事前に予知して計画することができます.
# Europe/Helsinki , ;
sched.add_job(job_function, 'cron', hour=3, minute=30)
プログラムの実行が完了すると表示されるログを追加
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename='log1.log',
filemode='a',
)
logging.debug(" ")
logging.info(" ")
logging.warning(" ")
logging.error(" ")
logging.critical(" ")
def aps_test(x):
print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
print(1 / 0)
def date_test(x):
print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
def my_listener(event):
if event.exception:
a = ' '
print(a)
else:
a = ' '
print(a)
scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=(' , ',), next_run_time=datetime.now() + timedelta(seconds=13),
id='date_task')
scheduler.add_job(func=date_test, args=(' ',), trigger='interval', seconds=5, id='interval_task')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler._logger = logging
scheduler.start()
閲覧ありがとうございます!!!