python非同期でタイミングタスクとサイクルタスクを実現する方法


一.どうやって呼び出しますか

def f1(arg1, arg2):
  print('f1', arg1, arg2)
 
 
def f2(arg1):
  print('f2', arg1)
 
 
def f3():
  print('f3')
 
 
def f4():
  print('    ', int(time.time()))
 
 
timer = TaskTimer()
#          
timer.join_task(f1, [1, 2], timing=15.5) #   15:30  
timer.join_task(f2, [3], timing=14) #   14:00  
timer.join_task(f3, [], timing=15) #   15:00  
timer.join_task(f4, [], interval=10) #  10   1 
#     (        )
timer.start()
f 1~f 4はタイミング実行が必要な関数です。
まずTaskyTimerオブジェクト(TaskyTimerのコードは以下の通り)を作成します。ジョイン_を呼び出しますtask関数は、実行する関数をタスクキューに追加します。最後にstartを呼び出すと、タスクが実行されます。
ジョン.taskパラメータ:
fun:実行したい関数
arg:funのパラメータは、ないと空のリストに伝えられます。
interval:このパラメータがあれば、タスクはサイクルタスク、単位は秒です。
timing:このパラメータがあれば、タスクがタイミングタスクであることを説明します。
注意:intervalとtimingは1つしか書けません。
ソースコード

import datetime
import time
from threading import Thread
from time import sleep
 
 
class TaskTimer:
  __instance = None
 
  def __new__(cls, *args, **kwargs):
    """
        
    """
    if not cls.__instance:
      cls.__instance = object.__new__(cls)
    return cls.__instance
 
  def __init__(self):
    if not hasattr(self, 'task_queue'):
      setattr(self, 'task_queue', [])
 
    if not hasattr(self, 'is_running'):
      setattr(self, 'is_running', False)
 
  def write_log(self, level, msg):
    cur_time = datetime.datetime.now()
    with open('./task.log', mode='a+', encoding='utf8') as file:
      s = "[" + str(cur_time) + "][" + level + "]  " + msg
      print(s)
      file.write(s + "
") def work(self): """ """ while True: for task in self.task_queue: if task['interval']: self.cycle_task(task) elif task['timing']: self.timing_task(task) sleep(5) def cycle_task(self, task): """ """ if task['next_sec'] <= int(time.time()): try: task['fun'](*task['arg']) self.write_log(" ", " :" + task['fun'].__name__ + " ") except Exception as e: self.write_log(" ", " :" + task['fun'].__name__ + " :" + str(e)) finally: task['next_sec'] = int(time.time()) + task['interval'] def timing_task(self, task): """ """ # today_sec = self.get_today_until_now() # , if task['today'] != self.get_today(): task['today'] = self.get_today() task['today_done'] = False # if task['first_work']: if today_sec >= task['task_sec']: task['today_done'] = True task['first_work'] = False else: task['first_work'] = False # if not task['today_done']: if today_sec >= task['task_sec']: # , try: task['fun'](*task['arg']) self.write_log(" ", " :" + task['fun'].__name__ + " ") except Exception as e: self.write_log(" ", " :" + task['fun'].__name__ + " :" + str(e)) finally: task['today_done'] = True if task['first_work']: task['first_work'] = False def get_today_until_now(self): """ """ i = datetime.datetime.now() return i.hour * 3600 + i.minute * 60 + i.second def get_today(self): """ """ i = datetime.datetime.now() return i.day def join_task(self, fun, arg, interval=None, timing=None): """ interval timing 1 :param fun: :param arg: fun :param interval: , :param timing: , :[0,24) """ # if (interval != None and timing != None) or (interval == None and timing == None): raise Exception('interval timing 1 ') if timing and not 0 <= timing < 24: raise Exception('timing [0,24)') if interval and interval < 5: raise Exception('interval 5') # task task = { 'fun': fun, 'arg': arg, 'interval': interval, 'timing': timing, } # if timing: task['task_sec'] = timing * 3600 task['today_done'] = False task['first_work'] = True task['today'] = self.get_today() elif interval: task['next_sec'] = int(time.time()) + interval # task self.task_queue.append(task) self.write_log(" ", " :" + fun.__name__) def start(self): """ """ if not self.is_running: thread = Thread(target=self.work) thread.start() self.is_running = True self.write_log(" ", "TaskTimer !") return thread.ident self.write_log(" ", "TaskTimer , !")
以上のpythonはタイミングタスクとサイクルタスクを実現する方法は編集者が皆さんに提供した内容を全部共有することです。参考にしていただければと思います。よろしくお願いします。