「redis深さ冒険」-メッセージキューノートを読む
10658 ワード
前言
今回、redisでキューを作るのを見て、実はあまり役に立たないと思います.市場には非常に成熟したmqが性能を要求しているkafkaが安定性を要求しているrabbit redisのmqは、重要ではない場所でキューを作るよりも仕方がありません.例えば、redisキューの非同期保存ログを通じてredisキューを通じてヒント的なプッシュをするなどです.
Listは簡単なキュー(ポイント対ポイント)を実現
要素rpush/lpushを追加
ノンブロッキングモード
lpop/rpop
プレゼンテーションキューrpush/lpopを使用してキューを右に左に移動
#
rpush name value1 #。。。。
#
lpop name
キュー操作クラス実装
package com.ming.base.queue;
import com.ming.core.utils.SpringBeanManager;
import org.springframework.data.redis.core.StringRedisTemplate;
/**
* redis list queue
* rpush lpop
*
* @author ming
* @date 2019-09-03 14:56:07
*/
public class RedisListQueue {
/**
* push
*
* @param queueName
* @param value
* @author ming
* @date 2019-09-03 14:57:30
*/
public static void push(String queueName, String value) {
getClient().opsForList().rightPush(queueName, value);
}
/**
*
*
* @param queueName
* @return String
* @author ming
* @date 2019-09-03 14:58:30
*/
public static String pop(String queueName) {
return getClient().opsForList().leftPop(queueName);
}
/**
*
*
* @return StringRedisTemplate
* @author ming
* @date 2019-09-03 15:03:32
*/
private static StringRedisTemplate getClient() {
return SpringBeanManager.getBean(StringRedisTemplate.class);
}
}
キューテストケース
package com.ming.base.queue;
import com.ming.Start;
import org.apache.commons.lang.StringUtils;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* redis list queue
*
* @author ming
* @date 2019-09-03 15:10:36
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Start.class)
@FixMethodOrder(value = MethodSorters.NAME_ASCENDING)
public class RedisListQueueTest {
private static final String QUEUE_NAME = "ming";
/**
* 100
*
* @author ming
* @date 2019-09-03 15:12:57
*/
@Test
public void aTestPush() {
// 100
for (int i = 0; i < 10000; i++) {
RedisListQueue.push(QUEUE_NAME, "value" + i);
}
}
/**
*
*
*
* @author ming
* @date 2019-09-03 15:13:10
*/
@Test
public void bTestPop() throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
String tmp = RedisListQueue.pop(QUEUE_NAME);
System.out.println(" t1:" + i + "::" + tmp);
if (StringUtils.isEmpty(tmp)) {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return;
}
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
String tmp = RedisListQueue.pop(QUEUE_NAME);
System.out.println(" t2:" + i + "::" + tmp);
if (StringUtils.isEmpty(tmp)) {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return;
}
}
});
// t2
t2.setPriority(6);
t1.start();
t2.start();
Thread.sleep(10000L);
}
}
ブロックモード
blockブロックlist要素blpop/brpop命令フォーマットblpop name timeoutの取得
プレゼンテーションキューrpush/blpopメソッドを使用してキューを作成
#
rpush name value1 #。。。。
# 10s
blpop name 10
キュー操作クラス実装
package com.ming.base.queue;
import com.ming.core.utils.SpringBeanManager;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
/**
* redis list queue
* rpush lpop
*
* @author ming
* @date 2019-09-03 14:56:07
*/
public class RedisListBlockQueue {
/**
* push
*
* @param queueName
* @param value
* @author ming
* @date 2019-09-03 14:57:30
*/
public static void push(String queueName, String value) {
getClient().opsForList().rightPush(queueName, value);
}
/**
*
*
* @param queueName
* @param timeout
* @param timeUnit
* @return String
* @author ming
* @date 2019-09-03 14:58:30
*/
public static String blockPop(String queueName, long timeout, TimeUnit timeUnit) {
return getClient().opsForList().leftPop(queueName, timeout, timeUnit);
}
/**
*
*
* @return StringRedisTemplate
* @author ming
* @date 2019-09-03 15:03:32
*/
private static StringRedisTemplate getClient() {
return SpringBeanManager.getBean(StringRedisTemplate.class);
}
}
キューテストケース
package com.ming.base.queue;
import com.ming.Start;
import org.apache.commons.lang.StringUtils;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
/**
* redis list block queue
*
* @author ming
* @date 2019-09-03 15:10:36
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Start.class)
@FixMethodOrder(value = MethodSorters.NAME_ASCENDING)
public class RedisListBlockQueueTest {
private static final String QUEUE_NAME = "ming";
/**
* 100
*
* @author ming
* @date 2019-09-03 15:12:57
*/
@Test
public void aTestPush() {
// 100
for (int i = 0; i < 5; i++) {
RedisListQueue.push(QUEUE_NAME, "value" + i);
}
}
/**
*
*
*
* @author ming
* @date 2019-09-03 15:13:10
*/
@Test
public void bTestBlockPop() throws InterruptedException {
Thread t1 = new Thread(() -> {
while (true){
// 10s
String tmp = RedisListBlockQueue.blockPop(QUEUE_NAME, 10, TimeUnit.SECONDS);
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+" t1:" + "::" + tmp);
}
});
Thread t2 = new Thread(() -> {
while (true){
String tmp = RedisListBlockQueue.blockPop(QUEUE_NAME, 10, TimeUnit.SECONDS);
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+" t2:" + "::" + tmp);
}
});
// t2
t2.setPriority(6);
t1.start();
t2.start();
Thread.sleep(1000000L);
}
}
redisのpublishとsubscribe(点対多)
redisはlistをキューに使用してパブリッシュサブスクリプションモードを迅速に実行できないことを補うための簡単なパブリケーションサブスクリプションシステムを提供します.
コマンド#コマンド#
書式設定
コメント
publish
publish channelName message
メッセージを受信した購読者の数を返します.
subscribe
subscribe channelName
チャネルの購読
unsubscribe
unsubscribe channelName
キャンセルチャンネル
psubscribe
psubscribe channelPattern
channelPattern正則に従ってルールに合致するchannelを購読する
punsubscribe
punsubscribe channelPattern
channelPatternルールに従ってルールに合致するchannelの購読をキャンセル
pubsub
pubsub subcommand arg
パブリッシュサブスクリプションシステムのステータスの表示
arg
説明
CHANNELS
[pattern]
指定されたモードpatternのアクティブなチャネルを返し、SUBSCRIBEによって購読されたチャネルを返すことを指定します.
NUMSUB
channel channel2 ...
指定したチャンネルの購読数を返します
NUMPAT
-
サブスクリプション・モードの数を返します.注意:このコマンドは、サブスクリプション・モードのクライアントの数ではなく、クライアントがサブスクリプションしたすべてのモードの数の合計を返します.
JAvaコードの例
package com.ming.base.queue;
import com.ming.Start;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import java.nio.charset.StandardCharsets;
/**
* redis pub sub
*
* @author ming
* @date 2019-09-03 15:10:36
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Start.class)
@FixMethodOrder(value = MethodSorters.NAME_ASCENDING)
public class RedisPubSubTest {
private static final String CHANNEL_NAME = "ming";
@Autowired
private RedisTemplate stringRedisTemplate;
/**
*
*
* @author ming
* @date 2019-09-03 15:12:57
*/
@Test
public void bTestPub() {
for (int i = 0; i < 10; i++) {
stringRedisTemplate.convertAndSend(CHANNEL_NAME, "mingtest" + i + "::" + System.currentTimeMillis());
}
}
/**
*
*
* bTestPub
*
* @author ming
* @date 2019-09-03 15:13:10
*/
@Test
public void aTestSub() throws InterruptedException {
Thread t1 = new Thread(() -> stringRedisTemplate.getConnectionFactory().getConnection().subscribe((message, pattern) -> System.out.println("t1-message:" + message), CHANNEL_NAME.getBytes(StandardCharsets.UTF_8)));
Thread t2 = new Thread(() -> stringRedisTemplate.getConnectionFactory().getConnection().subscribe((message, pattern) -> System.out.println("t2-message:" + message), CHANNEL_NAME.getBytes(StandardCharsets.UTF_8)));
t1.start();
t2.start();
Thread.sleep(Integer.MAX_VALUE);
}
}
まとめ
本にはzsetで実現されたキューもありますが、redisのキューに誤差を許容する速い案を作る必要はないと思いますが、キューには速度、安定性、成熟したmq kafkaやrabbitを直接使う必要があります.