Redisを用いて遅延処理を実現する方法の例
背景
開発においては、遅延任務に関するいくつかの需要があります。例えば
•注文を生成してから30分以内に支払わないと自動的にキャンセルされます。
•注文書を作成して60秒後、ユーザーにメッセージを送る
上記の任務に対して、私達は専門の名前をあげて形容します。それは延命任務です。
最近は、時間遅延処理を行う機能が必要です。主に、kafkaからのメッセージを消費した後、メッセージのある遅延フィールドに従って遅延処理を行います。実際の実装過程で注意すべき点があります。下記のように記録します。
実現プロセス
javaにおけるタイミング機能といえば、まず思いつくTimerとSchduledThreadPoolExectorですが、Timerはそれに比べて除外できます。 Timerは絶対時間を使っています。システム時間の変化はTimerに一定の影響を与えます。ScheduledThreadPoolExectorは相対時間を使っていますので、この問題はありません。 Timerは、単一のラインプログラムを使用してタスクを処理し、長時間実行されるタスクは、他のタスクの遅延処理をもたらし、ScheduledThreadPool Exectorは、スレッドの数をカスタマイズすることができます。 Timerは運転時の異常処理を行っていません。あるタスクが実行をトリガすると、例外的にTimerが崩壊し、SchduledThreadPool Exectorが運転時に異常に捕獲したので、より安全です。 1、ScheduledThreadPoolExectorはScheduledThreadPool Exectorで実現することを決定しました。次はコード作成です。
主な遅延は以下の通りです。
それからテストして、目標の需要を満たす機能は指定時間を遅らせて実行できるようになりました。これで機能は完成されたようです。
皆さんは疑問かもしれませんが、これは簡単すぎます。何か言いたいことがありますか?しかし、このような方式の実現は簡単ですが、潜在的な問題があります。問題はどこにありますか?SchduledThreadPoolExectorのソースを見せてください。
DelayWrokQueは無界の列であり、ターゲットデータのソースはkafkaであり、つまり高合併高スループットのメッセージキューであり、ある時間帯に大量のメッセージが来て、OOMを引き起こす可能性が高いです。マルチスレッドを使う時、OOMの可能性を考慮しなければならないです。システムOOMの一時的な解決方法は普通は再起動しかできません。ユーザーのデータが失われるなど挽回不可能な問題が発生する可能性がありますので、コード設計段階からできるだけ妥当な手段を採用してこれらの問題を避けるべきです。
2、redisとスレッド結合を採用する
今回は考え方を変えて、redisを採用して私達を緩衝してくれます。これによってメッセージが多すぎてOOMの問題を避けます。
関連redis zset api:第一歩データストア:9:10 kafkaからaの注文メッセージを受け取りました。30分後に出荷通知を行います。今の時間を30分加えて、タイムスタンプをaのscoreに変えます。keyはaの注文番号としてredisに預けます。コードは以下の通りです。 第二段階のデータ処理:もう一つのスレッドの具体的なスケジュール時間は業務需要によって決められます。ここで3分間に一回実行します。内部ロジック:redisから一定量のzsetデータを取り出して、どうやって取りますか?zsetのzrangeByScore方法を使って、データのscoreによって並べ替えます。もちろん時間帯を持ってもいいです。ここで0から今まで、消費します。注意すべき点は、データを取り出した後、zrem方法で取り出したデータをzsetから削除し、他のスレッドでデータの重複消費を防ぐことです。その後、次の出荷通知などの関連ロジックを行います。コードは以下の通りです。
おわりに
上の二つの異なる実現方法について、長所と短所の比較を行います。の第一の方法は簡単で、外部のコンポーネントに依存しないで、目的の機能を迅速に実現することができますが、欠点も明らかで、特定のシーンで使用する必要があります。このようなメッセージの量が大きい場合は問題があるかもしれません。もちろん、データソースが少なくても良い選択です。 の第二の方法は、やや複雑であるが、メッセージ量の大きいシーンに適応でき、レディのzsetを「ミドルウェア」の効果として採用し、また、遅延機能の実現を助けて、高合併シーンによりよく適応することができる。送信には一定時間の遅延、一括データサイズの設定などがあります。 以上、本人の今回の遅延機能の実現過程の二つの実現方式をまとめました。具体的にどのような方式を採用するかは、皆さんが実際の状況によって選択してください。ps:本人の技術能力が限られていますので、文章の中で技術的な説明が不正確または間違っている可能性があります。ご指摘をお願いします。すぐに是正します。みんなを誤解させないようにします。ありがとうございます。
締め括りをつける
以上はこの文章の全部の内容です。本文の内容は皆さんの学習や仕事に対して一定の参考学習価値を持ってほしいです。ありがとうございます。
開発においては、遅延任務に関するいくつかの需要があります。例えば
•注文を生成してから30分以内に支払わないと自動的にキャンセルされます。
•注文書を作成して60秒後、ユーザーにメッセージを送る
上記の任務に対して、私達は専門の名前をあげて形容します。それは延命任務です。
最近は、時間遅延処理を行う機能が必要です。主に、kafkaからのメッセージを消費した後、メッセージのある遅延フィールドに従って遅延処理を行います。実際の実装過程で注意すべき点があります。下記のように記録します。
実現プロセス
javaにおけるタイミング機能といえば、まず思いつくTimerとSchduledThreadPoolExectorですが、Timerはそれに比べて除外できます。
主な遅延は以下の通りです。
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new
ThreadPoolExecutor.AbortPolicy());
//
int delayTime = 0;
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
//
}},0,delayTime, TimeUnit.SECONDS);
その中のNamedThreadFactoryは私がカスタマイズしたスレッド工場で、主にスレッド池の定義名称と関連ログの印刷に後続の問題分析がしやすいので、ここでは多く紹介しません。拒否策もデフォルトの拒否策を採用します。それからテストして、目標の需要を満たす機能は指定時間を遅らせて実行できるようになりました。これで機能は完成されたようです。
皆さんは疑問かもしれませんが、これは簡単すぎます。何か言いたいことがありますか?しかし、このような方式の実現は簡単ですが、潜在的な問題があります。問題はどこにありますか?SchduledThreadPoolExectorのソースを見せてください。
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0,
TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}
ScheduledThreadPoolExectorは自分自身の延長とサイクルの特性のため、デフォルトではDelayWorkQueを使っていますが、私たちが普段使っているSingleThreadExectorなどの構造ではなく、自分で定義したLinked BlockingQueを使って列のサイズを設定することができます。DelayWrokQueは無界の列であり、ターゲットデータのソースはkafkaであり、つまり高合併高スループットのメッセージキューであり、ある時間帯に大量のメッセージが来て、OOMを引き起こす可能性が高いです。マルチスレッドを使う時、OOMの可能性を考慮しなければならないです。システムOOMの一時的な解決方法は普通は再起動しかできません。ユーザーのデータが失われるなど挽回不可能な問題が発生する可能性がありますので、コード設計段階からできるだけ妥当な手段を採用してこれらの問題を避けるべきです。
2、redisとスレッド結合を採用する
今回は考え方を変えて、redisを採用して私達を緩衝してくれます。これによってメッセージが多すぎてOOMの問題を避けます。
関連redis zset api:
//
ZADD key score member [[score member] [score member] …]
//
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
// zset
ZREM key member [member …]
私たちはredisベースデータ構造のzset構造を採用して、scoreを採用して目標送信時間の数値を記憶しています。全体の処理の流れは以下の通りです。
public void onMessage(String topic, String message) {
String orderId;
int delayTime = 0;
try {
Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() {
}.getType());
if (msgMap.isEmpty()) {
return;
}
LOGGER.info("onMessage kafka content:{}", msgMap.toString());
orderId = msgMap.get("orderId");
if(StringUtils.isNotEmpty(orderId)){
delayTime = Integer.parseInt(msgMap.get("delayTime"));
Calendar calendar = Calendar.getInstance();
//
calendar.add(Calendar.MINUTE, delayTime);
long sendTime = calendar.getTimeInMillis();
RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId);
LOGGER.info("orderId:{}--- redis ---sendTime:{}", ---orderId:{}, sendTime);
}
} catch (Exception e) {
LOGGER.info("onMessage :{}", e);
}
}
public void run(){
//
int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100"));
try {
// orderNum
Calendar calendar = Calendar.getInstance();
long now = calendar.getTimeInMillis();
// key( , )
Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum);
LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds));
if (CollectionUtils.isNotEmpty(orders)){
// key
for (String orderId : orderIds) {
RedisUtils.getInstance().zrem(Constant.DELAY, orderId);
}
//
}
} catch (Exception e) {
LOGGER.warn("task.run exception:{}", e);
}
}
これでRedisとスレッドに依存して遅延送信を完了しました。おわりに
上の二つの異なる実現方法について、長所と短所の比較を行います。
締め括りをつける
以上はこの文章の全部の内容です。本文の内容は皆さんの学習や仕事に対して一定の参考学習価値を持ってほしいです。ありがとうございます。