Celery beat実戦


CELERYBEATをテストする例:
celery_test
    |proj
    
    |__init__.py
    
    |celery.py
    
    |email_task.py
    
    |calcu_tasks.py
    start_server.sh
    |proj_v1
    
    |xxx_task.py
__init__.pyが空のファイルです.
celery.py,

    
    
    
    
#-*-coding=utf-8-*-
from __future__ import absolute_import
from celery import Celery
from celery.schedules import crontab
from kombu import Queue
 
app = Celery("proj",
             broker = "redis://10.121.84.90:16379/6",
             include = ['proj.email_task','proj.calcu_tasks'] #!!!!!
             )
 
app.conf.update(
        CELERY_DEFAULT_QUEUE = 'default',
        CELERY_QUEUES = (Queue('hipri'),),
        #CELERY_ROUTES={
        #"proj.email_task.do_email":{'queue':'hipri'},
        #},
        CELERYBEAT_SCHEDULE = {
            "do_email":{
                "task":"proj.email_task.do_email",
                "schedule":crontab(minute="*/1"),
                "args":(),
                "options":{'queue':'default'}
                },
            "do_email_new":{
                "task":"proj.email_task.do_email_new",
                "schedule":crontab(minute="*/1"),
                "args":(),
                "options":{'queue':'hipri'}
                },
            "add":{
                "task":"proj.calcu_tasks.add",
                "schedule":crontab(minute="*/1"),
                "args":(3,4),
                "options":{'queue':'hipri'}
                },
            },
    )
 
if __name__ == '__main__':
    pass
    #app.start()

email_task.py,

    
    
    
    
from __future__ import absolute_import
 
import sys
import os
import hashlib
import time
 
from proj.celery import app
 
reload(sys)
sys.setdefaultencoding('utf-8')
 
sys.path.append(os.path.join(os.path.dirname(__file__), "./"))
 
@app.task()
def do_email():
    print 'begin to email'
    time.sleep(5)
    print 'email complete'
 
@app.task()
def do_email_new():
    print 'begin to email new'
    time.sleep(5)
    print 'email new complete'

calcu_tasks.py,

    
    
    
    
from __future__ import absolute_import
 
import sys
import os
import hashlib
import time
 
from proj.celery import app
 
@app.task
def add(x, y):
    print '%d + %d = %d'%(x, y, x+y)

start_server.sh,

    
    
    
    
#!/bin/bash
#nohup celery -A proj worker -n default_worker -c 2 -B -Q default -l debug &
#nohup celery -A proj worker -n hipri_worker -c 2 -B -Q hipri -l debug &
 
#celery multi start default_worker hipri_worker -A proj -c 2 -B -Q:default_worker default -Q:hipri_worker hipri -l debug
 
nohup celery -A proj worker -n default_worker -c 2 -Q default -l debug &
nohup celery -A proj worker -n hipri_worker -c 2 -Q hipri -l debug &
nohup celery -A proj beat &

xxx_task.pyは任意に定義された類似です
email_task.pyのceleryサービスプログラムです.
1.経験的に、異なるディレクトリで同じapp=Celery()オブジェクトを共有することはできません.例えば、上記の例のprojディレクトリの下で
celery.pyのappはproj_に共有できませんv 1ディレクトリの下の
xxx_task.pyで使用します.
2. 
nohup celery -A proj worker -n default_worker -c 2 -B -Q default -l debug &
nohup celery -A proj worker -n hipri_worker -c 2 -B -Q hipri -l debug &

-Bオプションで複数のworkerを開く
celery beatインスタンスが2つ開いているため、1つのタスクが2回実行されます.
正しいやり方で、
nohup celery -A proj worker -n default_worker -c 2 -Q default -l debug &
nohup celery -A proj worker -n hipri_worker -c 2 -Q hipri -l debug &
nohup celery -A proj beat &
すなわち、celery beatインスタンスは1つだけ開くべきである.
3. CELERYBEAT_SCHEDULEではroutingを実現するのが簡単そうです.
4.手動構成以外にcellyタイミングタスクを追加
CELERYBEAT_SCHEDULEは、関数を使用することもできます
add_periodic_task(...)
詳しくはhttp://docs.celeryproject.org/en/master/userguide/periodic-tasks.html .