Clery使用+
4623 ワード
プロジェクトでセルを使う
プロジェクト構造:*------app:main:0 x 1012 d 8590 *------concurrency:8(processes) *------events:OFF(enable-E to monitor this worker) *------ ***---[Queues]、*****--.celly:exchange:cellery binding:cellery------- [2012-06-08 16:23:51,078:WARNING/MainProcess][email protected] started.
brook erとは、設定中のメッセージ・キューであり、concurrencyとはワーク・プロセス数(デフォルトはcpu数)である.すべてのプロセスが占有されている場合、新しいタスクは待つ必要があり、eventsとは、指定されたcell ryが監視情報を送信するかどうか、queueはキューである.
作業のプロセスを終了します
直接control-c(上述の命令に従ってフロントで起動する)は、もちろんバックグラウンドで起動と停止する場合:
作成されたワーカーを操作するときは、パラメータなど全く同じです.pidfileとlogfileが同じであれば大丈夫です.
呼び出し
タスクの署名(実装とタスクの実行オプションを含む)を別のプロセスに送信したり、パラメータを別の関数に送信したりしたい場合があります.
今まで使っていたのはdelay(*args、****kwargs)という方式です.asyncのコール方式は、実行時のオプションをサポートします.
Groups
groupは同時に起動された一連のタスクであり、特殊な結果を返してグループ内のすべての実行結果を得ることができ、タスクの順序に従ってその結果から対応するタスクの結果を得ることができる.
タスクを接続し、一つ実行したら結果を別の呼び出しに入れます.
チョルドはcalbackのあるグループです.
appレベルでtaskをどの列に配置しますか?
現在の作業を表示:
プロジェクト構造:
proj/__init__.py # django django
/celery.py
/tasks.py
proj/celler y.pyfrom __future__ import absolute_import, unicode_literals
from celery import Celery
app = Celery('proj',
broker='amqp://', #
backend='amqp://', #
include=['proj.tasks']) #
if __name__ == '__main__':
app.start()
proj/tasky.pyfrom __future__ import absolute_import, unicode_literals
from .celery import app
@app.task
def add(x, y):
return x + y
作業プロセスを開始:celery -A proj worker -l info
[email protected] 4.0(latentcal)--*---------------***、[[Configration]-*、*-***..brook er:amqp://guest@local host:5672/brook erとは、設定中のメッセージ・キューであり、concurrencyとはワーク・プロセス数(デフォルトはcpu数)である.すべてのプロセスが占有されている場合、新しいタスクは待つ必要があり、eventsとは、指定されたcell ryが監視情報を送信するかどうか、queueはキューである.
作業のプロセスを終了します
直接control-c(上述の命令に従ってフロントで起動する)は、もちろんバックグラウンドで起動と停止する場合:
celery multi start w1 -A proj -l info
celery multi restart w1 -A proj -l info
#
celery multi stop w1 -A proj -l info
#
celery multi stopwait w1 -A proj -l info
デフォルトでは、現在のディレクトリでpidとロゴファイルを作成します.パスと名前を指定します.celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
--logfile=/var/log/celery/%n%I.log
-A(--app)オプションについては、指定されたcerryapは、module.path:atributeの形式で指定できますし、直接にpackageを指定することもできます.作成されたワーカーを操作するときは、パラメータなど全く同じです.pidfileとlogfileが同じであれば大丈夫です.
呼び出し
add.delay(1, 1)
add.apply_async((1, 1))
add.apply_async((2, 2), queue='lopri', countdown=10)
# countdown
#
res = add.delay(1, 1)
res.get()
celleryデフォルトでは結果が発生しない理由は、さまざまなアプリケーションのニーズが異なるため、大多数のタスクの保存リターン値は意味がないということです.また、結果はタスクや作業プロセスを監視するためのものではなく、イベントメッセージを使用する専用の監視モジュールです.res.id # id uuid
# raise propagate
res.get(propagate=False)
res.failed()
res.successful()
res.state
# PENDING -> STARTED -> SUCCESS(FAILURE)
# STARTED task_track_started @task(track_started=True)
#
# PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
ワークフロータスクの署名(実装とタスクの実行オプションを含む)を別のプロセスに送信したり、パラメータを別の関数に送信したりしたい場合があります.
add.signature((2, 2), countdown=10) # with args and kwargs
add.s(2, 2) # with args only
そして得られた署名を呼び出すことができます.s1 = add.s(2, 2)
res = s1.delay()
res.get()
#
s2 = add.s(2)
res = s2.delay(8)
res.get()
呼び出し方式今まで使っていたのはdelay(*args、****kwargs)という方式です.asyncのコール方式は、実行時のオプションをサポートします.
Groups
groupは同時に起動された一連のタスクであり、特殊な結果を返してグループ内のすべての実行結果を得ることができ、タスクの順序に従ってその結果から対応するタスクの結果を得ることができる.
group(add.s(i, i) for i in xrange(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
#
g = group(add.s(i) for i in xrange(10))
g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Chinsタスクを接続し、一つ実行したら結果を別の呼び出しに入れます.
chain(add.s(4, 4) | mul.s(8))().get()
g = chain(add.s(4) | mul.s(8))
g(4).get()
#
chorrdsチョルドはcalbackのあるグループです.
from celery import chord
from proj.tasks import add, xsum
chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
# group chord
(group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()
#
upload_doc.s(file) | group(apply_filter.s() for filter in filters)
Routingappレベルでtaskをどの列に配置しますか?
app.conf.update(
task_routes = {
'proj.tasks.add': {'queue': 'hipri'},
},
)
呼び出し時に使用するキューを指定します.add.apply_async((2, 2), queue='hipri')
作業プロセスを開始するときに、プロセス処理のキューを指定します.celery -A proj worker -Q hipri
# :celery
celery -A proj worker -Q hipri,celery
リモートコントロール現在の作業を表示:
celery -A proj inspect active
このコマンドはラジオで、すべての作業プロセスが受信されます.(イベントの有効化が必要です)celery -A proj inspect active [email protected]
ワークプロセスをイベントを有効にするcelery -A proj control enable_events