celeryによる複数のキューの実装

2792 ワード

Celeryはpythonで流行しているタイミングタスクですが、使用中に2つのニーズが重要です.それはタイミングタスクと優先度です.
まず、タイミングタスク、つまり、一定の時間に一定のタスクを実行します.しかし、通常はハードコーディングが必要です.
次に、実際のビジネスでは、重要なタスクがあります.私たちは彼が優先的に実行することを望んでいます.
#app.py
# *-* coding: utf-8 *-*

from flask import Flask

import tasks
app = Flask(__name__)

@app.route('/images', methods=['GET', 'POST'])
def image():
    tasks.image.delay()
    return "image success"

@app.route('/video/', methods=['GET'])
def video():
    tasks.video.delay()
    return "video success"

@app.route('/common/', methods=['GET', 'POST'])
def common():
    tasks.common.delay()
    return "common success"

if __name__ == "__main__":
    app.run(debug=True)

appではwebアプリケーションを定義し、各インタフェースが時間がかかると仮定し、各インタフェースタスクは異なるタスクキューを探す
#tasks.py
# *-* coding: utf-8 *-*

import time
from celery import Celery

celeryapp = Celery(broker='redis://localhost:6379/2')
celeryapp.config_from_object('celeryconfig')

@celeryapp.task
def video():
    print "processing video"
    time.sleep(10)

@celeryapp.task
def image():
    print "processing image"
    time.sleep(5)

@celeryapp.task
def common():
    print "processing common"
    time.sleep(3)

#celeryconfig.py
# *-* coding:utf-8 *-*

from kombu import Exchange, Queue

CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_BROKER = 'redis://locahost:6379/1'

default_exchange = Exchange('default', type='direct')
media_exchange = Exchange('media', type='direct')

#  3      
CELERY_QUEUES = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('videos', media_exchange, routing_key='media.video'),
    Queue('images', media_exchange, routing_key='media.image'),
)

CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTTING_KEY = 'default'

#                     ,   routing_key
CELERY_ROUTES ={
        'tasks.image': {

            'queue': 'images',
            'routing_key': 'media.image'
            },

        'tasks.video': {

            'queue': 'videos',
            'routing_key': 'media.video'
            }
}
CELERY_IMPORTS = ('tasks')

Webアプリケーションpython appを起動します.py
別のworker celery worker-A tasks--loglevel=info--queues=videosを起動videoを処理するキューを起動
Celery worker-A tasks--loglevel=info--queues=images#imageを処理するキューを開始
Celery worker-A tasks--loglevel=info--queues=default#ルーティングに参加していない場合はデフォルトのキューに割り当てられます
これにより、異なるタスクが異なるキューに割り当てられます.