Pythonソース学習Schedule

17828 ワード

私のプログラミング界の小さなプログラム猿について、現在、創業チームでteam leadを務めています.テクノロジースタックはAndroid、Python、Java、Goに関連しています.これも私たちのチームの主なテクノロジースタックです.連絡先:[email protected]
前回は簡単なPythonスケジューラの使用を紹介しましたが、その後、ソースコードをめくってみると、コアライブラリが1つのファイルしかなく、コード量が700行未満であることに驚きました.これは絶好の学習材料です.私を喜ばせたのは、このライブラリの作者が最近読んだ本「Python Tricks」の作者だったことだ.今、大神の実現構想を見てみましょう.
0 x 00準備
プロジェクトアドレス
github.com/dbader/sche…
コードcheckoutをローカルに
環境PyCharm+venv+Python3
0 x 01使用法
これは前編でも紹介しましたが、とても簡単です
import schedule

#          
def job():
    print("a simple scheduler in python.")

#        ,    2     
schedule.every(2).seconds.do(job)

if __name__ == '__main__':
    while True:
        schedule.run_pending()

#     
a simple scheduler in python.
a simple scheduler in python.
a simple scheduler in python.
...

このライブラリのドキュメントも詳しく、schedule.readthedocs.io/ライブラリの概要を参照できます.
0 x 02プロジェクト構造
(venv) ➜  schedule git:(master) tree -L 2
.
...
├── requirements-dev.txt
├── schedule
│   └── __init__.py
├── setup.py
├── test_schedule.py
├── tox.ini
└── venv
    ├── bin
    ├── include
    ├── lib
    ├── pip-selfcheck.json
    └── pyvenv.cfg

