分散型タスクキュー-celery+redis

11993 ワード

一.基本情報
参照先:https://www.cnblogs.com/wdliu/p/9517535.html1.概要Celeryは分散型メッセージキューであり、通常、非同期タスクスケジューリングをサポートする時間のかかる操作とタイミングタスクを実行するために使用されます.Celeryは単機のみならず、複数のマシンで同時に動作し、データセンターを跨ぐこともできます.
2.構成
  • タスクキュークライアントはタスクを実行するメッセージをタスクキューに入れ、ノードworkerプロセスを実行してキューを監視し続け、新しいタスクがある場合は実行を取り出し、すなわち生産者-消費者モデル、クライアントは生産者として、ノードworkerを消費者として実行し、それらの間はタスクキューを通じて伝達される.
  • borkerメッセージミドルウェアは、タスク生産者から送信されたタスク処理メッセージを受信し、キューに格納してからスケジューリングを行い、タスク消費者(celery worker)に配布し、redisやRabbitMQなどが一般的である.
  • worker実行ユニット、タスクキューの消費者、タスクキューを監視し続け、新しいタスクがある場合に取り出して実行する.
  • backendタスク結果格納、worker実行タスクの結果格納、CeleryはRedis、MongoDB、Django ORM、AMQPなどの
  • を含むタスクの結果格納を異なる方法でサポートする.
  • beat Celery beatは、独立したプロセスとして存在するタスクスケジューラです.Celery beatプロセスでは、プロファイルの内容が読み込まれ、定期的に設定の期限切れになったタスクがタスクキュー(タイミングタスク)に送信されます.Celery beatはCeleryシステムに付属するタスク生産者である.1つのCeleryシステムでは、Celery beatスケジューラは1つしか存在しません.

  • 二.単純な例
    1.celeryプログラムの作成
    from celery import Celery
    
    borker = "redis://127.0.0.1:6379/1"
    backend = "redis://127.0.0.1:6379/2"
    #     Celery  
    app = Celery("my_task",  broker=borker, backend=backend)
    
    #              
    @app.task
    def add(x, y):
        print("    ")
        time.sleep(4)
        return x + y
    

    2.タスク実行ユニットworkerの起動
    celery -A celery_task worker -l INFO
    
  • -A celery_taskはプログラムのモジュール名(celery_task.pyファイルに対応)を表します.
  • worker:実行ユニットを起動することを示します.
  • -lは、印刷ログのレベル
  • を示す.
    win 10起動エラー:ValueError:not enough values to unpack(expected 3,got 0)ソリューション(参考:https://blog.csdn.net/qq_30242609/article/details/79047660)サードパーティライブラリのインストール:pip install eventlet起動コマンドから:celery-A celery_task worker -l INFO -P eventlet
    3.プログラミング呼び出しタスク関数
    from celery_task import add
    
    if __name__ == "__main__":
        print("      ...")
        # delay      
        result = add.delay(3, 6)
        print("      ...")
        print(result)    #      taskid:9e5eea50-26fc-4a67-8438-a5bdd351b4e3
    
    # result.ready()              
    # result.get()               
    # result.successful()              
    

    三.Celeryプロジェクト
    1.プロジェクト構造
    celery_project
    	│ __init__.py
    	│ app.py
    	│ settings.py
    	│ task1.py
    	│ task2.py
    

    2. app.py workerのエントリ
    from celery import Celery
    
    app = Celery("demo")
    #   celery  ,      
    app.config_from_object("celery_project.settings")
    

    3. settings.pyプロファイル
    BROKER_URL = "redis://127.0.0.1:6379/1"
    
    CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/2"
    
    #     ,   UTC
    CELERY_TIMEZONE = "Asia/Shanghai"
    
    #       
    CELERY_IMPORTS = (
        'celery_project.task1',
        'celery_project.task2',
    )
    
    #          
    CELERY_RESULT_SERIALIZER = 'json'
    
    #       
    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
    

    4. task1.py実行するタスク関数
    import time
    from celery_project.app import app
    
    @app.task
    def add(x, y):
        time.sleep(4)
        return x+y
    

    5.workerを起動してプロジェクトディレクトリの外で実行する
    celery -A celery_project.app worker -l INFO -P eventlet
    

    6.関数呼び出し項目ディレクトリ外作成runを実行する.py
    from celery_project import task1, task2
    
    print("    ")
    task1.add.delay(3,6)
    task2.multiply.delay(2,7)
    print("    ")
    

    四.タイミングタスク
    前項目の内容1に基づく.settings.pyでのタイミングタスク構成の追加
    from datetime import timedelta
    from celery.schedules import crontab
    
    #   BEAT:celery beat -A celery_project -l INFO
    #   worker: celery -A celery_project worker -l INFO -P eventlet
    #     :celery -B -A celery_project worker -l INFO (windows      )
    CELERYBEAT_SCHEDULE = {
         
        "task1": {
         
            "task": "celery_project.task1.add",
            "schedule": timedelta(seconds=10),    #  10     
            "args": (2,6),
        },
        "task2": {
         
            "task": "celery_project.task2.multiply",
            "schedule": crontab(hour=17, minute=40),  #    17:40  
            "args": (3, 6),
        }
    }
    

    2.2つのcmdでbeatとworkerを起動する
  • BEAT celery beat-A celery_を開くproject -l INFO
  • worker celery-A celery_を開くproject worker -l INFO -P eventlet
  • 同時にcelery-B-A celery_をオンにするProject worker-l INFO(windowsではサポートされていないようです)
  • 例:https://www.imooc.com/learn/1051