celeryによる複数のキューの実装
2792 ワード
Celeryはpythonで流行しているタイミングタスクですが、使用中に2つのニーズが重要です.それはタイミングタスクと優先度です.
まず、タイミングタスク、つまり、一定の時間に一定のタスクを実行します.しかし、通常はハードコーディングが必要です.
次に、実際のビジネスでは、重要なタスクがあります.私たちは彼が優先的に実行することを望んでいます.
#app.py
appではwebアプリケーションを定義し、各インタフェースが時間がかかると仮定し、各インタフェースタスクは異なるタスクキューを探す
#tasks.py
#celeryconfig.py
Webアプリケーションpython appを起動します.py
別のworker celery worker-A tasks--loglevel=info--queues=videosを起動videoを処理するキューを起動
Celery worker-A tasks--loglevel=info--queues=images#imageを処理するキューを開始
Celery worker-A tasks--loglevel=info--queues=default#ルーティングに参加していない場合はデフォルトのキューに割り当てられます
これにより、異なるタスクが異なるキューに割り当てられます.
まず、タイミングタスク、つまり、一定の時間に一定のタスクを実行します.しかし、通常はハードコーディングが必要です.
次に、実際のビジネスでは、重要なタスクがあります.私たちは彼が優先的に実行することを望んでいます.
#app.py
# *-* coding: utf-8 *-*
from flask import Flask
import tasks
app = Flask(__name__)
@app.route('/images', methods=['GET', 'POST'])
def image():
tasks.image.delay()
return "image success"
@app.route('/video/', methods=['GET'])
def video():
tasks.video.delay()
return "video success"
@app.route('/common/', methods=['GET', 'POST'])
def common():
tasks.common.delay()
return "common success"
if __name__ == "__main__":
app.run(debug=True)
appではwebアプリケーションを定義し、各インタフェースが時間がかかると仮定し、各インタフェースタスクは異なるタスクキューを探す
#tasks.py
# *-* coding: utf-8 *-*
import time
from celery import Celery
celeryapp = Celery(broker='redis://localhost:6379/2')
celeryapp.config_from_object('celeryconfig')
@celeryapp.task
def video():
print "processing video"
time.sleep(10)
@celeryapp.task
def image():
print "processing image"
time.sleep(5)
@celeryapp.task
def common():
print "processing common"
time.sleep(3)
#celeryconfig.py
# *-* coding:utf-8 *-*
from kombu import Exchange, Queue
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_BROKER = 'redis://locahost:6379/1'
default_exchange = Exchange('default', type='direct')
media_exchange = Exchange('media', type='direct')
# 3
CELERY_QUEUES = (
Queue('default', default_exchange, routing_key='default'),
Queue('videos', media_exchange, routing_key='media.video'),
Queue('images', media_exchange, routing_key='media.image'),
)
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTTING_KEY = 'default'
# , routing_key
CELERY_ROUTES ={
'tasks.image': {
'queue': 'images',
'routing_key': 'media.image'
},
'tasks.video': {
'queue': 'videos',
'routing_key': 'media.video'
}
}
CELERY_IMPORTS = ('tasks')
Webアプリケーションpython appを起動します.py
別のworker celery worker-A tasks--loglevel=info--queues=videosを起動videoを処理するキューを起動
Celery worker-A tasks--loglevel=info--queues=images#imageを処理するキューを開始
Celery worker-A tasks--loglevel=info--queues=default#ルーティングに参加していない場合はデフォルトのキューに割り当てられます
これにより、異なるタスクが異なるキューに割り当てられます.