[記録点滴]Redisは簡単なメッセージキューを実現する
7525 ワード
[記録点滴]Redisは簡単なメッセージキューを実現する
目次[記録点滴]Redis単純メッセージキューの実装 0 0 x 00要約 0 x 01缘由 0 x 02背景概念 2.1 Redisがメッセージキューに適しているかどうか 2.1.1 Redisの問題 2.1.2誤りを許さない 2.1.3許容エラー 2.2 Redisメッセージキューのシナリオ 2.4本明細書で採用したスキーム 2.4.1 RPOPLPUSH 2.4.2 BLPOPとBRPOP
0 x 03生産者LUA 0 x 04消費者JAVA 4.1データ変数 4.2消費関数 4.3バックアップメッセージを削除 4.4処理異常 0 x 05参照
0 x 00サマリ
本論文では,リソース不足の条件下で一時的に使用するのに適したRedisを用いて単純なメッセージキューを実現するスキームを提案した.
0 x 01缘由
一人の兄弟が起業して、資源が深刻に不足していますが、メッセージの列を作りたいと思って、私に相談しました.私はちょうど経験があるので,彼と分かち合った.彼のニーズは以下の通りです.の主な目的は、デカップリングのために、メッセージがキューに格納され、その後、キューからデータベースに転送されることである. はメッセージの信頼性に対する要求が高くなく、使用シーンはメッセージの損失を許容するか、あるいは性能に対する渇望が信頼性より大きい. グループ消費を考慮せず、消費と放送を繰り返す. メッセージシーケンス順序は考慮されない. システムは現在、Redisをキャッシュしています. 人力と財力資源は専門のQueueを再利用するのに不足している.
この場合、Redis上でメッセージキューを構築し、一時的に難関を乗り越えることをお勧めします.
0 x 02背景概念
2.1 Redisがメッセージキューに適しているか
まず結論を述べる:Redisはメッセージキューに適していないに違いない.それ自体がRedis設計の初心ではないからだ.
しかし、確かにリソースが制限されている場合、システムのメンテナンスコストを削減し、複雑さを実現するために、Redisを使用することも考えられます.
2.1.1 Redisの問題
Redisはメッセージキューのために設計されていないため、メッセージキューの基本的な問題は考慮されていません.列で物をなくしたらどうしますか? キューが一時的に新しいデータを挿入できない場合、一時的なストレージなどのキューリカバリ時に新しいデータを一時的に保存して再挿入する方法はありませんか? 消費者はデータを読み取るときに「commit」の意味が必要ですか?読み取り処理が完了したことを確認する必要がありますか? キューの長さに制限はありますか?最大長に達したらどうする?......
エラーを許容するかどうかによって区別すると、2つのキューに分けることができますが、どちらもRedisには向いていません.
2.1.2誤りを容認しない
このキューの要件は、メッセージの損失を許さず、一貫性を保証することです.例えば、注文操作.
この需要の下で、Redisを使うのは現実的ではありません.Redisに基づいて使い捨てや非同期のべき乗などをどのようにして、exactly onceを保証するかを考える必要がありますから.RabbitMQ、Kafkaなど、よくあるMQを使うべきです.
2.1.3許容誤り
例えばログ収集.このようなある程度のデータ損失を許容するのは、実際にはRedisにも適していないし、fluentd、logstashなど、既存のスキームがたくさんあります.
2.2 Redisメッセージキューのシナリオ
一般的には4つの方法があります ListベースLPUSH+BRPOPの実装 PUB/SUBベース、購読/配布モード Sorted-setベースの実装 Streamタイプに基づく実装Redisの本例におけるアプリケーション あるいはRedisの作者が書いたdisqueに基づいて開発することを考えることができますか?
2.4本明細書のシナリオ
本明細書では、異なるプログラム間でメッセージを交換するためにキューとしてRedisのListを用いることができる.生成者はLPUSHまたはRPUSHを使用してメッセージをキューに入れます.消費者は、RPOPコマンドまたはLPOPコマンドを使用して、待ち時間が最も長いメッセージを取り出します.Listは複数の生産者と消費者の同時出入りメッセージをサポートし,消費者ごとに異なるリスト要素を取得する.
しかし、これには2つの問題があります.キューが空の場合、LPOPまたはRPOPは常に交代し、リソースを大幅に消費します. クライアントがメッセージを消費するときにクラッシュすると、未処理のメッセージも失われる.
そのためには RPOPLPUSHコマンド(またはそのブロックバージョンBRPOLPUSH)を使用します. またはブロックリードblpopおよびbrpop(bはblockingを表す)を導入し、ブロックリードはキューにデータがない場合にスリープ状態に入る.
2.4.1 RPOPLPUSH
RPOPLPUSHの利点は、メッセージを返すだけでなく、別のバックアップリストに追加することです.すべてが正常であれば、クライアントがメッセージの処理を完了した後、LREMコマンドでバックアップリストからメッセージを削除できます.
最後に、バックアップ・テーブルを監視するためのクライアントを追加することもできます.これにより、一定の処理時間を超えたメッセージをキューに自動的に再配置します(メッセージを処理するクライアントがクラッシュした可能性があります).これにより、メッセージが失われません.
注意すべき点:RPOPLPUSHが再エンキューされ、バックアップリストの右側の要素(表の末尾)が再エンキューされ、メッセージが重複して消費される可能性があります.従って、消費操作はべき乗等性を実現する、すなわち重複消費結果の一致を保証する.
2.4.2 BLPOPとBRPOP
利点は,ブロックリードがキューにデータがない場合にスリープ状態になり,データが来るとすぐに目が覚め,メッセージ遅延がほぼゼロになることである.
注意すべき問題:空き接続の問題.スレッドが常にブロックされている場合、Redisクライアントの接続はアイドル接続となり、アイドルが長すぎると、サーバは自発的に接続を切断し、アイドルリソースの占有量を減らすのが一般的です.このときblpopとbrpopは異常を投げ出すか、クライアント消費者を書くときは気をつけなければなりません.異常をキャプチャしたら、再試行もあります.
0 x 03生産者LUA
彼のデータはLUAによって生成され、Openrestyによって実行されます.
具体的なコードの概要は次のとおりです.
0 x 04消費者JAVA
生産者はLPUSHなので、消費者はRPOPLPUSHを使います.
4.1データ変数
RPOPLPUSHは1つのメッセージを返すだけでなく、このメッセージを別のバックアップリストに追加するので、mKeyはメッセージリスト、mKeyRollbackはバックアップリストです.Redisからメッセージを読み出し、mActionListに一時的に格納する.
4.2消費関数
consumeは消費関数です.例外が発生すると、バックアップ・リストからメッセージ・キューにメッセージが書き込まれます.
具体的なRedis動作は
これは、構成タイムアウト時間をサポートするために使用します.
4.3バックアップメッセージの削除
clearRollback関数は、メッセージが正常に処理された後、バックアップ・キューからバックアップ・メッセージを削除します.
4.4処理異常
問題が発生すると、rollbackLastLaunch関数が呼び出され、バックアップリストからメッセージをメッセージキューに戻します.
1つのRedisオペレーションでlpopとrpushの2つのオペレーションを実行する必要があるため,この2つのオペレーションを1つの原子配列に構築する必要があるため,ここではLuaスクリプトの使用に関する.Lua環境への組み込みサポートにより、Redisは、CAS(check-and-set)コマンドを効率的に処理できないという長年の欠点を解決し、複数のコマンドを組み合わせることで、従来は実現が困難だったり、効率的に実現できなかったりしたモードを容易に実現することができる.
0 x 05リファレンス
Redisの使い方は頼りになりますか?
Luaスクリプト
Redisがメッセージキューを実装するスキーム
Redisはメッセージキューをどうしますか?
Redisブロック、セキュアキューBLPOP/BRPOP/LPUSH
目次
0 x 00サマリ
本論文では,リソース不足の条件下で一時的に使用するのに適したRedisを用いて単純なメッセージキューを実現するスキームを提案した.
0 x 01缘由
一人の兄弟が起業して、資源が深刻に不足していますが、メッセージの列を作りたいと思って、私に相談しました.私はちょうど経験があるので,彼と分かち合った.彼のニーズは以下の通りです.
この場合、Redis上でメッセージキューを構築し、一時的に難関を乗り越えることをお勧めします.
0 x 02背景概念
2.1 Redisがメッセージキューに適しているか
まず結論を述べる:Redisはメッセージキューに適していないに違いない.それ自体がRedis設計の初心ではないからだ.
しかし、確かにリソースが制限されている場合、システムのメンテナンスコストを削減し、複雑さを実現するために、Redisを使用することも考えられます.
2.1.1 Redisの問題
Redisはメッセージキューのために設計されていないため、メッセージキューの基本的な問題は考慮されていません.
エラーを許容するかどうかによって区別すると、2つのキューに分けることができますが、どちらもRedisには向いていません.
2.1.2誤りを容認しない
このキューの要件は、メッセージの損失を許さず、一貫性を保証することです.例えば、注文操作.
この需要の下で、Redisを使うのは現実的ではありません.Redisに基づいて使い捨てや非同期のべき乗などをどのようにして、exactly onceを保証するかを考える必要がありますから.RabbitMQ、Kafkaなど、よくあるMQを使うべきです.
2.1.3許容誤り
例えばログ収集.このようなある程度のデータ損失を許容するのは、実際にはRedisにも適していないし、fluentd、logstashなど、既存のスキームがたくさんあります.
2.2 Redisメッセージキューのシナリオ
一般的には4つの方法があります
2.4本明細書のシナリオ
本明細書では、異なるプログラム間でメッセージを交換するためにキューとしてRedisのListを用いることができる.生成者はLPUSHまたはRPUSHを使用してメッセージをキューに入れます.消費者は、RPOPコマンドまたはLPOPコマンドを使用して、待ち時間が最も長いメッセージを取り出します.Listは複数の生産者と消費者の同時出入りメッセージをサポートし,消費者ごとに異なるリスト要素を取得する.
しかし、これには2つの問題があります.
そのためには
2.4.1 RPOPLPUSH
RPOPLPUSHの利点は、メッセージを返すだけでなく、別のバックアップリストに追加することです.すべてが正常であれば、クライアントがメッセージの処理を完了した後、LREMコマンドでバックアップリストからメッセージを削除できます.
最後に、バックアップ・テーブルを監視するためのクライアントを追加することもできます.これにより、一定の処理時間を超えたメッセージをキューに自動的に再配置します(メッセージを処理するクライアントがクラッシュした可能性があります).これにより、メッセージが失われません.
注意すべき点:RPOPLPUSHが再エンキューされ、バックアップリストの右側の要素(表の末尾)が再エンキューされ、メッセージが重複して消費される可能性があります.従って、消費操作はべき乗等性を実現する、すなわち重複消費結果の一致を保証する.
2.4.2 BLPOPとBRPOP
利点は,ブロックリードがキューにデータがない場合にスリープ状態になり,データが来るとすぐに目が覚め,メッセージ遅延がほぼゼロになることである.
注意すべき問題:空き接続の問題.スレッドが常にブロックされている場合、Redisクライアントの接続はアイドル接続となり、アイドルが長すぎると、サーバは自発的に接続を切断し、アイドルリソースの占有量を減らすのが一般的です.このときblpopとbrpopは異常を投げ出すか、クライアント消費者を書くときは気をつけなければなりません.異常をキャプチャしたら、再試行もあります.
0 x 03生産者LUA
彼のデータはLUAによって生成され、Openrestyによって実行されます.
具体的なコードの概要は次のとおりです.
local REDIS = require "redis_iresty"
local REDIS_STORE = REDIS:new(CONF)
REDIS_STORE:lpush(LOG_LIST, log)
0 x 04消費者JAVA
生産者はLPUSHなので、消費者はRPOPLPUSHを使います.
4.1データ変数
RPOPLPUSHは1つのメッセージを返すだけでなく、このメッセージを別のバックアップリストに追加するので、mKeyはメッセージリスト、mKeyRollbackはバックアップリストです.Redisからメッセージを読み出し、mActionListに一時的に格納する.
protected List mActionList = new CopyOnWriteArrayList();
@Value("${key.list}")
private String mKey;
@Value("${key.rollback.list}")
private String mKeyRollback;
4.2消費関数
consumeは消費関数です.例外が発生すると、バックアップ・リストからメッセージ・キューにメッセージが書き込まれます.
public boolean consume() {
rollbackLastLaunch(); // ,
while(true) {
try {
if (schedulejob) {
timerecord = System.currentTimeMillis();
schedulejob = false;
}
// , Redis 。
String action = mRedisStore.listRightPopAndLeftPush(mKey, mKeyRollback, mWaitTimeLimit, TimeUnit.SECONDS);
if(action != null) {
mActionList.add(action);
}
currentTimeStamp = System.currentTimeMillis();
if (mActionList.size() >= mBatchSize ||
(currentTimeStamp - timerecord >= mTimeElapsedLimit && mActionList.size() > 0)) {
schedulejob = true;
boolean res = sync2MySql();
if (res == true) {
clearRollback(); //
} else {
rollbackLastLaunch(); //rollback();
}
mActionList.clear();
}
} catch (Exception e) {
// , processing id waiting queue
// redis, mysql , catch
rollbackLastLaunch();
} finally {
}
}
}
}
具体的なRedis動作は
StringRedisTemplate.opsForList().rightPopAndLeftPush
関数である.public String listRightPopAndLeftPush(String sourceKey, String destinationKey, long timeout, TimeUnit unit) {
getTemplate().setDefaultSerializer(new StringRedisSerializer());
return getTemplate().opsForList().rightPopAndLeftPush(sourceKey, destinationKey, timeout, unit);
}
これは、構成タイムアウト時間をサポートするために使用します.
V rightPopAndLeftPush(K var1, K var2, long var3, TimeUnit var5);
4.3バックアップメッセージの削除
clearRollback関数は、メッセージが正常に処理された後、バックアップ・キューからバックアップ・メッセージを削除します.
protected void clearRollback() {
Long count = mRedisStore.getListSize(mKeyRollback);
while(count > 0 ) {
mRedisStore.listRightPop(mKeyRollback);
count--;
}
}
4.4処理異常
問題が発生すると、rollbackLastLaunch関数が呼び出され、バックアップリストからメッセージをメッセージキューに戻します.
1つのRedisオペレーションでlpopとrpushの2つのオペレーションを実行する必要があるため,この2つのオペレーションを1つの原子配列に構築する必要があるため,ここではLuaスクリプトの使用に関する.Lua環境への組み込みサポートにより、Redisは、CAS(check-and-set)コマンドを効率的に処理できないという長年の欠点を解決し、複数のコマンドを組み合わせることで、従来は実現が困難だったり、効率的に実現できなかったりしたモードを容易に実現することができる.
void rollbackLastLaunch() {
try {
Long count = mRedisStore.getListSize(mKeyRollback);
Long dbsize = 0l;
while(count > 0 ) {
List keys = new ArrayList();
keys.add(mKeyRollback);
keys.add(mKey);
DefaultRedisScript script = new DefaultRedisScript();
script.setScriptText("local action = redis.call('lpop', KEYS[1]); local result = redis.call('rpush', KEYS[2], action); return result;");
script.setResultType(Long.class);
dbsize += mRedisStore.executeScript(script, keys, null);
count--;
}
} catch (Exception e) {
}
}
0 x 05リファレンス
Redisの使い方は頼りになりますか?
Luaスクリプト
Redisがメッセージキューを実装するスキーム
Redisはメッセージキューをどうしますか?
Redisブロック、セキュアキューBLPOP/BRPOP/LPUSH