8 directories, 18 files

  • scheduleディレクトリの下に__init__.pyファイルがあります.これは私たちが重点的に学ぶ必要がある場所です.
  • setup.pyファイルはパブリッシュプロジェクトのプロファイル
  • test_schedule.pyはユニットテストファイルで、最初はドキュメントを見る以外に、ユニットテストから着手して、このライブラリの使用
  • を理解することができます.
  • requirements-dev.txt開発環境の依存ライブラリファイルは、コアのライブラリがサードパーティの依存を必要としない場合、ユニットテストは
  • venvは私checkoutの後に作成されたもので、元のプロジェクトはありません
  • 0x03 schedule __init__.pyPythonパッケージを定義するために必要なファイルであることを知っています.このファイルで定義するメソッド、クラスは、importコマンドを使用するときにプロジェクトにインポートし、使用できます.
    scheduleソース
    以下はscheduleで使用されるモジュールであり、いずれもPython内部のモジュールである.
    import collections
    import datetime
    import functools
    import logging
    import random
    import re
    import time
    
    logger = logging.getLogger('schedule')
    

    次に、ログ印刷ツールのインスタンスを定義します.
    次に,このモジュールを定義した3つの異常クラスの構造体系は,Exceptionから派生し,ScheduleError,ScheduleValueErrorおよびIntervalErrorである.
    class ScheduleError(Exception):
        """Base schedule exception"""
        pass
    
    class ScheduleValueError(ScheduleError):
        """Base schedule value error"""
        pass
    
    class IntervalError(ScheduleValueError):
        """An improper interval was used"""
        pass
    
    

    スケジューラの継続的な実行を取り消すCancelJobクラスも定義されています.
    class CancelJob(object):
        """
        Can be returned from a job to unschedule itself.
        """
        pass
    
    

    例えば、このCancelJobクラスをカスタマイズされたスケジューリング方法で返すことで、使い捨てのタスクを実現することができる.
    #          
    def job():
        print("a simple scheduler in python.")
        #   CancelJob            
        return schedule.CancelJob
    

    次に、このライブラリの2つのコアクラスSchedulerJobです.
    class Scheduler(object):
        """
        Objects instantiated by the :class:`Scheduler ` are
        factories to create jobs, keep record of scheduled jobs and
        handle their execution.
        """
        
    class Job(object):
        """
        A periodic job as used by :class:`Scheduler`.
    
        :param interval: A quantity of a certain time unit
        :param scheduler: The :class:`Scheduler ` instance that
                          this job will register itself with once it has
                          been fully configured in :meth:`Job.do()`.
    
        Every job runs at a given fixed time interval that is defined by:
    
        * a :meth:`time unit `
        * a quantity of `time units` defined by `interval`
    
        A job is usually created and returned by :meth:`Scheduler.every`
        method, which also defines its `interval`.
        """
    
    Schedulerはスケジューラの実装クラスであり、スケジューラタスク(job)の作成と実行を担当します.Jobは、タスクを実行する必要がある抽象である.
    この2つのクラスはこのライブラリのコアであり、後で詳細な分析も見られます.次に、デフォルトスケジューラdefault_schedulerとタスクリストjobsの作成です.
    # The following methods are shortcuts for not having to
    # create a Scheduler instance:
    
    #: Default :class:`Scheduler ` object
    default_scheduler = Scheduler()
    
    #: Default :class:`Jobs ` list
    jobs = default_scheduler.jobs  # todo: should this be a copy, e.g. jobs()?
    
    import scheduleを実行すると、デフォルトでdefault_schedulerが作成されます.Schedulerの構造方法は
    def __init__(self):
        self.jobs = []
    

    初期化を実行すると、スケジューラは空のタスクリストを作成します.
    ファイルの最後にチェーン呼び出しの方法を定義し、使用するのも非常に人間的で、学ぶ価値があります.ここでの方法は、モジュールの下に定義され、default_schedulerインスタンスがカプセル化された呼び出しである.
    def every(interval=1):
        """Calls :meth:`every ` on the
        :data:`default scheduler instance `.
        """
        return default_scheduler.every(interval)
    
    
    def run_pending():
        """Calls :meth:`run_pending ` on the
        :data:`default scheduler instance `.
        """
        default_scheduler.run_pending()
    
    
    def run_all(delay_seconds=0):
        """Calls :meth:`run_all ` on the
        :data:`default scheduler instance `.
        """
        default_scheduler.run_all(delay_seconds=delay_seconds)
    
    
    def clear(tag=None):
        """Calls :meth:`clear ` on the
        :data:`default scheduler instance `.
        """
        default_scheduler.clear(tag)
    
    
    def cancel_job(job):
        """Calls :meth:`cancel_job ` on the
        :data:`default scheduler instance `.
        """
        default_scheduler.cancel_job(job)
    
    
    def next_run():
        """Calls :meth:`next_run ` on the
        :data:`default scheduler instance `.
        """
        return default_scheduler.next_run
    
    
    def idle_seconds():
        """Calls :meth:`idle_seconds ` on the
        :data:`default scheduler instance `.
        """
        return default_scheduler.idle_seconds
    

    入口メソッドrun_pending()を参照すると、本明細書の冒頭のDemoから、これがスケジューラを起動する方法であることが分かる.ここでは、default_schedulerのメソッドを実行します.
    default_scheduler.run_pending()
    

    だから私たちはScheduler類の相応の方法に目を向けました
    def run_pending(self):
        """
        Run all jobs that are scheduled to run.
    
        Please note that it is *intended behavior that run_pending()
        does not run missed jobs*. For example, if you've registered a job
        that should run every minute and you only call run_pending()
        in one hour increments then your job won't be run 60 times in
        between but only once.
        """
        runnable_jobs = (job for job in self.jobs if job.should_run)
        for job in sorted(runnable_jobs):
            self._run_job(job)
    

    この方法では、まずjobsリストから実行する必要があるタスクをフィルタリングしてrunnable_jobsリストに配置し、それをソートして内部の_run_job(job)方法を順次実行する.
    def _run_job(self, job):
        ret = job.run()
        if isinstance(ret, CancelJob) or ret is CancelJob:
            self.cancel_job(job)
    
    _run_jobメソッドではjobクラスのrunメソッドが呼び出され、戻り値に基づいてタスクのキャンセルが必要かどうかを判断します.
    このとき、Jobクラスの実装ロジックを見てみましょう.
    まず、Jobがいつ作成されたかを見てみましょう.やはりDemoのコードから
    schedule.every(2).seconds.do(job)
    

    ここではまずschedule.every()メソッドを実行した
    def every(interval=1):
        """Calls :meth:`every ` on the
        :data:`default scheduler instance `.
        """
        return default_scheduler.every(interval)
    

    この方法はschedulerクラスのeveryメソッドです
    def every(self, interval=1):
        """
        Schedule a new periodic job.
    
        :param interval: A quantity of a certain time unit
        :return: An unconfigured :class:`Job `
        """
        job = Job(interval, self)
        return job
    

    ここでは、タスクjobが作成され、パラメータintervalおよびschedulerインスタンスが構築方法に転送され、最後にjobインスタンスがチェーン呼び出しを実現するために返される.Jobにジャンプする構造方法
    def __init__(self, interval, scheduler=None):
        self.interval = interval  # pause interval * unit between runs
        self.latest = None  # upper limit to the interval
        self.job_func = None  # the job job_func to run
        self.unit = None  # time units, e.g. 'minutes', 'hours', ...
        self.at_time = None  # optional time at which this job runs
        self.last_run = None  # datetime of the last run
        self.next_run = None  # datetime of the next run
        self.period = None  # timedelta between runs, only valid for
        self.start_day = None  # Specific day of the week to start on
        self.tags = set()  # unique set of tags for the job
        self.scheduler = scheduler  # scheduler to register with
    

    主に間隔時間構成,実行する方法,スケジューラの各種時間単位などを初期化した.everyメソッドを実行した後、secondsというプロパティメソッドが呼び出されました.
    @property
    def seconds(self):
        self.unit = 'seconds'
        return self
    

    時間単位が設定されており、この設定秒には、もちろん他の類似の属性方法minuteshoursdaysなどがある.
    最後にdoメソッドを実行しました
    def do(self, job_func, *args, **kwargs):
        """
        Specifies the job_func that should be called every time the
        job runs.
    
        Any additional arguments are passed on to job_func when
        the job runs.
    
        :param job_func: The function to be scheduled
        :return: The invoked job instance
        """
        self.job_func = functools.partial(job_func, *args, **kwargs)
        try:
            functools.update_wrapper(self.job_func, job_func)
        except AttributeError:
            # job_funcs already wrapped by functools.partial won't have
            # __name__, __module__ or __doc__ and the update_wrapper()
            # call will fail.
            pass
        self._schedule_next_run()
        self.scheduler.jobs.append(self)
        return self
    

    ここでは、functoolsツールのバイアス関数partialを使用して、カスタマイズした方法を呼び出し可能なオブジェクトにカプセル化します.
    それから_schedule_next_run方法を呼び出して、それは主に時間に対する解析で、時間によってjobを並べ替えて、私はこの方法が本プロジェクトの中の技術点だと思って、論理も少し複雑でなくして、よく読んで理解することができて、主に時間datetimeの使用に対してです.紙面のため、ここにはコードが貼られません.
    ここでタスクjobの追加を完了します.その後、run_pendingメソッドを呼び出すと、タスクを実行できます.
    0 x 04まとめてscheduleライブラリは、2つのコアクラスSchedulerおよびJobを定義します.パッケージをインポートすると、デフォルトでSchedulerオブジェクトが作成され、タスクリストが初期化されます.scheduleモジュールはチェーン呼び出しのインタフェースを提供し、scheduleパラメータを構成すると、タスクオブジェクトjobが作成され、jobがタスクリストに追加され、最後にrun_pendingメソッドを実行すると、カスタムメソッドが呼び出されます.このライブラリの核心思想は対象向けの方法を使用して、物事に対して正確に抽象することができて、その全体の論理は複雑ではありませんて、ソースコードを学ぶのはとても良い例です.
    0 x 05学習資料
  • github.com/dbader/sche…
  • schedule.readthedocs.io

  • 転載先:https://juejin.im/post/5d2c7f286fb9a07ef161b444