優先度をサポートするメッセージキューをredisで実装


メッセージキューが必要な理由
システムにメッセージキューメカニズムを導入することは、システムにとって非常に大きな改善である.たとえば、Webシステムでは、ユーザーが操作をした後、メール通知をユーザーメールボックスに送信する必要があります.同期方式を使用すると、メールの送信が完了した後にユーザーにフィードバックするのを待つことができますが、ネットワークの不確実性によってユーザーが長時間待つことになり、ユーザー体験に影響を与える可能性があります.
一部のシーンでは、同期方式で完了を待つことはできません.バックグラウンドで多くの時間を費やす操作が必要です.たとえば、極端な例では、バックグラウンドコンパイルが完了するのに30分かかるオンラインコンパイルシステムタスクがあります.このようなシーンの設計では、同期して待機した後にフィードバックすることはできません.まず、ユーザーにフィードバックしてから非同期処理が完了し、処理が完了した後、状況に応じてユーザーにフィードバックする必要があります.
またメッセージキューが適用される場合,それらのシステム処理能力が限られている場合,まずキューメカニズムを用いてタスクを一時的に格納し,システムはキューに並んだタスクを順番に処理する.これにより,システムスループットが不足している場合でも,高同時タスクを安定的に処理できる.
メッセージキューはキューメカニズムとして使用でき、システムがキューメカニズムを使用する必要がある場所でメッセージキューを使用することができます.
rabbitmqの優先順位の方法
現在、成熟したメッセージキュー製品は多く、rabbitmqなどの有名な製品があります.使用するのは比較的簡単で、機能も比較的豊富で、一般的な場合は完全に十分です.しかし、優先度がサポートされていないのが煩わしい.例えば、メールを送信するタスクでは、一部の特権ユーザーは、少なくとも一般ユーザーよりも優先的にメールをよりタイムリーに送信することを望んでいます.デフォルトではrabbitmqは処理できませんが、rabbitmqに投げつけるタスクはFIFOが先に出ます.しかし、これらの優先度をサポートするために、いくつかの融通性のあるテクニックを使用することができます.複数のキューを作成し、rabbitmqの消費者に対応するルーティングルールを設定します.
例えばデフォルトではこのようなキューがあり,listを用いて[task 1,task 2,task 3]をシミュレートし,消費者はFIFOの原則に従って順番にtaskを取り出して処理する.優先度の高いタスクが入ってくると、最後に処理される[task 1,task 2,task 3,higitask 1]に従うしかない.ただし、2つのキュー、1つの高優先度キュー、1つの一般優先度キューを使用するとします.一般優先度[task 1,task 2,task 3],高優先度[hightask 1]では,消費者のルーティングを設定し,任意のキューからランダムにデータを取得すればよい.
また、空き時間に低優先キューのデータも処理しない高優先キューを専門に処理する消費者を定義することができます.これは銀行のVIPカウンターのようなもので、普通のお客様は銀行で番号を取って並んでいて、VIPが来て、彼は番号取り機から普通の会員の前に並んでいる切符を出していませんが、彼はもっと速く直接VIP通路を歩くことができます.
rabbitmqを使用して優先度をサポートするメッセージキューを作成すると、上記の銀行VIP会員と同じように、異なるチャネルを歩きます.しかし、この方法は相対的な優先度にすぎず、絶対的な優先度制御はできません.例えば、ある優先度の高いタスクが他の一般的なタスクよりも絶対的な意味で優先的に処理されることを望んでいます.このような案は通用しません.rabbitmqの消費者は、自分が空いている場合に自分の関心のあるキューから「ランダム」にあるキューの最初のデータを取って処理することしか知らないため、どのキューを優先的に探すかを制御できません.またはより微細度の優先度制御.あるいは、システムに設定されている優先度は10種類以上あります.このようにrabbitmqを使うのも難しい.
しかしredisを使用してキューを作成すれば、上記のニーズはすべて実現できます.
redisを使用してメッセージキューを作成する方法
まずredisはキャッシュに使用されるように設計されていますが、それ自体の特性によりメッセージキューに使用することができます.いくつかのブロック式のAPIが使用できます.これらのブロック式のAPIこそ、メッセージキューを作る能力を持っています.
データベースですべての問題を解決するには「考えてみれば、メッセージ・キューを使用しなくても、あなたのニーズを満たすことができます.私たちはすべてのタスクをデータベースに保存し、継続的なポーリングでタスク処理を行います.この方法は、あなたのタスクを完了することができますが、やり方は粗悪です.しかし、データベース・インタフェースにブロックされた方法を提供すれば、ポーリング操作を回避することができます.あなたのデータベースメッセージキューとしても使用できますが、現在のデータベースにはこのようなインタフェースはありません.また,メッセージキューの他の特性,例えばFIFOも容易に実現でき,1つのListオブジェクトからデータを最初から取り,末尾部からデータを塞ぐだけで実現できる.redisがメッセージキューを作成できるのは、listオブジェクトblpop brpopインタフェースとPub/sub(パブリッシュ/サブスクリプション)の一部のインタフェースのおかげです.彼らはブロック版なので、メッセージキューに使用できます.
redisメッセージキューの優先度の実装のいくつかの基礎redis基礎知識の説明
redis> blpop tasklist 0
"im task 01"

