flaskでceleryを使用する実践


前言
Web開発では、メール/メールの送信、さまざまなタスクの実行など、時間のかかる操作によく遭遇します.この場合、私たちはこれらのタスクを非同期で実行します.celeryはこのような非同期の分散タスク処理フレームワークです.官方文書今日、私たちのテーマはceleryがflaskとどのように仕事をしているかを知っています.flaskは非常にコンパクトなWebフレームワークであり、多くの拡張があり、celeryも例外ではありません.現在よく使われているいくつかのflask-celeryの拡張を見てみましょう.
  • Flask-celery:celeryの著者が開発したのは、拡張ではなく、celeryとその関連コンポーネントをインストールする機能です.ここでは説明しません.
  • Flask-celery-Helper:以前の拡張、作者はメンテナンスせず、現在の4.0バージョン
  • をサポートしていません.
  • Flask-celeryExt:4.0バージョンをサポートし、現在は比較的使いやすい拡張
  • これらの拡張に加えて、flaskの公式ドキュメントではflaskでceleryを使用する方法が示されていますが、それは単一ファイルでflaskを実行するdemoであり、実際のプロジェクトで使用するか、注意すべき点がたくさんあります.次に、flaskプロジェクトでceleryを使用する方法を探ってみましょう.
    プロジェクト構造
    ├── celery_task                   # celery    
    │   ├── __init__.py
    │   ├── tasks.py
    │   └── test.py
    ├── manage.py                     # celery worker  
    ├── requirements.txt              #    
    └── test_api                      # flask   
        ├── api                       #     
        │   ├── __init__.py
        │   └── v1
        │       ├── __init__.py
        │       └── views.py
        ├── extensions.py             #      
        ├── __init__.py               # flask app
        ├── models.py                 #     
        └── settings.py               #     
    

    公式サンプルコード
    本プロジェクトでは拡張は使用されていませんが、公式ドキュメントの例に基づいてさらに適用されます.
    from celery import Celery
    
    def make_celery(app):
        celery = Celery(
            app.import_name,
            backend=app.config['CELERY_RESULT_BACKEND'],
            broker=app.config['CELERY_BROKER_URL']
        )
        celery.conf.update(app.config)
    
        class ContextTask(celery.Task):
            def __call__(self, *args, **kwargs):
                with app.app_context():
                    return self.run(*args, **kwargs)
    
        celery.Task = ContextTask
        return celery

    これはceleryのファクトリ関数で、flask appの構成を使用してcelery関連のプロパティを設定し、flaskのアプリケーションコンテキストを使用できるようにceleryオブジェクトのTaskを変更することが重要です.このコードをflaskプロジェクト初期化ファイル、すなわちtestapi/_に配置します.init_\.py
    celeryオブジェクトの構築
    celerytask/__init_\.py
    rom test_api import create_app, make_celery
    
    app = create_app()
    celery = make_celery(app)
    
    class MyTask(celery.Task): # celery   
    
        def on_success(self, retval, task_id, args, kwargs):
            #        
            print('MyTasks     ,      ')
            return super(MyTask, self).on_success(retval, task_id, args, kwargs)
    
        def on_failure(self, exc, task_id, args, kwargs, einfo):
            #        
            #       ,               
            print('MyTasks     ,      ')
            return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

    ここではタスク情報を追加するためにTaskをさらにカスタマイズしました.
    タスクの作成
    
    import datetime
    import time
    import os
    import random
    from flask import current_app
    from test_api.models import User
    from test_api.extensions import db
    
    from celery_task import celery, MyTask
    
    @celery.task(bind=True, base=MyTask)
    def apptask(self):
        print(current_app.config)
        print("==============%s " % current_app.config["SQLALCHEMY_DATABASE_URI"])
        print("++++++++++++++%s " % os.getenv("DATABASE_URL"))
        time.sleep(5)
        user = User(username="user%s" % random.randint(1,100))
        db.session.add(user)
        db.session.commit()
        return 'success'

    このタスクは簡単で、Userモデルクラスを使用してデータベースに非同期でデータを追加し、時間のかかる操作を実現するためにsleep関数シミュレーションを使用します.
    ビュー関数で使用
    test_api/api/v1/views.py
    from flask import jsonify
    from celery_task.tasks import apptask
    from test_api.api.v1 import api_v1
    from test_api.extensions import db
    from flask import current_app
    
    @api_v1.route("/", methods=["GET"])
    def index():
        r = apptask.apply_async()
        return jsonify({"status": "success"})

    ビュー関数は非常に簡単で、タスクをコミットする操作しかできません.
    起動とテスト
    celeryの起動
    ループインポートの問題を回避するために、プロジェクトルートディレクトリの下にmanageを新規作成します.py
    from test_api import create_app, make_celery
    
    app = create_app()
    celery = make_celery(app)
    
    if __name__ == '__main__':
        app.run()

    このファイルはceleryを起動するためにのみ使用され、起動コマンドは次のとおりです.
    # celery worker -A manage:celery -l debug

    起動に成功したことを示す出力が表示されます.
    -------------- celery@test-3 v4.4.0 (cliffs)
    --- ***** ----- 
    -- ******* ---- Linux-3.10.0-693.2.2.el7.x86_64-x86_64-with-centos-7.4.1708-Core 2020-03-03 21:14:13
    - *** --- * --- 
    - ** ---------- [config]
    - ** ---------- .> app:         test_api:0x7f87c31a4e48
    - ** ---------- .> transport:   redis://127.0.0.1:6379/3
    - ** ---------- .> results:     redis://127.0.0.1:6379/4
    - *** --- * --- .> concurrency: 2 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** ----- 
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
    
    [tasks]
      . celery.accumulate
      . celery.backend_cleanup
      . celery.chain
      . celery.chord
      . celery.chord_unlock
      . celery.chunks
      . celery.group
      . celery.map
      . celery.starmap
      . celery_task.tasks.apptask
    
    [2020-03-03 21:14:13,632: DEBUG/MainProcess] | Worker: Starting Hub
    [2020-03-03 21:14:13,632: DEBUG/MainProcess] ^-- substep ok
    [2020-03-03 21:14:13,632: DEBUG/MainProcess] | Worker: Starting Pool
    [2020-03-03 21:14:13,690: DEBUG/MainProcess] ^-- substep ok
    [2020-03-03 21:14:13,691: DEBUG/MainProcess] | Worker: Starting Consumer
    [2020-03-03 21:14:13,691: DEBUG/MainProcess] | Consumer: Starting Connection
    [2020-03-03 21:14:13,708: INFO/MainProcess] Connected to redis://127.0.0.1:6379/3
    [2020-03-03 21:14:13,708: DEBUG/MainProcess] ^-- substep ok
    [2020-03-03 21:14:13,708: DEBUG/MainProcess] | Consumer: Starting Events
    [2020-03-03 21:14:13,718: DEBUG/MainProcess] ^-- substep ok
    [2020-03-03 21:14:13,718: DEBUG/MainProcess] | Consumer: Starting Mingle
    [2020-03-03 21:14:13,718: INFO/MainProcess] mingle: searching for neighbors
    [2020-03-03 21:14:14,743: INFO/MainProcess] mingle: all alone
    [2020-03-03 21:14:14,743: DEBUG/MainProcess] ^-- substep ok
    [2020-03-03 21:14:14,744: DEBUG/MainProcess] | Consumer: Starting Gossip
    [2020-03-03 21:14:14,748: DEBUG/MainProcess] ^-- substep ok
    [2020-03-03 21:14:14,748: DEBUG/MainProcess] | Consumer: Starting Heart
    [2020-03-03 21:14:14,750: DEBUG/MainProcess] ^-- substep ok
    [2020-03-03 21:14:14,750: DEBUG/MainProcess] | Consumer: Starting Tasks
    [2020-03-03 21:14:14,756: DEBUG/MainProcess] ^-- substep ok
    [2020-03-03 21:14:14,756: DEBUG/MainProcess] | Consumer: Starting Control
    [2020-03-03 21:14:14,759: DEBUG/MainProcess] ^-- substep ok
    [2020-03-03 21:14:14,759: DEBUG/MainProcess] | Consumer: Starting event loop
    [2020-03-03 21:14:14,759: DEBUG/MainProcess] | Worker: Hub.register Pool...
    [2020-03-03 21:14:14,760: INFO/MainProcess] celery@test-3 ready.
    [2020-03-03 21:14:14,760: DEBUG/MainProcess] basic.qos: prefetch_count->8
    

    flaskを起動するには:
    # flask run
    
    * Serving Flask app "test_api" (lazy loading)
     * Environment: development
     * Debug mode: on
     * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)
     * Restarting with stat
     * Debugger is active!
     * Debugger PIN: 237-492-852

    デバッグインタフェース:
    # curl http://127.0.0.1:5000/api/v1/
    {
      "status": "success"
    }

    celeryログを表示するには、次の手順に従います.
    [2020-03-03 21:17:31,330: WARNING/ForkPoolWorker-2] 
    [2020-03-03 21:17:31,330: DEBUG/MainProcess] Task accepted: celery_task.tasks.apptask[5f27a148-161f-4485-931f-17d94637168e] pid:2341
    [2020-03-03 21:17:36,391: WARNING/ForkPoolWorker-2] MyTasks     ,      
    [2020-03-03 21:17:36,392: INFO/ForkPoolWorker-2] Task celery_task.tasks.apptask[5f27a148-161f-4485-931f-17d94637168e] succeeded in 5.0624741315841675s: 'success'

    タスクの実行に成功しました.データベースデータを表示します.
    mysql> select * from user order by id;
    +----+----------+
    | id | username |
    +----+----------+
    |  1 | user26   |
    |  2 | user69   |
    |  3 | user71   |
    |  4 | user35   |
    |  5 | user13   |
    |  6 | user54   |
    |  7 | user88   |
    |  8 | user63   |
    |  9 | user87   |
    | 10 | user90   |
    | 11 | user3    |
    | 12 | user18   |
    | 13 | user65   |
    +----+----------+
    

    データが挿入され、実験に成功しました.
    まとめ
    いくつかの穴がありますのでご注意ください
    1.app初期化ファイルにおける青写真インポート位置の問題により循環インポートが発生し、import Error
    エラーファイル:testapi/_init_\.py
    import os 
    import click
    
    from flask import Flask, jsonify
    from test_api.api.v1 import api_v1   #        ,      
    from test_api.settings import config
    from test_api.models import User
    
    from celery import Celery
    
    def make_celery(app):
    ...
    def create_app(config_name=None):
        if config_name is None:
            config_name = os.getenv('FLASK_ENV', 'development')
    
        app = Flask('test_api')
        app.config.from_object(config[config_name])
    
        register_extensions(app)
        register_blueprints(app)
        register_commands(app)
        register_errors(app)
        return app
    
    #       
    def register_blueprints(app):
        app.register_blueprint(api_v1, url_prefix='/api/v1')

    celeryとリクエストインタフェースを起動すると、エラースタックは次のようにエラーが表示されます.
      from test_api import create_app, make_celery
      File "/tmp/test/test_api/__init__.py", line 5, in 
        from test_api.api.v1 import api_v1
      File "/tmp/test/test_api/api/v1/__init__.py", line 9, in 
        from test_api.api.v1 import views
      File "/tmp/test/test_api/api/v1/views.py", line 2, in 
        from celery_task.tasks import apptask
      File "/tmp/test/celery_task/__init__.py", line 1, in 
        from test_api import create_app, make_celery
    ImportError: cannot import name 'create_app'

    解決方法:
    青写真のインポートを下に配置青写真登録関数のtestapi/_init_\.py:
    ...
    def register_blueprints(app):
        from test_api.api.v1 import api_v1
        app.register_blueprint(api_v1, url_prefix='/api/v1')
    ...

    2.celeryはflask-sqlalemyの接続構成情報を読めません
    タスクをコミットし、celeryは次のようにエラーを報告しました.
      ...
      options = self.get_options(sa_url, echo)
      File "/tmp/py3/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 575, in get_options
        self._sa.apply_driver_hacks(self._app, sa_url, options)
      File "/tmp/py3/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 877, in apply_driver_hacks
        if sa_url.drivername.startswith('mysql'):
    AttributeError: 'NoneType' object has no attribute 'drivername'

    デバッグを通じて、flaskのappの構成は手に入れることができることを発見しました.私たちは工場関数の中で応用コンテキストをプッシュしたので、私のデータベースの構成情報はキー値の形式で書きました.Envファイルでは、これも現在flaskが推奨している方法です.では、なぜceleryはデータベース接続構成を取得できないのでしょうか.実は、celeryを起動するappと私たちのwebサービスで使うappは2つの独立したappで、celeryは通過できません.Envの環境変数は対応する値に取得され、ここでは3つの解決策があります.
  • 環境変数を使用せずに、関連情報をプロファイルに直接書き込みます.たとえば、SQLALCHEMY_DATABASE_URI = "mysql+pymysql://xxx:[email protected]:3306/test?charset=utf8"
  • は、システム環境変数(/etc/profile)
  • に構成を書き込む.
  • dotenvを使用してロード.Envの環境変数
  • それに比べて、シナリオ3は比較的多く採用されているので、test_api/settings.pyファイルに次のコードを追加します.
    from dotenv import find_dotenv, load_dotenv
    load_dotenv(find_dotenv())

    find_dotenv関数は、現在および親ディレクトリで検索されます.Envファイル、load_dotenv関数は環境変数のロードを担当します.こうして、大功が成し遂げられた.私たちはコードを楽しく続けることができます.添付:プロジェクトソース