Celery beat実戦
36942 ワード
CELERYBEATをテストする例:
celery_test
|proj
|__init__.py
|celery.py
|email_task.py
|calcu_tasks.py
start_server.sh
|proj_v1
|xxx_task.py
__init__.pyが空のファイルです.
celery.py,
email_task.py,
calcu_tasks.py,
start_server.sh,
xxx_task.pyは任意に定義された類似です
email_task.pyのceleryサービスプログラムです.
1.経験的に、異なるディレクトリで同じapp=Celery()オブジェクトを共有することはできません.例えば、上記の例のprojディレクトリの下で
celery.pyのappはproj_に共有できませんv 1ディレクトリの下の
xxx_task.pyで使用します.
2.
nohup celery -A proj worker -n default_worker -c 2 -B -Q default -l debug &
nohup celery -A proj worker -n hipri_worker -c 2 -B -Q hipri -l debug &
上
-Bオプションで複数のworkerを開く
celery beatインスタンスが2つ開いているため、1つのタスクが2回実行されます.
正しいやり方で、
nohup celery -A proj worker -n default_worker -c 2 -Q default -l debug &
nohup celery -A proj worker -n hipri_worker -c 2 -Q hipri -l debug &
nohup celery -A proj beat &
すなわち、celery beatインスタンスは1つだけ開くべきである.
3. CELERYBEAT_SCHEDULEではroutingを実現するのが簡単そうです.
4.手動構成以外にcellyタイミングタスクを追加
CELERYBEAT_SCHEDULEは、関数を使用することもできます
add_periodic_task(...)
詳しくはhttp://docs.celeryproject.org/en/master/userguide/periodic-tasks.html .
celery_test
|proj
|__init__.py
|celery.py
|email_task.py
|calcu_tasks.py
start_server.sh
|proj_v1
|xxx_task.py
__init__.pyが空のファイルです.
celery.py,
#-*-coding=utf-8-*-
from __future__ import absolute_import
from celery import Celery
from celery.schedules import crontab
from kombu import Queue
app = Celery("proj",
broker = "redis://10.121.84.90:16379/6",
include = ['proj.email_task','proj.calcu_tasks'] #!!!!!
)
app.conf.update(
CELERY_DEFAULT_QUEUE = 'default',
CELERY_QUEUES = (Queue('hipri'),),
#CELERY_ROUTES={
#"proj.email_task.do_email":{'queue':'hipri'},
#},
CELERYBEAT_SCHEDULE = {
"do_email":{
"task":"proj.email_task.do_email",
"schedule":crontab(minute="*/1"),
"args":(),
"options":{'queue':'default'}
},
"do_email_new":{
"task":"proj.email_task.do_email_new",
"schedule":crontab(minute="*/1"),
"args":(),
"options":{'queue':'hipri'}
},
"add":{
"task":"proj.calcu_tasks.add",
"schedule":crontab(minute="*/1"),
"args":(3,4),
"options":{'queue':'hipri'}
},
},
)
if __name__ == '__main__':
pass
#app.start()
email_task.py,
from __future__ import absolute_import
import sys
import os
import hashlib
import time
from proj.celery import app
reload(sys)
sys.setdefaultencoding('utf-8')
sys.path.append(os.path.join(os.path.dirname(__file__), "./"))
@app.task()
def do_email():
print 'begin to email'
time.sleep(5)
print 'email complete'
@app.task()
def do_email_new():
print 'begin to email new'
time.sleep(5)
print 'email new complete'
calcu_tasks.py,
from __future__ import absolute_import
import sys
import os
import hashlib
import time
from proj.celery import app
@app.task
def add(x, y):
print '%d + %d = %d'%(x, y, x+y)
start_server.sh,
#!/bin/bash
#nohup celery -A proj worker -n default_worker -c 2 -B -Q default -l debug &
#nohup celery -A proj worker -n hipri_worker -c 2 -B -Q hipri -l debug &
#celery multi start default_worker hipri_worker -A proj -c 2 -B -Q:default_worker default -Q:hipri_worker hipri -l debug
nohup celery -A proj worker -n default_worker -c 2 -Q default -l debug &
nohup celery -A proj worker -n hipri_worker -c 2 -Q hipri -l debug &
nohup celery -A proj beat &
xxx_task.pyは任意に定義された類似です
email_task.pyのceleryサービスプログラムです.
1.経験的に、異なるディレクトリで同じapp=Celery()オブジェクトを共有することはできません.例えば、上記の例のprojディレクトリの下で
celery.pyのappはproj_に共有できませんv 1ディレクトリの下の
xxx_task.pyで使用します.
2.
nohup celery -A proj worker -n default_worker -c 2 -B -Q default -l debug &
nohup celery -A proj worker -n hipri_worker -c 2 -B -Q hipri -l debug &
上
-Bオプションで複数のworkerを開く
celery beatインスタンスが2つ開いているため、1つのタスクが2回実行されます.
正しいやり方で、
nohup celery -A proj worker -n default_worker -c 2 -Q default -l debug &
nohup celery -A proj worker -n hipri_worker -c 2 -Q hipri -l debug &
nohup celery -A proj beat &
すなわち、celery beatインスタンスは1つだけ開くべきである.
3. CELERYBEAT_SCHEDULEではroutingを実現するのが簡単そうです.
4.手動構成以外にcellyタイミングタスクを追加
CELERYBEAT_SCHEDULEは、関数を使用することもできます
add_periodic_task(...)
詳しくはhttp://docs.celeryproject.org/en/master/userguide/periodic-tasks.html .