义齿

4644 ワード

Celeryは非同期フレームワークであり、一部の業務では非同期処理が必要であり、例えば服の画像をアップロードし、この画像の識別情報を得る:例えば上下2件、自動識別ラベルタイプ分類、アンカー情報
このトレーニングにはgpuが必要であるため,ユーザが画像インタフェースをアップロードして終了した後,まずOKを返し,後でインタフェースの結果をポーリングし,途中でceleryを失う.
Celeryのタスクファイルtask.pyとapp.pyまたはmain.pyのディレクトリは一致していて、中のredisアドレスは私が起動したdockerコンテナで、露出ポート6379、第8ライブラリです.
Celery自身がタスク実行結果をtask_に基づいてidはredisに書き込まれ、task_に従ってidは自分で出すので、redisが配置されていればOK、自分でredisに取りに行く必要はありません
import connexion
from celery import Celery

app = connexion.FlaskApp(__name__, specification_dir='.')

application = app.app
# Celery configuration db 8
application.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/8'
application.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/8'


def make_celery(app):
    celery = Celery(app.name, backend=app.config['CELERY_RESULT_BACKEND'],
                    broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

celery = make_celery(app.app)


@celery.task(bind=True)
def calling_api(self, url):

    # try:
    #     lenth = len(url)
    # except SomeNetworkException as e:
    #     print("maybe do some clenup here....")
    # self.retry(e)

    result = [url, len(url), "helloworld"]
    return result

上のタスク関数は比較的に独立して、urlに入って、[url,len(url),“helloworld”]を返して、簡単な入力パラメータの和を求めるのと同じです
他の場所でsimpleを書きましたtest.py
# coding=utf-8

import copy

from flask import Flask, request, render_template, session, flash, redirect, \
    url_for, jsonify

RESULT = {
    'code': 0,
    'message': 'Success',
    'data': {}
}

from ...task import calling_api


def send_data():
    result = copy.deepcopy(RESULT)

    ret = request.get_json()
    print("request.get_json():", ret)
    url = ret.get("url")
    if url is not None:
        print("url:", url)
        # outcome = calling_api.apply_async(args=[url], countdown=60)
        outcome = calling_api.apply_async(args=[url])
        # outcome = calling_api.delay(url)
        print("outcome:", outcome)
        print("outcome type:", type(outcome))
        print("outcome dict:", type(outcome.__dict__()))
       
        outcome_dict = {}
        outcome['task_id'] = str(outcome)
        result['data'] = outcome_dict

    else:
        result['code'] = 1
        result['message'] = "failed"

    return jsonify(result)


def get_data(task_id):
    result = copy.deepcopy(RESULT)
    task = calling_api.AsyncResult(task_id)
    # task = calling_api.AsyncResult(task_id)

    task_id = "{0}".format(task.id)
    print(type(task_id), task_id)

    if task.status == 'SUCCESS':
        print("ok")
        result_dict = {}
        result_dict["status"] = task.status
        result_dict["ready"] = task.ready()
        result_dict["result"] = task.result
        result_dict["state"] = task.state
        result_dict["id"] = task.id
        result['data'] = result_dict
    else:
        result['code'] = 1

    return jsonify(result)

 
タスクインタフェースの実行:インタフェースがurlに転送され、outcome=calling_が呼び出されます.api.apply_async(args=[url])はtask_を返します.id、urlを1つ失うたびにtaskが返されます.id
タスクインタフェースを返します:task=calling_api.AsyncResult(task_id)はタスク結果を返し,状態から成功したと判断すればよい.
 
コマンドの実行:
  task.py  :
celery -A task.celery worker -l info
# celery -A task.celery worker -l info --logfile=./celerylog.log
  celery      ,    -d     

         ,  app.py  main.py  :
uwsgi --http :20000 -w app.main

  :
     :
: post http://localhost:20000/testing request: application/json {url="xxxxx.jpg"} response: "task_id": "44057af4-3e14-468a-a36d-8c31e3665bce" task_id get http://localhost:20000/testing/{task_id} response: { "id": "44057af4-3e14-468a-a36d-8c31e3665bc", "ready": true, "result": [ "http://s3.xxxx.com/ba7571a95d64eaa69a49912f26816e2f.jpg", 60, "helloworld" ], "state": "SUCCESS", "status": "SUCCESS" },

  
 
転載先:https://www.cnblogs.com/adamans/articles/10532050.html