[記録点滴]Redisは簡単なメッセージキューを実現する

7525 ワード

[記録点滴]Redisは簡単なメッセージキューを実現する
目次
  • [記録点滴]Redis単純メッセージキューの実装
  • 0 0 x 00要約
  • 0 x 01缘由
  • 0 x 02背景概念
  • 2.1 Redisがメッセージキューに適しているかどうか
  • 2.1.1 Redisの問題
  • 2.1.2誤りを許さない
  • 2.1.3許容エラー
  • 2.2 Redisメッセージキューのシナリオ
  • 2.4本明細書で採用したスキーム
  • 2.4.1 RPOPLPUSH
  • 2.4.2 BLPOPとBRPOP

  • 0 x 03生産者LUA
  • 0 x 04消費者JAVA
  • 4.1データ変数
  • 4.2消費関数
  • 4.3バックアップメッセージを削除
  • 4.4処理異常
  • 0 x 05参照

  • 0 x 00サマリ
    本論文では,リソース不足の条件下で一時的に使用するのに適したRedisを用いて単純なメッセージキューを実現するスキームを提案した.
    0 x 01缘由
    一人の兄弟が起業して、資源が深刻に不足していますが、メッセージの列を作りたいと思って、私に相談しました.私はちょうど経験があるので,彼と分かち合った.彼のニーズは以下の通りです.
  • の主な目的は、デカップリングのために、メッセージがキューに格納され、その後、キューからデータベースに転送されることである.
  • はメッセージの信頼性に対する要求が高くなく、使用シーンはメッセージの損失を許容するか、あるいは性能に対する渇望が信頼性より大きい.
  • グループ消費を考慮せず、消費と放送を繰り返す.
  • メッセージシーケンス順序は考慮されない.
  • システムは現在、Redisをキャッシュしています.
  • 人力と財力資源は専門のQueueを再利用するのに不足している.

  • この場合、Redis上でメッセージキューを構築し、一時的に難関を乗り越えることをお勧めします.
    0 x 02背景概念
    2.1 Redisがメッセージキューに適しているか
    まず結論を述べる:Redisはメッセージキューに適していないに違いない.それ自体がRedis設計の初心ではないからだ.
    しかし、確かにリソースが制限されている場合、システムのメンテナンスコストを削減し、複雑さを実現するために、Redisを使用することも考えられます.
    2.1.1 Redisの問題
    Redisはメッセージキューのために設計されていないため、メッセージキューの基本的な問題は考慮されていません.
  • 列で物をなくしたらどうしますか?
  • キューが一時的に新しいデータを挿入できない場合、一時的なストレージなどのキューリカバリ時に新しいデータを一時的に保存して再挿入する方法はありませんか?
  • 消費者はデータを読み取るときに「commit」の意味が必要ですか?読み取り処理が完了したことを確認する必要がありますか?
  • キューの長さに制限はありますか?最大長に達したらどうする?......

  • エラーを許容するかどうかによって区別すると、2つのキューに分けることができますが、どちらもRedisには向いていません.
    2.1.2誤りを容認しない
    このキューの要件は、メッセージの損失を許さず、一貫性を保証することです.例えば、注文操作.
    この需要の下で、Redisを使うのは現実的ではありません.Redisに基づいて使い捨てや非同期のべき乗などをどのようにして、exactly onceを保証するかを考える必要がありますから.RabbitMQ、Kafkaなど、よくあるMQを使うべきです.
    2.1.3許容誤り
    例えばログ収集.このようなある程度のデータ損失を許容するのは、実際にはRedisにも適していないし、fluentd、logstashなど、既存のスキームがたくさんあります.
    2.2 Redisメッセージキューのシナリオ
    一般的には4つの方法があります
  • ListベースLPUSH+BRPOPの実装
  • PUB/SUBベース、購読/配布モード
  • Sorted-setベースの実装
  • Streamタイプに基づく実装Redisの本例におけるアプリケーション
  • あるいはRedisの作者が書いたdisqueに基づいて開発することを考えることができますか?
    2.4本明細書のシナリオ
    本明細書では、異なるプログラム間でメッセージを交換するためにキューとしてRedisのListを用いることができる.生成者はLPUSHまたはRPUSHを使用してメッセージをキューに入れます.消費者は、RPOPコマンドまたはLPOPコマンドを使用して、待ち時間が最も長いメッセージを取り出します.Listは複数の生産者と消費者の同時出入りメッセージをサポートし,消費者ごとに異なるリスト要素を取得する.
    しかし、これには2つの問題があります.
  • キューが空の場合、LPOPまたはRPOPは常に交代し、リソースを大幅に消費します.
  • クライアントがメッセージを消費するときにクラッシュすると、未処理のメッセージも失われる.

  • そのためには
  • RPOPLPUSHコマンド(またはそのブロックバージョンBRPOLPUSH)を使用します.
  • またはブロックリードblpopおよびbrpop(bはblockingを表す)を導入し、ブロックリードはキューにデータがない場合にスリープ状態に入る.

  • 2.4.1 RPOPLPUSH
    RPOPLPUSHの利点は、メッセージを返すだけでなく、別のバックアップリストに追加することです.すべてが正常であれば、クライアントがメッセージの処理を完了した後、LREMコマンドでバックアップリストからメッセージを削除できます.
    最後に、バックアップ・テーブルを監視するためのクライアントを追加することもできます.これにより、一定の処理時間を超えたメッセージをキューに自動的に再配置します(メッセージを処理するクライアントがクラッシュした可能性があります).これにより、メッセージが失われません.
    注意すべき点:RPOPLPUSHが再エンキューされ、バックアップリストの右側の要素(表の末尾)が再エンキューされ、メッセージが重複して消費される可能性があります.従って、消費操作はべき乗等性を実現する、すなわち重複消費結果の一致を保証する.
    2.4.2 BLPOPとBRPOP
    利点は,ブロックリードがキューにデータがない場合にスリープ状態になり,データが来るとすぐに目が覚め,メッセージ遅延がほぼゼロになることである.
    注意すべき問題:空き接続の問題.スレッドが常にブロックされている場合、Redisクライアントの接続はアイドル接続となり、アイドルが長すぎると、サーバは自発的に接続を切断し、アイドルリソースの占有量を減らすのが一般的です.このときblpopとbrpopは異常を投げ出すか、クライアント消費者を書くときは気をつけなければなりません.異常をキャプチャしたら、再試行もあります.
    0 x 03生産者LUA
    彼のデータはLUAによって生成され、Openrestyによって実行されます.
    具体的なコードの概要は次のとおりです.
    local REDIS = require "redis_iresty"
    
    local REDIS_STORE = REDIS:new(CONF)
    
    REDIS_STORE:lpush(LOG_LIST, log)
    

    0 x 04消費者JAVA
    生産者はLPUSHなので、消費者はRPOPLPUSHを使います.
    4.1データ変数
    RPOPLPUSHは1つのメッセージを返すだけでなく、このメッセージを別のバックアップリストに追加するので、mKeyはメッセージリスト、mKeyRollbackはバックアップリストです.Redisからメッセージを読み出し、mActionListに一時的に格納する.
    protected List mActionList = new CopyOnWriteArrayList();
    
    @Value("${key.list}")
    private String mKey;	
    	
    @Value("${key.rollback.list}")
    private String mKeyRollback;	
    

    4.2消費関数
    consumeは消費関数です.例外が発生すると、バックアップ・リストからメッセージ・キューにメッセージが書き込まれます.
    public boolean consume() {
          rollbackLastLaunch(); //       ,      
          
          while(true) {  
             
             try {
                if (schedulejob) {
                   timerecord = System.currentTimeMillis();
                   schedulejob = false;
                }           
                
               //          ,  Redis                 。
                String action = mRedisStore.listRightPopAndLeftPush(mKey, mKeyRollback, mWaitTimeLimit, TimeUnit.SECONDS);
                if(action != null) {
                   mActionList.add(action);
                }
                
                currentTimeStamp = System.currentTimeMillis();
                if (mActionList.size() >= mBatchSize || 
                      (currentTimeStamp - timerecord >= mTimeElapsedLimit && mActionList.size() > 0)) {
                   schedulejob = true;       
                   boolean res = sync2MySql();
                   if (res == true) {
                      clearRollback(); //      
                   } else {                 
                      rollbackLastLaunch(); //rollback();
                   }
                   mActionList.clear();
                }
             } catch (Exception e) {
                 //       ,   processing  id    waiting queue 
                 //  redis, mysql  ,      catch
                 rollbackLastLaunch(); 
             } finally {             
             }
          }
       }
    }
    

    具体的なRedis動作はStringRedisTemplate.opsForList().rightPopAndLeftPush関数である.
    public String listRightPopAndLeftPush(String sourceKey, String destinationKey, long timeout, TimeUnit unit) {
       getTemplate().setDefaultSerializer(new StringRedisSerializer());
           return getTemplate().opsForList().rightPopAndLeftPush(sourceKey, destinationKey, timeout, unit);
    }  
    

    これは、構成タイムアウト時間をサポートするために使用します.
    V rightPopAndLeftPush(K var1, K var2, long var3, TimeUnit var5);
    

    4.3バックアップメッセージの削除
    clearRollback関数は、メッセージが正常に処理された後、バックアップ・キューからバックアップ・メッセージを削除します.
    protected void clearRollback() {
       Long count = mRedisStore.getListSize(mKeyRollback);
       while(count > 0 ) {
          mRedisStore.listRightPop(mKeyRollback);
          count--;
       }     
    }
    

    4.4処理異常
    問題が発生すると、rollbackLastLaunch関数が呼び出され、バックアップリストからメッセージをメッセージキューに戻します.
    1つのRedisオペレーションでlpopとrpushの2つのオペレーションを実行する必要があるため,この2つのオペレーションを1つの原子配列に構築する必要があるため,ここではLuaスクリプトの使用に関する.Lua環境への組み込みサポートにより、Redisは、CAS(check-and-set)コマンドを効率的に処理できないという長年の欠点を解決し、複数のコマンドを組み合わせることで、従来は実現が困難だったり、効率的に実現できなかったりしたモードを容易に実現することができる.
    void rollbackLastLaunch() {
       try {
          Long count = mRedisStore.getListSize(mKeyRollback);          
          Long dbsize = 0l;
          while(count > 0 ) {
             List keys = new ArrayList();
             keys.add(mKeyRollback);
             keys.add(mKey);
             
             DefaultRedisScript script = new DefaultRedisScript();
             script.setScriptText("local action = redis.call('lpop', KEYS[1]); local result = redis.call('rpush', KEYS[2], action); return result;");
             script.setResultType(Long.class);
             
             dbsize += mRedisStore.executeScript(script, keys, null);
             count--;
          }           
       } catch (Exception e) {    
       }  
    }  
    

    0 x 05リファレンス
    Redisの使い方は頼りになりますか?
    Luaスクリプト
    Redisがメッセージキューを実装するスキーム
    Redisはメッセージキューをどうしますか?
    Redisブロック、セキュアキューBLPOP/BRPOP/LPUSH