python非同期インタフェースのテスト方法

2626 ワード

業務処理に時間がかかる場合、インタフェースは一般的に非同期処理方式を採用する、この非同期処理方式はFutureモードとも呼ばれる.
一般的なプロセスでは、非同期インタフェースを要求すると、インタフェースはすぐに処理を開始したことを示す結果を返します.結果には、タスクidのようなものが含まれて結果を追跡します.また、クエリー結果のインタフェースが提供されます.結果が処理されていない場合、クエリーインタフェースは対応する「未完了」状態を返し、処理が完了すると対応するデータが返されます.
非同期インタフェースの処理一般的には、インタフェースが指定したデータまたはタイムアウトに完了/クエリーされるまで、クエリー結果を要求するインタフェースを一定時間間隔でポーリングする方法があります.
非同期インタフェースに追跡idとクエリーインタフェースが提供されていない場合は、指定した結果またはタイムアウトが取得されるまで、同じ方法でデータベースデータまたはログデータをポーリングできます.
サンプルインタフェース
  • オーダーインタフェース
  • を作成する
    要求アドレスhttp://115.28.108.130:5000/api/order/create/リクエストメソッドGET/POST
    要求フォーマットPOST:フォーム
    パラメータ
    を選択します.
    説明
    user_id
    String
    ユーザid
    goods_id
    String
    商品ID
    num
    int
    数量
    amount
    float
    合計価格
    応答例ではパラメータがありません:
    {
        "msg": "    "
    }
    

    成功:
    {
        "order_id": "69561"
    }
    
  • 受注結果インタフェース
  • を取得する.
    要求アドレスhttp://115.28.108.130:5000/api/order/get_result/?order_id=***リクエストメソッドGET
    パラメータ
    を選択します.
    説明
    order_id
    String
    オーダーID
    応答例の作成:
    {}
    

    作成に成功しました:
    {
        "amount": "20.0",
        "goods_id": "123",
        "num": "2",
        "user_id": "123"
    }
    

    Pythonの実現方法
    import time
    import requests
    
    def create_order():
        url = "http://115.28.108.130:5000/api/order/create/"   #     
        data = {
            "user_id": "1234",
            "goods_id": "136",
            "num": 10,
            "amount": 20.00
        }
        res = requests.post(url=url, data=data)
        return res.json().get("order_id")  #   order_id    
    
    def get_order_result(interval = 1, time_out = 60):  #               ,    
        order_id = create_order()
        #       
        url = "http://115.28.108.130:5000/api/order/get_result/?order_id={}".format(order_id) 
        start_time = time.time()  #     
        end_time = start_time + time_out      +    =    
        count = 1  #    ,           ,     
        while time.time() < end_time:   #         ,     
            res = requests.get(url)  #         
            print(count, res.json())
            count += 1
            time.sleep(interval)   #       
            if res.json():   #           
                break
        else:
            return None   #     (  end_time,  break  )   None
        return res.json()  # break         
    
    
    if __name__ == '__main__':
        order_result = get_order_result()
        print(order_result)
    

    転載先:https://www.jianshu.com/p/eecf0c985ba0