python非同期インタフェースのテスト方法
2626 ワード
業務処理に時間がかかる場合、インタフェースは一般的に非同期処理方式を採用する、この非同期処理方式はFutureモードとも呼ばれる.
一般的なプロセスでは、非同期インタフェースを要求すると、インタフェースはすぐに処理を開始したことを示す結果を返します.結果には、タスクidのようなものが含まれて結果を追跡します.また、クエリー結果のインタフェースが提供されます.結果が処理されていない場合、クエリーインタフェースは対応する「未完了」状態を返し、処理が完了すると対応するデータが返されます.
非同期インタフェースの処理一般的には、インタフェースが指定したデータまたはタイムアウトに完了/クエリーされるまで、クエリー結果を要求するインタフェースを一定時間間隔でポーリングする方法があります.
非同期インタフェースに追跡idとクエリーインタフェースが提供されていない場合は、指定した結果またはタイムアウトが取得されるまで、同じ方法でデータベースデータまたはログデータをポーリングできます.
サンプルインタフェースオーダーインタフェース を作成する
要求アドレス
要求フォーマットPOST:フォーム
パラメータ
を選択します.
説明
user_id
String
ユーザid
goods_id
String
商品ID
num
int
数量
amount
float
合計価格
応答例ではパラメータがありません:
成功:受注結果インタフェース を取得する.
要求アドレス
パラメータ
を選択します.
説明
order_id
String
オーダーID
応答例の作成:
作成に成功しました:
Pythonの実現方法
転載先:https://www.jianshu.com/p/eecf0c985ba0
一般的なプロセスでは、非同期インタフェースを要求すると、インタフェースはすぐに処理を開始したことを示す結果を返します.結果には、タスクidのようなものが含まれて結果を追跡します.また、クエリー結果のインタフェースが提供されます.結果が処理されていない場合、クエリーインタフェースは対応する「未完了」状態を返し、処理が完了すると対応するデータが返されます.
非同期インタフェースの処理一般的には、インタフェースが指定したデータまたはタイムアウトに完了/クエリーされるまで、クエリー結果を要求するインタフェースを一定時間間隔でポーリングする方法があります.
非同期インタフェースに追跡idとクエリーインタフェースが提供されていない場合は、指定した結果またはタイムアウトが取得されるまで、同じ方法でデータベースデータまたはログデータをポーリングできます.
サンプルインタフェース
要求アドレス
http://115.28.108.130:5000/api/order/create/
リクエストメソッドGET/POST要求フォーマットPOST:フォーム
パラメータ
を選択します.
説明
user_id
String
ユーザid
goods_id
String
商品ID
num
int
数量
amount
float
合計価格
応答例ではパラメータがありません:
{
"msg": " "
}
成功:
{
"order_id": "69561"
}
要求アドレス
http://115.28.108.130:5000/api/order/get_result/?order_id=***
リクエストメソッドGETパラメータ
を選択します.
説明
order_id
String
オーダーID
応答例の作成:
{}
作成に成功しました:
{
"amount": "20.0",
"goods_id": "123",
"num": "2",
"user_id": "123"
}
Pythonの実現方法
import time
import requests
def create_order():
url = "http://115.28.108.130:5000/api/order/create/" #
data = {
"user_id": "1234",
"goods_id": "136",
"num": 10,
"amount": 20.00
}
res = requests.post(url=url, data=data)
return res.json().get("order_id") # order_id
def get_order_result(interval = 1, time_out = 60): # ,
order_id = create_order()
#
url = "http://115.28.108.130:5000/api/order/get_result/?order_id={}".format(order_id)
start_time = time.time() #
end_time = start_time + time_out + =
count = 1 # , ,
while time.time() < end_time: # ,
res = requests.get(url) #
print(count, res.json())
count += 1
time.sleep(interval) #
if res.json(): #
break
else:
return None # ( end_time, break ) None
return res.json() # break
if __name__ == '__main__':
order_result = get_order_result()
print(order_result)
転載先:https://www.jianshu.com/p/eecf0c985ba0