PythonのFlashkフレームアプリケーションはRedisキューデータを呼び出す方法を適用します。

5433 ワード

任務が非同期化する
ブラウザを開けて、住所を入力して、車を押して、ページを開けました。すると、HTTP要求がクライアントからサーバに送信され、サーバが要求を処理し、応答内容を返す。
私たちは毎日ホームページを見て、サーバーに大小の要求を送ります。サーバーが要求を受けたら、他のサーバーにも要求を送る必要があります。サーバーも他の仕事をする必要があります。だから、最初に送った要求はブロックされました。つまり、サーバーが他の仕事を完成するのを待つということです。
より多くの場合、サーバーがする追加の仕事は、クライアントが待つ必要はありません。この場合は、これらの追加のことを非同期に行うことができます。非同期の仕事をする道具はたくさんあります。主な原理は、通知メッセージを処理するか、通知メッセージに対しては、通常、キュー構造である。生産と消費のメッセージは通信と業務の実現を行う。
生産消費と行列
上述の非同期的な任務の実現は、生産者の消費モデルとして抽象化できる。レストランのように料理人が食事をしています。料理人がたくさん作ったら、しばらくは売れ残ったら、シェフは休みになります。もしお客さんが多いなら、コックさんは忙しくて、お客さんはゆっくりと待つ必要があります。生産者と消費者を実現する方法はたくさんあります。Python標準ライブラリQueを使って、小さな例を書きます。

import random
import time
from Queue import Queue
from threading import Thread

queue = Queue(10)

class Producer(Thread):
  def run(self):
    while True:
      elem = random.randrange(9)
      queue.put(elem)
      print "   {}    {}   ---    {}     ".format(self.name, elem, queue.qsize())
      time.sleep(random.random())

class Consumer(Thread):
  def run(self):
    while True:
      elem = queue.get()
      print "  {}    {}   ---    {}     ".format(self.name, elem, queue.qsize())
      time.sleep(random.random())

def main():
  for i in range(3):
    p = Producer()
    p.start()
  for i in range(2):
    c = Consumer()
    c.start()

if __name__ == '__main__':
  main()

大体の出力は以下の通りです。

   Thread-1    1   ---    1     
   Thread-2    8   ---    2     
   Thread-3    3   ---    3     
  Thread-4    1   ---    2     
  Thread-5    8   ---    1     
  Thread-4    3   ---    0     
   Thread-1    0   ---    1     
   Thread-2    0   ---    2     
   Thread-1    1   ---    3     
   Thread-1    1   ---    4     
  Thread-4    0   ---    3     
   Thread-3    3   ---    4     
  Thread-5    0   ---    3     
  Thread-5    1   ---    2     
   Thread-2    8   ---    3     
   Thread-2    8   ---    4     
Redisキュー
Pythonは使いやすいキュー構造を内蔵しています。私たちは同じような操作をredisで実現することもできます。そして簡単な非同期の仕事をします。
Redisは、メッセージキューとしての2つの方法を提供する。一つは生産者消費モデルを使うことで、もう一つは予約者モードを発表することです。前者は、1つ以上のクライアントにメッセージ・キューを傍受させ、メッセージが到着すると、消費者はすぐに消費され、先に誰かを計算します。もし列にメッセージがないなら、消費者は引き続き傍受します。後者も1つ以上のクライアントがメッセージチャンネルを購読しています。発表者がメッセージを発表すれば、すべての購読者はメッセージを受信できます。購読者はpingです。
生産消費モデル
主にredisによって提供されたblpopを使ってキューデータを取得します。キューにデータがないと待ち時間が詰まります。つまり傍受です。

import redis

class Task(object):
  def __init__(self):
    self.rcon = redis.StrictRedis(host='localhost', db=5)
    self.queue = 'task:prodcons:queue'

  def listen_task(self):
    while True:
      task = self.rcon.blpop(self.queue, 0)[1]
      print "Task get", task

if __name__ == '__main__':
  print 'listen task queue'
  Task().listen_task()

購読モードを公開
redisのpbsub機能を使って、購読者はチャンネルを購読して、発表者はニュースを発表してチャンネルに着いて、チャンネルは1つのニュースの隊列です。

import redis


class Task(object):

  def __init__(self):
    self.rcon = redis.StrictRedis(host='localhost', db=5)
    self.ps = self.rcon.pubsub()
    self.ps.subscribe('task:pubsub:channel')

  def listen_task(self):
    for i in self.ps.listen():
      if i['type'] == 'message':
        print "Task get", i['data']

if __name__ == '__main__':
  print 'listen task channel'
  Task().listen_task()

Flashk入口
私たちはそれぞれ2つの非同期タスクのバックエンドサービスを実現して、直接彼らを起動して、redisキューまたはチャンネルのニュースをモニターすることができます。簡単なテストは以下の通りです。

import redis
import random
import logging
from flask import Flask, redirect

app = Flask(__name__)

rcon = redis.StrictRedis(host='localhost', db=5)
prodcons_queue = 'task:prodcons:queue'
pubsub_channel = 'task:pubsub:channel'

@app.route('/')
def index():

  html = """
<br>
<center><h3>Redis Message Queue</h3>
<br>
<a href="/prodcons">       </a>
<br>
<br>
<a href="/pubsub">       </a>
</center>
"""
  return html


@app.route('/prodcons')
def prodcons():
  elem = random.randrange(10)
  rcon.lpush(prodcons_queue, elem)
  logging.info("lpush {} -- {}".format(prodcons_queue, elem))
  return redirect('/')

@app.route('/pubsub')
def pubsub():
  ps = rcon.pubsub()
  ps.subscribe(pubsub_channel)
  elem = random.randrange(10)
  rcon.publish(pubsub_channel, elem)
  return redirect('/')

if __name__ == '__main__':
  app.run(debug=True)

スクリプトを開始します。

siege -c10 -r 5 http://127.0.0.1:5000/prodcons
siege -c10 -r 5 http://127.0.0.1:5000/pubsub
非同期メッセージは、傍受されたスクリプト入力にそれぞれ見られます。非同期のタスクでは、当然ながら、現在のこれらの方法は非同期の実行結果を知らないので、非同期の実行結果を知る必要があれば、設計協働タスクやRQやcelleryなどのツールを使用することも考えられます。