flaskでceleryを使用する実践
前言
Web開発では、メール/メールの送信、さまざまなタスクの実行など、時間のかかる操作によく遭遇します.この場合、私たちはこれらのタスクを非同期で実行します.celeryはこのような非同期の分散タスク処理フレームワークです.官方文書今日、私たちのテーマはceleryがflaskとどのように仕事をしているかを知っています.flaskは非常にコンパクトなWebフレームワークであり、多くの拡張があり、celeryも例外ではありません.現在よく使われているいくつかのflask-celeryの拡張を見てみましょう. Flask-celery:celeryの著者が開発したのは、拡張ではなく、celeryとその関連コンポーネントをインストールする機能です.ここでは説明しません. Flask-celery-Helper:以前の拡張、作者はメンテナンスせず、現在の4.0バージョン をサポートしていません. Flask-celeryExt:4.0バージョンをサポートし、現在は比較的使いやすい拡張 これらの拡張に加えて、flaskの公式ドキュメントではflaskでceleryを使用する方法が示されていますが、それは単一ファイルでflaskを実行するdemoであり、実際のプロジェクトで使用するか、注意すべき点がたくさんあります.次に、flaskプロジェクトでceleryを使用する方法を探ってみましょう.
プロジェクト構造
公式サンプルコード
本プロジェクトでは拡張は使用されていませんが、公式ドキュメントの例に基づいてさらに適用されます.
これはceleryのファクトリ関数で、flask appの構成を使用してcelery関連のプロパティを設定し、flaskのアプリケーションコンテキストを使用できるようにceleryオブジェクトのTaskを変更することが重要です.このコードをflaskプロジェクト初期化ファイル、すなわちtestapi/_に配置します.init_\.py
celeryオブジェクトの構築
celerytask/__init_\.py
ここではタスク情報を追加するためにTaskをさらにカスタマイズしました.
タスクの作成
このタスクは簡単で、Userモデルクラスを使用してデータベースに非同期でデータを追加し、時間のかかる操作を実現するためにsleep関数シミュレーションを使用します.
ビュー関数で使用
test_api/api/v1/views.py
ビュー関数は非常に簡単で、タスクをコミットする操作しかできません.
起動とテスト
celeryの起動
ループインポートの問題を回避するために、プロジェクトルートディレクトリの下にmanageを新規作成します.py
このファイルはceleryを起動するためにのみ使用され、起動コマンドは次のとおりです.
起動に成功したことを示す出力が表示されます.
flaskを起動するには:
デバッグインタフェース:
celeryログを表示するには、次の手順に従います.
タスクの実行に成功しました.データベースデータを表示します.
データが挿入され、実験に成功しました.
まとめ
いくつかの穴がありますのでご注意ください
1.app初期化ファイルにおける青写真インポート位置の問題により循環インポートが発生し、import Error
エラーファイル:testapi/_init_\.py
celeryとリクエストインタフェースを起動すると、エラースタックは次のようにエラーが表示されます.
解決方法:
青写真のインポートを下に配置青写真登録関数のtestapi/_init_\.py:
2.celeryはflask-sqlalemyの接続構成情報を読めません
タスクをコミットし、celeryは次のようにエラーを報告しました.
デバッグを通じて、flaskのappの構成は手に入れることができることを発見しました.私たちは工場関数の中で応用コンテキストをプッシュしたので、私のデータベースの構成情報はキー値の形式で書きました.Envファイルでは、これも現在flaskが推奨している方法です.では、なぜceleryはデータベース接続構成を取得できないのでしょうか.実は、celeryを起動するappと私たちのwebサービスで使うappは2つの独立したappで、celeryは通過できません.Envの環境変数は対応する値に取得され、ここでは3つの解決策があります.環境変数を使用せずに、関連情報をプロファイルに直接書き込みます.たとえば、SQLALCHEMY_DATABASE_URI = "mysql+pymysql://xxx:[email protected]:3306/test?charset=utf8" は、システム環境変数(/etc/profile) に構成を書き込む. dotenvを使用してロード.Envの環境変数 それに比べて、シナリオ3は比較的多く採用されているので、test_api/settings.pyファイルに次のコードを追加します.
find_dotenv関数は、現在および親ディレクトリで検索されます.Envファイル、load_dotenv関数は環境変数のロードを担当します.こうして、大功が成し遂げられた.私たちはコードを楽しく続けることができます.添付:プロジェクトソース
Web開発では、メール/メールの送信、さまざまなタスクの実行など、時間のかかる操作によく遭遇します.この場合、私たちはこれらのタスクを非同期で実行します.celeryはこのような非同期の分散タスク処理フレームワークです.官方文書今日、私たちのテーマはceleryがflaskとどのように仕事をしているかを知っています.flaskは非常にコンパクトなWebフレームワークであり、多くの拡張があり、celeryも例外ではありません.現在よく使われているいくつかのflask-celeryの拡張を見てみましょう.
プロジェクト構造
├── celery_task # celery
│ ├── __init__.py
│ ├── tasks.py
│ └── test.py
├── manage.py # celery worker
├── requirements.txt #
└── test_api # flask
├── api #
│ ├── __init__.py
│ └── v1
│ ├── __init__.py
│ └── views.py
├── extensions.py #
├── __init__.py # flask app
├── models.py #
└── settings.py #
公式サンプルコード
本プロジェクトでは拡張は使用されていませんが、公式ドキュメントの例に基づいてさらに適用されます.
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
これはceleryのファクトリ関数で、flask appの構成を使用してcelery関連のプロパティを設定し、flaskのアプリケーションコンテキストを使用できるようにceleryオブジェクトのTaskを変更することが重要です.このコードをflaskプロジェクト初期化ファイル、すなわちtestapi/_に配置します.init_\.py
celeryオブジェクトの構築
celerytask/__init_\.py
rom test_api import create_app, make_celery
app = create_app()
celery = make_celery(app)
class MyTask(celery.Task): # celery
def on_success(self, retval, task_id, args, kwargs):
#
print('MyTasks , ')
return super(MyTask, self).on_success(retval, task_id, args, kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
#
# ,
print('MyTasks , ')
return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
ここではタスク情報を追加するためにTaskをさらにカスタマイズしました.
タスクの作成
import datetime
import time
import os
import random
from flask import current_app
from test_api.models import User
from test_api.extensions import db
from celery_task import celery, MyTask
@celery.task(bind=True, base=MyTask)
def apptask(self):
print(current_app.config)
print("==============%s " % current_app.config["SQLALCHEMY_DATABASE_URI"])
print("++++++++++++++%s " % os.getenv("DATABASE_URL"))
time.sleep(5)
user = User(username="user%s" % random.randint(1,100))
db.session.add(user)
db.session.commit()
return 'success'
このタスクは簡単で、Userモデルクラスを使用してデータベースに非同期でデータを追加し、時間のかかる操作を実現するためにsleep関数シミュレーションを使用します.
ビュー関数で使用
test_api/api/v1/views.py
from flask import jsonify
from celery_task.tasks import apptask
from test_api.api.v1 import api_v1
from test_api.extensions import db
from flask import current_app
@api_v1.route("/", methods=["GET"])
def index():
r = apptask.apply_async()
return jsonify({"status": "success"})
ビュー関数は非常に簡単で、タスクをコミットする操作しかできません.
起動とテスト
celeryの起動
ループインポートの問題を回避するために、プロジェクトルートディレクトリの下にmanageを新規作成します.py
from test_api import create_app, make_celery
app = create_app()
celery = make_celery(app)
if __name__ == '__main__':
app.run()
このファイルはceleryを起動するためにのみ使用され、起動コマンドは次のとおりです.
# celery worker -A manage:celery -l debug
起動に成功したことを示す出力が表示されます.
-------------- celery@test-3 v4.4.0 (cliffs)
--- ***** -----
-- ******* ---- Linux-3.10.0-693.2.2.el7.x86_64-x86_64-with-centos-7.4.1708-Core 2020-03-03 21:14:13
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: test_api:0x7f87c31a4e48
- ** ---------- .> transport: redis://127.0.0.1:6379/3
- ** ---------- .> results: redis://127.0.0.1:6379/4
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery.accumulate
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap
. celery_task.tasks.apptask
[2020-03-03 21:14:13,632: DEBUG/MainProcess] | Worker: Starting Hub
[2020-03-03 21:14:13,632: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:13,632: DEBUG/MainProcess] | Worker: Starting Pool
[2020-03-03 21:14:13,690: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:13,691: DEBUG/MainProcess] | Worker: Starting Consumer
[2020-03-03 21:14:13,691: DEBUG/MainProcess] | Consumer: Starting Connection
[2020-03-03 21:14:13,708: INFO/MainProcess] Connected to redis://127.0.0.1:6379/3
[2020-03-03 21:14:13,708: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:13,708: DEBUG/MainProcess] | Consumer: Starting Events
[2020-03-03 21:14:13,718: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:13,718: DEBUG/MainProcess] | Consumer: Starting Mingle
[2020-03-03 21:14:13,718: INFO/MainProcess] mingle: searching for neighbors
[2020-03-03 21:14:14,743: INFO/MainProcess] mingle: all alone
[2020-03-03 21:14:14,743: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:14,744: DEBUG/MainProcess] | Consumer: Starting Gossip
[2020-03-03 21:14:14,748: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:14,748: DEBUG/MainProcess] | Consumer: Starting Heart
[2020-03-03 21:14:14,750: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:14,750: DEBUG/MainProcess] | Consumer: Starting Tasks
[2020-03-03 21:14:14,756: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:14,756: DEBUG/MainProcess] | Consumer: Starting Control
[2020-03-03 21:14:14,759: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:14,759: DEBUG/MainProcess] | Consumer: Starting event loop
[2020-03-03 21:14:14,759: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2020-03-03 21:14:14,760: INFO/MainProcess] celery@test-3 ready.
[2020-03-03 21:14:14,760: DEBUG/MainProcess] basic.qos: prefetch_count->8
flaskを起動するには:
# flask run
* Serving Flask app "test_api" (lazy loading)
* Environment: development
* Debug mode: on
* Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)
* Restarting with stat
* Debugger is active!
* Debugger PIN: 237-492-852
デバッグインタフェース:
# curl http://127.0.0.1:5000/api/v1/
{
"status": "success"
}
celeryログを表示するには、次の手順に従います.
[2020-03-03 21:17:31,330: WARNING/ForkPoolWorker-2]
[2020-03-03 21:17:31,330: DEBUG/MainProcess] Task accepted: celery_task.tasks.apptask[5f27a148-161f-4485-931f-17d94637168e] pid:2341
[2020-03-03 21:17:36,391: WARNING/ForkPoolWorker-2] MyTasks ,
[2020-03-03 21:17:36,392: INFO/ForkPoolWorker-2] Task celery_task.tasks.apptask[5f27a148-161f-4485-931f-17d94637168e] succeeded in 5.0624741315841675s: 'success'
タスクの実行に成功しました.データベースデータを表示します.
mysql> select * from user order by id;
+----+----------+
| id | username |
+----+----------+
| 1 | user26 |
| 2 | user69 |
| 3 | user71 |
| 4 | user35 |
| 5 | user13 |
| 6 | user54 |
| 7 | user88 |
| 8 | user63 |
| 9 | user87 |
| 10 | user90 |
| 11 | user3 |
| 12 | user18 |
| 13 | user65 |
+----+----------+
データが挿入され、実験に成功しました.
まとめ
いくつかの穴がありますのでご注意ください
1.app初期化ファイルにおける青写真インポート位置の問題により循環インポートが発生し、import Error
エラーファイル:testapi/_init_\.py
import os
import click
from flask import Flask, jsonify
from test_api.api.v1 import api_v1 # ,
from test_api.settings import config
from test_api.models import User
from celery import Celery
def make_celery(app):
...
def create_app(config_name=None):
if config_name is None:
config_name = os.getenv('FLASK_ENV', 'development')
app = Flask('test_api')
app.config.from_object(config[config_name])
register_extensions(app)
register_blueprints(app)
register_commands(app)
register_errors(app)
return app
#
def register_blueprints(app):
app.register_blueprint(api_v1, url_prefix='/api/v1')
celeryとリクエストインタフェースを起動すると、エラースタックは次のようにエラーが表示されます.
from test_api import create_app, make_celery
File "/tmp/test/test_api/__init__.py", line 5, in
from test_api.api.v1 import api_v1
File "/tmp/test/test_api/api/v1/__init__.py", line 9, in
from test_api.api.v1 import views
File "/tmp/test/test_api/api/v1/views.py", line 2, in
from celery_task.tasks import apptask
File "/tmp/test/celery_task/__init__.py", line 1, in
from test_api import create_app, make_celery
ImportError: cannot import name 'create_app'
解決方法:
青写真のインポートを下に配置青写真登録関数のtestapi/_init_\.py:
...
def register_blueprints(app):
from test_api.api.v1 import api_v1
app.register_blueprint(api_v1, url_prefix='/api/v1')
...
2.celeryはflask-sqlalemyの接続構成情報を読めません
タスクをコミットし、celeryは次のようにエラーを報告しました.
...
options = self.get_options(sa_url, echo)
File "/tmp/py3/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 575, in get_options
self._sa.apply_driver_hacks(self._app, sa_url, options)
File "/tmp/py3/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 877, in apply_driver_hacks
if sa_url.drivername.startswith('mysql'):
AttributeError: 'NoneType' object has no attribute 'drivername'
デバッグを通じて、flaskのappの構成は手に入れることができることを発見しました.私たちは工場関数の中で応用コンテキストをプッシュしたので、私のデータベースの構成情報はキー値の形式で書きました.Envファイルでは、これも現在flaskが推奨している方法です.では、なぜceleryはデータベース接続構成を取得できないのでしょうか.実は、celeryを起動するappと私たちのwebサービスで使うappは2つの独立したappで、celeryは通過できません.Envの環境変数は対応する値に取得され、ここでは3つの解決策があります.
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())
find_dotenv関数は、現在および親ディレクトリで検索されます.Envファイル、load_dotenv関数は環境変数のロードを担当します.こうして、大功が成し遂げられた.私たちはコードを楽しく続けることができます.添付:プロジェクトソース