Clery使用+

4623 ワード

プロジェクトでセルを使う
プロジェクト構造:
proj/__init__.py  #        django         django   
      /celery.py
      /tasks.py
proj/celler y.py
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
         broker='amqp://',  #        
         backend='amqp://',  #        
         include=['proj.tasks'])  #    

if __name__ == '__main__':
    app.start()
proj/tasky.py
from __future__ import absolute_import, unicode_literals
from .celery import app
@app.task
def add(x, y): 
    return x + y
作業プロセスを開始:
celery -A proj worker -l info
[email protected] 4.0(latentcal)--*---------------***、[[Configration]-*、*-***..brook er:amqp://guest@local host:5672/
  • *------app:main:0 x 1012 d 8590
  • *------concurrency:8(processes)
  • *------events:OFF(enable-E to monitor this worker)
  • *------
  • ***---[Queues]、*****--.celly:exchange:cellery binding:cellery-------
  • [2012-06-08 16:23:51,078:WARNING/MainProcess][email protected] started.
    brook erとは、設定中のメッセージ・キューであり、concurrencyとはワーク・プロセス数(デフォルトはcpu数)である.すべてのプロセスが占有されている場合、新しいタスクは待つ必要があり、eventsとは、指定されたcell ryが監視情報を送信するかどうか、queueはキューである.
    作業のプロセスを終了します
    直接control-c(上述の命令に従ってフロントで起動する)は、もちろんバックグラウンドで起動と停止する場合:
    celery multi start w1 -A proj -l info
    celery  multi restart w1 -A proj -l info
    
    #          
    celery multi stop w1 -A proj -l info
    #         
    celery multi stopwait w1 -A proj -l info
    
    デフォルトでは、現在のディレクトリでpidとロゴファイルを作成します.パスと名前を指定します.
    celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
                                        --logfile=/var/log/celery/%n%I.log
    
    -A(--app)オプションについては、指定されたcerryapは、module.path:atributeの形式で指定できますし、直接にpackageを指定することもできます.
    作成されたワーカーを操作するときは、パラメータなど全く同じです.pidfileとlogfileが同じであれば大丈夫です.
    呼び出し
    add.delay(1, 1)
    add.apply_async((1, 1))
    
    add.apply_async((2, 2), queue='lopri', countdown=10)
    #                  countdown
    
    #     
    res  = add.delay(1, 1)
    res.get()
    
    celleryデフォルトでは結果が発生しない理由は、さまざまなアプリケーションのニーズが異なるため、大多数のタスクの保存リターン値は意味がないということです.また、結果はタスクや作業プロセスを監視するためのものではなく、イベントメッセージを使用する専用の監視モジュールです.
    res.id  #   id uuid
    
    #      raise       propagate
    res.get(propagate=False)
    res.failed()
    res.successful()
    res.state
    
    # PENDING -> STARTED -> SUCCESS(FAILURE)
    # STARTED      task_track_started         @task(track_started=True)
    #        
    # PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
    
    ワークフロー
    タスクの署名(実装とタスクの実行オプションを含む)を別のプロセスに送信したり、パラメータを別の関数に送信したりしたい場合があります.
    add.signature((2, 2), countdown=10)  # with args and kwargs
    add.s(2, 2)  # with args only
    
    そして得られた署名を呼び出すことができます.
    s1 = add.s(2, 2)
    res = s1.delay()
    res.get()
    
    #                         
    s2 = add.s(2)
    res = s2.delay(8)
    res.get()
    
    呼び出し方式
    今まで使っていたのはdelay(*args、****kwargs)という方式です.asyncのコール方式は、実行時のオプションをサポートします.
    Groups
    groupは同時に起動された一連のタスクであり、特殊な結果を返してグループ内のすべての実行結果を得ることができ、タスクの順序に従ってその結果から対応するタスクの結果を得ることができる.
    group(add.s(i, i) for i in xrange(10))().get()
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    
    #          
    g = group(add.s(i) for i in xrange(10))
    g(10).get()
    [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
    
    Chins
    タスクを接続し、一つ実行したら結果を別の呼び出しに入れます.
    chain(add.s(4, 4) | mul.s(8))().get()
    g = chain(add.s(4) | mul.s(8))
    g(4).get()
    #         
    
    chorrds
    チョルドはcalbackのあるグループです.
    from celery import chord
    from proj.tasks import add, xsum
    
    chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
    #  group                 chord
    (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()
    
    #               
    upload_doc.s(file) | group(apply_filter.s() for filter in filters)
    
    Routing
    appレベルでtaskをどの列に配置しますか?
    app.conf.update(
        task_routes = {
            'proj.tasks.add': {'queue': 'hipri'},
        },
    )
    
    呼び出し時に使用するキューを指定します.
    add.apply_async((2, 2), queue='hipri')
    
    作業プロセスを開始するときに、プロセス処理のキューを指定します.
    celery -A proj worker -Q hipri
    
    #                 :celery
    celery -A proj worker -Q hipri,celery
    
    リモートコントロール
    現在の作業を表示:
    celery -A proj inspect active
    
    このコマンドはラジオで、すべての作業プロセスが受信されます.(イベントの有効化が必要です)
    celery -A proj inspect active [email protected]
    
    ワークプロセスをイベントを有効にする
    celery -A proj control enable_events