この例ではblpopコマンドを使用するとtasklistリストから最初のデータがブロックされ、最後のパラメータはタイムアウト待ち時間です.0に設定すると、無限の待機を表します.またredisに格納されているデータはstringタイプのみであるため,タスクが伝達されるときは伝達文字列のみである.担当データをjson形式の文字列に簡単にシーケンス化し、消費者側から変換すればいいだけです.
ここで、サンプル言語はpythonを使用し、redisをリンクするライブラリはredis-pyを使用する.プログラミングの基礎があれば、自分の好きな言語に切り替えるのは大丈夫だと思います.
  • 簡単なFIFOキュー
  • import redis, time
     
    def handle(task):
        print task
        time.sleep(4)
     
    def main():
        pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
        r = redis.Redis(connection_pool=pool)
        while 1:
            result = r.brpop('tasklist', 0)
            handle(result[1])
     
    if __name__ == "__main__":
        main()
    

    前の例では、最も簡単な消費者であっても、redisのキューから無限ループでデータを取得し続けています.キューにデータがない場合はタイムアウトのブロックはなく、データがある場合は取り出して実行します.一般的には複雑な文字列であり、フォーマットして処理関数に渡す必要があるかもしれませんが、簡単にするためには一般的な文字列です.他の例の処理関数は何も処理せず、sleepだけが時間のかかる操作をシミュレートするために使用される.
    生産者をシミュレートするためにredisのクライアントを別に開き、持参したクライアントでいいです.tasklistキューにデータをたくさん詰めます.
    redis> lpush tasklist 'im task 01'
    redis> lpush tasklist 'im task 02'
    redis> lpush tasklist 'im task 03'
    redis> lpush tasklist 'im task 04'
    redis> lpush tasklist 'im task 05' 
    

    その後、消費者側では、これらのシミュレーションされたタスクが消費されるのを見ることができます.
  • 単純優先度のキュー
  • 単純なニーズを仮定すると、優先度の高いタスクが優先度の低いタスクよりも先に処理されるだけです.他のタスク間の順序は一切関係ありません.このような場合、pushではなく、優先度の高いタスクに遭遇したときにキューの先頭に詰めるだけです.私たちのキューはredisのlistを使用しているので、簡単に実現できます.高優先度の使用rpushに遭遇低優先度の使用lpushに遭遇
    redis> lpush tasklist 'im task 01'
    redis> lpush tasklist 'im task 02'
    redis> rpush tasklist 'im high task 01'
    redis> rpush tasklist 'im high task 01'
    redis> lpush tasklist 'im task 03'
    redis> rpush tasklist 'im high task 03'
    

    次に、優先度の高いものは、常に優先度の低いものよりも先に実行されることがわかります.しかし,このスキームの欠点は,優先度の高いタスク間の実行順序が先進的であることである.
  • 比較的完全なキュー
  • 例2では、高優先度のタスクをキューの一番前に、低優先度のタスクを一番後ろに簡単に詰め込むだけである.これにより、優先度の高いタスク間の順序が保証されません.すべてのタスクが優先度が高い場合、実行順序が逆になると仮定します.これは明らかにキューのFIFOの原則に反している.しかし、少し改善すれば、私たちの列を改善することができます.
    rabbitmqと同様に、2つのキューを設定し、1つの優先度が高く、1つの優先度が低いキューを設定します.高優先度タスクは、高キューに配置され、低優先度キューに配置されます.redisはrabbitmqとは異なり、キュー消費者にどのキューから先に読むように要求することができる.
    def main():
        pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
        r = redis.Redis(connection_pool=pool)
        while 1:
            result = r.brpop(['high_task_queue', 'low_task_queue'], 0)
            handle(result[1])
    

    上のコードは、'high_からブロックされます.task_queue’, 'low_task_queue’という2つのキューからデータを取り、1つ目が2つ目から取られなかった場合.そのため、キュー消費者をこのような改善を行うだけで目的を達成することができます.
    redis> lpush low_task_queue low001
    redis> lpush low_task_queue low002
    redis> lpush low_task_queue low003
    redis> lpush low_task_queue low004
    redis> lpush high_task_queue low001
    redis> lpush high_task_queue low002
    redis> lpush high_task_queue low003
    redis> lpush high_task_queue low004
    

    上記のテストでは、優先度の高いものが最初に実行され、優先度の高いものの間でもFIFOの原則が保証されていることがわかります.このスキームでは、高校の3つ以上のレベルなど、異なる段階の優先順位キューをサポートできます.
  • 優先度レベルが多い場合
  • このような需要があると仮定すると、優先度は単純な高校低または0-10のような固定的なレベルではない.0-99999のようなレベルですでは、私たちの3つ目の案はあまり適切ではありません.redisにはsorted setのようなソート可能なデータ型がありますが、ブロック版のインタフェースがないのは残念です.そこでlistタイプを使用して他の方法で目的を達成するしかありません.
    簡単な方法では、キューを1つだけ設定し、優先順位に従って番号をソートすることができます.次に、タスクの適切な場所を二分ルックアップで検索し、lsetコマンドで適切な場所に挿入します.例えばキューには優先度を書くタスク[1,3,6,8,9,14]が含まれており、優先度7のタスクが来ると、自分の二分アルゴリズムでキューからデータを取り出してターゲットデータと照合し、対応する位置を計算して指定された場所に挿入すればよい.
    二分検索は比較的速く、redis自体もメモリにあるので、理論的には速度が保証されています.しかし、データ量が確かに大きいといえば、いくつかの方法で調整することもできます.
    私たちの3つ目の案を思い出すと、3つ目の案を組み合わせると、オーバーヘッドが大幅に減少します.例えば、データ量10万のキューでは、それらの優先度もランダム0〜10万の区間である.10個または100個の異なるキューを設定し、0-1万の優先度タスクを1番キューに投入し、1万-2万のタスクを2番キューに投入することができます.これにより、1つのキューを異なるレベルで分割すると、1つのキューのデータが大幅に減少し、二分検索マッチングの効率も向上します.しかし、データが占めるリソースは基本的に変わらず、10万のデータがどれだけのメモリを占めるべきか、それともどれだけのメモリを占めるべきか.システムにキューが増えただけです.