PythonのFlashkフレームアプリケーションはRedisキューデータを呼び出す方法を適用します。
任務が非同期化する
ブラウザを開けて、住所を入力して、車を押して、ページを開けました。すると、HTTP要求がクライアントからサーバに送信され、サーバが要求を処理し、応答内容を返す。
私たちは毎日ホームページを見て、サーバーに大小の要求を送ります。サーバーが要求を受けたら、他のサーバーにも要求を送る必要があります。サーバーも他の仕事をする必要があります。だから、最初に送った要求はブロックされました。つまり、サーバーが他の仕事を完成するのを待つということです。
より多くの場合、サーバーがする追加の仕事は、クライアントが待つ必要はありません。この場合は、これらの追加のことを非同期に行うことができます。非同期の仕事をする道具はたくさんあります。主な原理は、通知メッセージを処理するか、通知メッセージに対しては、通常、キュー構造である。生産と消費のメッセージは通信と業務の実現を行う。
生産消費と行列
上述の非同期的な任務の実現は、生産者の消費モデルとして抽象化できる。レストランのように料理人が食事をしています。料理人がたくさん作ったら、しばらくは売れ残ったら、シェフは休みになります。もしお客さんが多いなら、コックさんは忙しくて、お客さんはゆっくりと待つ必要があります。生産者と消費者を実現する方法はたくさんあります。Python標準ライブラリQueを使って、小さな例を書きます。
Pythonは使いやすいキュー構造を内蔵しています。私たちは同じような操作をredisで実現することもできます。そして簡単な非同期の仕事をします。
Redisは、メッセージキューとしての2つの方法を提供する。一つは生産者消費モデルを使うことで、もう一つは予約者モードを発表することです。前者は、1つ以上のクライアントにメッセージ・キューを傍受させ、メッセージが到着すると、消費者はすぐに消費され、先に誰かを計算します。もし列にメッセージがないなら、消費者は引き続き傍受します。後者も1つ以上のクライアントがメッセージチャンネルを購読しています。発表者がメッセージを発表すれば、すべての購読者はメッセージを受信できます。購読者はpingです。
生産消費モデル
主にredisによって提供されたblpopを使ってキューデータを取得します。キューにデータがないと待ち時間が詰まります。つまり傍受です。
redisのpbsub機能を使って、購読者はチャンネルを購読して、発表者はニュースを発表してチャンネルに着いて、チャンネルは1つのニュースの隊列です。
私たちはそれぞれ2つの非同期タスクのバックエンドサービスを実現して、直接彼らを起動して、redisキューまたはチャンネルのニュースをモニターすることができます。簡単なテストは以下の通りです。
ブラウザを開けて、住所を入力して、車を押して、ページを開けました。すると、HTTP要求がクライアントからサーバに送信され、サーバが要求を処理し、応答内容を返す。
私たちは毎日ホームページを見て、サーバーに大小の要求を送ります。サーバーが要求を受けたら、他のサーバーにも要求を送る必要があります。サーバーも他の仕事をする必要があります。だから、最初に送った要求はブロックされました。つまり、サーバーが他の仕事を完成するのを待つということです。
より多くの場合、サーバーがする追加の仕事は、クライアントが待つ必要はありません。この場合は、これらの追加のことを非同期に行うことができます。非同期の仕事をする道具はたくさんあります。主な原理は、通知メッセージを処理するか、通知メッセージに対しては、通常、キュー構造である。生産と消費のメッセージは通信と業務の実現を行う。
生産消費と行列
上述の非同期的な任務の実現は、生産者の消費モデルとして抽象化できる。レストランのように料理人が食事をしています。料理人がたくさん作ったら、しばらくは売れ残ったら、シェフは休みになります。もしお客さんが多いなら、コックさんは忙しくて、お客さんはゆっくりと待つ必要があります。生産者と消費者を実現する方法はたくさんあります。Python標準ライブラリQueを使って、小さな例を書きます。
import random
import time
from Queue import Queue
from threading import Thread
queue = Queue(10)
class Producer(Thread):
def run(self):
while True:
elem = random.randrange(9)
queue.put(elem)
print " {} {} --- {} ".format(self.name, elem, queue.qsize())
time.sleep(random.random())
class Consumer(Thread):
def run(self):
while True:
elem = queue.get()
print " {} {} --- {} ".format(self.name, elem, queue.qsize())
time.sleep(random.random())
def main():
for i in range(3):
p = Producer()
p.start()
for i in range(2):
c = Consumer()
c.start()
if __name__ == '__main__':
main()
大体の出力は以下の通りです。
Thread-1 1 --- 1
Thread-2 8 --- 2
Thread-3 3 --- 3
Thread-4 1 --- 2
Thread-5 8 --- 1
Thread-4 3 --- 0
Thread-1 0 --- 1
Thread-2 0 --- 2
Thread-1 1 --- 3
Thread-1 1 --- 4
Thread-4 0 --- 3
Thread-3 3 --- 4
Thread-5 0 --- 3
Thread-5 1 --- 2
Thread-2 8 --- 3
Thread-2 8 --- 4
RedisキューPythonは使いやすいキュー構造を内蔵しています。私たちは同じような操作をredisで実現することもできます。そして簡単な非同期の仕事をします。
Redisは、メッセージキューとしての2つの方法を提供する。一つは生産者消費モデルを使うことで、もう一つは予約者モードを発表することです。前者は、1つ以上のクライアントにメッセージ・キューを傍受させ、メッセージが到着すると、消費者はすぐに消費され、先に誰かを計算します。もし列にメッセージがないなら、消費者は引き続き傍受します。後者も1つ以上のクライアントがメッセージチャンネルを購読しています。発表者がメッセージを発表すれば、すべての購読者はメッセージを受信できます。購読者はpingです。
生産消費モデル
主にredisによって提供されたblpopを使ってキューデータを取得します。キューにデータがないと待ち時間が詰まります。つまり傍受です。
import redis
class Task(object):
def __init__(self):
self.rcon = redis.StrictRedis(host='localhost', db=5)
self.queue = 'task:prodcons:queue'
def listen_task(self):
while True:
task = self.rcon.blpop(self.queue, 0)[1]
print "Task get", task
if __name__ == '__main__':
print 'listen task queue'
Task().listen_task()
購読モードを公開redisのpbsub機能を使って、購読者はチャンネルを購読して、発表者はニュースを発表してチャンネルに着いて、チャンネルは1つのニュースの隊列です。
import redis
class Task(object):
def __init__(self):
self.rcon = redis.StrictRedis(host='localhost', db=5)
self.ps = self.rcon.pubsub()
self.ps.subscribe('task:pubsub:channel')
def listen_task(self):
for i in self.ps.listen():
if i['type'] == 'message':
print "Task get", i['data']
if __name__ == '__main__':
print 'listen task channel'
Task().listen_task()
Flashk入口私たちはそれぞれ2つの非同期タスクのバックエンドサービスを実現して、直接彼らを起動して、redisキューまたはチャンネルのニュースをモニターすることができます。簡単なテストは以下の通りです。
import redis
import random
import logging
from flask import Flask, redirect
app = Flask(__name__)
rcon = redis.StrictRedis(host='localhost', db=5)
prodcons_queue = 'task:prodcons:queue'
pubsub_channel = 'task:pubsub:channel'
@app.route('/')
def index():
html = """
<br>
<center><h3>Redis Message Queue</h3>
<br>
<a href="/prodcons"> </a>
<br>
<br>
<a href="/pubsub"> </a>
</center>
"""
return html
@app.route('/prodcons')
def prodcons():
elem = random.randrange(10)
rcon.lpush(prodcons_queue, elem)
logging.info("lpush {} -- {}".format(prodcons_queue, elem))
return redirect('/')
@app.route('/pubsub')
def pubsub():
ps = rcon.pubsub()
ps.subscribe(pubsub_channel)
elem = random.randrange(10)
rcon.publish(pubsub_channel, elem)
return redirect('/')
if __name__ == '__main__':
app.run(debug=True)
スクリプトを開始します。
siege -c10 -r 5 http://127.0.0.1:5000/prodcons
siege -c10 -r 5 http://127.0.0.1:5000/pubsub
非同期メッセージは、傍受されたスクリプト入力にそれぞれ見られます。非同期のタスクでは、当然ながら、現在のこれらの方法は非同期の実行結果を知らないので、非同期の実行結果を知る必要があれば、設計協働タスクやRQやcelleryなどのツールを使用することも考えられます。