Celery notes


Celery

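Celery is a framework for running asynchronous tasks from an application.

It needs a separate service, called a message broker, to send and receive messages.
RabbitMQ and Redis are two of the available message brokers.

These notes use RabbitMQ.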

RabbitMQ

# install
$ brew install rabbitmq
$ export PATH=$PATH:/usr/local/sbin

# start
$ rabbitmq-server

Configuring logger redirection

  ##  ##      RabbitMQ 3.8.18
  ##  ##
  ##########  Copyright (c) 2007-2021 VMware, Inc. or its affiliates.
  ######  ##
  ##########  Licensed under the MPL 2.0. Website: https://rabbitmq.com

  Erlang:      24.0.3 [jit]
  TLS Library: OpenSSL - OpenSSL 1.1.1k  25 Mar 2021

  Doc guides:  https://rabbitmq.com/documentation.html
  Support:     https://rabbitmq.com/contact.html
  Tutorials:   https://rabbitmq.com/getstarted.html
  Monitoring:  https://rabbitmq.com/monitoring.html

  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
        /usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log

  Config file(s): (none)

  Starting broker... completed with 6 plugins.

Other commands

# run in the background
$ rabbitmq-server -detached

# stop
$ rabbitmqctl stop

# show status
$ rabbitmqctl status

While the server is running, opening the following URL shows its status in a GUI
http://localhost:15672/
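
The same status information is also available as JSON from the management plugin's HTTP API. Below is a minimal sketch, assuming the plugin is enabled on the default port and the default guest/guest account is unchanged. `MANAGEMENT_URL`, `management_request` and `fetch_overview` are names made up for this example.

```python
import base64
import json
import urllib.request

# Assumption: the management plugin listens on localhost:15672 and the
# default guest/guest account is still enabled.
MANAGEMENT_URL = "http://localhost:15672"


def management_request(path, user="guest", password="guest"):
    """Build a request for the management HTTP API with Basic auth."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request = urllib.request.Request(f"{MANAGEMENT_URL}{path}")
    request.add_header("Authorization", f"Basic {token}")
    return request


def fetch_overview():
    """Return the broker overview as a dict (requires a running server)."""
    request = management_request("/api/overview")
    with urllib.request.urlopen(request, timeout=5) as response:
        return json.load(response)
```

With the server running, `fetch_overview()` should return a dict describing the broker. Nothing is sent over the network until that function is called.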

PostgreSQL

Set up a database to hold the state of the tasks

# install
$ brew install postgresql

# start
$ pg_ctl -D /usr/local/var/postgres start

# set the data directory (lets pg_ctl find it without -D)
$ export PGDATA=/usr/local/var/postgres

# create role (-P prompts for the new role's password)
$ createuser -P admin

# create database
$ createdb demo_celery -O admin

# show databases
$ psql -l

# connect database
$ psql demo_celery

Starting a worker

Reference: https://docs.celeryproject.org/en/stable/getting-started/first-steps-with-celery.html

Installation

$ pip install celery
$ pip install psycopg2  # needed to use PostgreSQL from Python
$ pip install sqlalchemy  # the db+postgresql result backend is built on SQLAlchemy

Create tasks.py, which creates the Celery instance and defines the task

tasks.py
from celery import Celery

app = Celery(
    'tasks',
    backend='db+postgresql://admin:password@localhost:5432/demo_celery',
    broker='amqp://guest:guest@localhost//'
)

@app.task
def add(x, y):
    return x + y

How to specify the PostgreSQL URL: https://docs.celeryproject.org/en/stable/userguide/configuration.html#database-url-examples
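
The backend URL has the form db+postgresql://user:password@host:port/database. A password containing characters such as `@` or `/` has to be percent-encoded, otherwise the URL is parsed incorrectly. Below is a small sketch of assembling the URL from its parts. `postgres_backend_url` is a hypothetical helper, not part of Celery.

```python
from urllib.parse import quote


def postgres_backend_url(user, password, host, port, database):
    """Return a result-backend URL of the form db+postgresql://..."""
    # Percent-encode the credentials so that characters such as '@' or '/'
    # in a password do not break the URL.
    credentials = f"{quote(user, safe='')}:{quote(password, safe='')}"
    return f"db+postgresql://{credentials}@{host}:{port}/{database}"


print(postgres_backend_url("admin", "password", "localhost", 5432, "demo_celery"))
# db+postgresql://admin:password@localhost:5432/demo_celery
```

The returned string can be passed as the `backend` argument in tasks.py.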

Start the worker

$ celery -A tasks worker --loglevel=INFO

celery@HOST v5.0.5 (singularity)

[config]
.> app:         tasks:0x10f3aac40
.> transport:   amqp://guest:**@localhost:5672//
.> results:     postgresql://admin:**@localhost:5432/demo_celery
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2021-06-29 22:43:12,415: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2021-06-29 22:43:12,426: INFO/MainProcess] mingle: searching for neighbors
[2021-06-29 22:43:13,453: INFO/MainProcess] mingle: all alone
[2021-06-29 22:43:13,467: INFO/MainProcess] celery@HOST ready.

Call the task

>>> from tasks import add
>>> result = add.delay(4, 4)
>>> result.ready()
True
>>> result.get(timeout=1)
8
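
In the session above `ready()` already returns True because `add` finishes almost immediately. A slower task may still be pending at that point. Below is a rough sketch of polling until the task is done, assuming `result` behaves like the object returned by `add.delay()`, that is, it has `ready()` and `get()`. `wait_for_result` is a hypothetical helper, not a Celery API.

```python
import time


def wait_for_result(result, timeout=10.0, interval=0.5):
    """Poll result.ready() until the task finishes, then return its value.

    `result` is expected to offer ready() and get(), like the object
    returned by add.delay(). This helper is not part of Celery.
    """
    deadline = time.monotonic() + timeout
    while not result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task did not finish within {timeout} seconds")
        time.sleep(interval)
    # The task has finished, so get() does not need to wait any longer.
    return result.get()
```

Usage would look like `wait_for_result(add.delay(4, 4))`. When the caller only needs to block until the value arrives, `result.get(timeout=...)` as shown above is simpler. The polling form is useful mainly when other work has to happen between checks.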

Configuration file

Settings can be written in celeryconfig.py and loaded with app.config_from_object

celeryconfig.py
broker_url = 'amqp://guest:guest@localhost//'
result_backend = 'db+postgresql://admin:password@localhost:5432/demo_celery'

tasks.py
from celery import Celery

app = Celery('tasks')
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    return x + y
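
Because celeryconfig.py is an ordinary Python module, it can also compute its values. Below is one possible variation that keeps the credentials out of the file by reading them from environment variables. `DEMO_BROKER_URL` and `DEMO_RESULT_BACKEND` are variable names invented for this sketch. The serializer settings are optional additions that use setting names from the Celery configuration reference.

```python
# celeryconfig.py (sketch)
import os

# DEMO_BROKER_URL and DEMO_RESULT_BACKEND are hypothetical variable names
# chosen for this example; the fallbacks are the values used in these notes.
broker_url = os.environ.get("DEMO_BROKER_URL", "amqp://guest:guest@localhost//")
result_backend = os.environ.get(
    "DEMO_RESULT_BACKEND",
    "db+postgresql://admin:password@localhost:5432/demo_celery",
)

# Optional: state the serialization format explicitly.
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
```

tasks.py stays unchanged, since `app.config_from_object('celeryconfig')` simply reads the module-level names.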