「redis深さ冒険」-メッセージキューノートを読む

10658 ワード

前言


今回、redisでキューを作るのを見て、実はあまり役に立たないと思います.市場には非常に成熟したmqが性能を要求しているkafkaが安定性を要求しているrabbit redisのmqは、重要ではない場所でキューを作るよりも仕方がありません.例えば、redisキューの非同期保存ログを通じてredisキューを通じてヒント的なプッシュをするなどです.

Listは簡単なキュー(ポイント対ポイント)を実現


要素rpush/lpushを追加

ノンブロッキングモード
lpop/rpop

プレゼンテーションキューrpush/lpopを使用してキューを右に左に移動
#     
rpush name value1 #。。。。
#    
lpop name 

キュー操作クラス実装
package com.ming.base.queue;

import com.ming.core.utils.SpringBeanManager;
import org.springframework.data.redis.core.StringRedisTemplate;

/**
 * redis list    queue
 *    rpush lpop     
 *
 * @author ming
 * @date 2019-09-03 14:56:07
 */
public class RedisListQueue {

    /**
     * push           
     *
     * @param queueName     
     * @param value         
     * @author ming
     * @date 2019-09-03 14:57:30
     */
    public static void push(String queueName, String value) {
        getClient().opsForList().rightPush(queueName, value);
    }

    /**
     *          
     *
     * @param queueName     
     * @return String   
     * @author ming
     * @date 2019-09-03 14:58:30
     */
    public static String pop(String queueName) {
        return getClient().opsForList().leftPop(queueName);
    }

    /**
     *      
     *
     * @return StringRedisTemplate
     * @author ming
     * @date 2019-09-03 15:03:32
     */
    private static StringRedisTemplate getClient() {
        return SpringBeanManager.getBean(StringRedisTemplate.class);
    }
}


キューテストケース
package com.ming.base.queue;

import com.ming.Start;
import org.apache.commons.lang.StringUtils;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * redis list queue  
 *
 * @author ming
 * @date 2019-09-03 15:10:36
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Start.class)
@FixMethodOrder(value = MethodSorters.NAME_ASCENDING)
public class RedisListQueueTest {
    private static final String QUEUE_NAME = "ming";

    /**
     *             100       
     *
     * @author ming
     * @date 2019-09-03 15:12:57
     */
    @Test
    public void aTestPush() {
        // 100        
        for (int i = 0; i < 10000; i++) {
            RedisListQueue.push(QUEUE_NAME, "value" + i);
        }
    }

    /**
     *           
     *              
     *
     * @author ming
     * @date 2019-09-03 15:13:10
     */
    @Test
    public void bTestPop() throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                String tmp = RedisListQueue.pop(QUEUE_NAME);
                System.out.println("  t1:" + i + "::" + tmp);
                if (StringUtils.isEmpty(tmp)) {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return;
                }
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                String tmp = RedisListQueue.pop(QUEUE_NAME);
                System.out.println("  t2:" + i + "::" + tmp);
                if (StringUtils.isEmpty(tmp)) {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return;
                }
            }
        });
        //  t2      
        t2.setPriority(6);

        t1.start();
        t2.start();

        Thread.sleep(10000L);
    }
}

ブロックモード
blockブロックlist要素blpop/brpop命令フォーマットblpop name timeoutの取得

プレゼンテーションキューrpush/blpopメソッドを使用してキューを作成
#     
rpush name value1 #。。。。
#  10s    
blpop name  10 

キュー操作クラス実装
package com.ming.base.queue;

import com.ming.core.utils.SpringBeanManager;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;

/**
 * redis list      queue
 *    rpush lpop     
 *
 * @author ming
 * @date 2019-09-03 14:56:07
 */
public class RedisListBlockQueue {

    /**
     * push           
     *
     * @param queueName     
     * @param value         
     * @author ming
     * @date 2019-09-03 14:57:30
     */
    public static void push(String queueName, String value) {
        getClient().opsForList().rightPush(queueName, value);
    }

    /**
     *                            
     *
     * @param queueName     
     * @param timeout       
     * @param timeUnit      
     * @return String   
     * @author ming
     * @date 2019-09-03 14:58:30
     */
    public static String blockPop(String queueName, long timeout, TimeUnit timeUnit) {
        return getClient().opsForList().leftPop(queueName, timeout, timeUnit);
    }

    /**
     *      
     *
     * @return StringRedisTemplate
     * @author ming
     * @date 2019-09-03 15:03:32
     */
    private static StringRedisTemplate getClient() {
        return SpringBeanManager.getBean(StringRedisTemplate.class);
    }
}


キューテストケース
package com.ming.base.queue;

import com.ming.Start;
import org.apache.commons.lang.StringUtils;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

/**
 * redis list block  queue  
 *
 * @author ming
 * @date 2019-09-03 15:10:36
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Start.class)
@FixMethodOrder(value = MethodSorters.NAME_ASCENDING)
public class RedisListBlockQueueTest {
    private static final String QUEUE_NAME = "ming";

    /**
     *             100       
     *
     * @author ming
     * @date 2019-09-03 15:12:57
     */
    @Test
    public void aTestPush() {
        // 100        
        for (int i = 0; i < 5; i++) {
            RedisListQueue.push(QUEUE_NAME, "value" + i);
        }
    }

    /**
     *           
     *              
     *
     * @author ming
     * @date 2019-09-03 15:13:10
     */
    @Test
    public void bTestBlockPop() throws InterruptedException {
        Thread t1 = new Thread(() -> {
            while (true){
                //         10s
                String tmp = RedisListBlockQueue.blockPop(QUEUE_NAME, 10, TimeUnit.SECONDS);
                System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"  t1:" + "::" + tmp);
            }
        });
        Thread t2 = new Thread(() -> {
            while (true){
                String tmp = RedisListBlockQueue.blockPop(QUEUE_NAME, 10, TimeUnit.SECONDS);
                System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"  t2:" + "::" + tmp);
            }
        });
        //  t2      
        t2.setPriority(6);

        t1.start();
        t2.start();

        Thread.sleep(1000000L);
    }
}

redisのpublishとsubscribe(点対多)


redisはlistをキューに使用してパブリッシュサブスクリプションモードを迅速に実行できないことを補うための簡単なパブリケーションサブスクリプションシステムを提供します.
コマンド#コマンド#
書式設定
コメント
publish
publish channelName message
メッセージを受信した購読者の数を返します.
subscribe
subscribe channelName
チャネルの購読
unsubscribe
unsubscribe channelName
キャンセルチャンネル
psubscribe
psubscribe channelPattern
channelPattern正則に従ってルールに合致するchannelを購読する
punsubscribe
punsubscribe channelPattern
channelPatternルールに従ってルールに合致するchannelの購読をキャンセル
pubsub
pubsub subcommand arg
パブリッシュサブスクリプションシステムのステータスの表示
  • pubsub詳細
  • subcommand
    arg
    説明
    CHANNELS
    [pattern]
    指定されたモードpatternのアクティブなチャネルを返し、SUBSCRIBEによって購読されたチャネルを返すことを指定します.
    NUMSUB
    channel channel2 ...
    指定したチャンネルの購読数を返します
    NUMPAT
    -
    サブスクリプション・モードの数を返します.注意:このコマンドは、サブスクリプション・モードのクライアントの数ではなく、クライアントがサブスクリプションしたすべてのモードの数の合計を返します.

    JAvaコードの例

    package com.ming.base.queue;
    
    import com.ming.Start;
    import org.junit.FixMethodOrder;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.MethodSorters;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import java.nio.charset.StandardCharsets;
    
    /**
     * redis pub sub  
     *
     * @author ming
     * @date 2019-09-03 15:10:36
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = Start.class)
    @FixMethodOrder(value = MethodSorters.NAME_ASCENDING)
    public class RedisPubSubTest {
        private static final String CHANNEL_NAME = "ming";
    
        @Autowired
        private RedisTemplate stringRedisTemplate;
    
        /**
         *       
         *
         * @author ming
         * @date 2019-09-03 15:12:57
         */
        @Test
        public void bTestPub() {
            for (int i = 0; i < 10; i++) {
                stringRedisTemplate.convertAndSend(CHANNEL_NAME, "mingtest" + i + "::" + System.currentTimeMillis());
            }
        }
    
        /**
         *        
         *            
         *                bTestPub     
         *
         * @author ming
         * @date 2019-09-03 15:13:10
         */
        @Test
        public void aTestSub() throws InterruptedException {
            Thread t1 = new Thread(() -> stringRedisTemplate.getConnectionFactory().getConnection().subscribe((message, pattern) -> System.out.println("t1-message:" + message), CHANNEL_NAME.getBytes(StandardCharsets.UTF_8)));
            Thread t2 = new Thread(() -> stringRedisTemplate.getConnectionFactory().getConnection().subscribe((message, pattern) -> System.out.println("t2-message:" + message), CHANNEL_NAME.getBytes(StandardCharsets.UTF_8)));
            t1.start();
            t2.start();
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
    

    まとめ


    本にはzsetで実現されたキューもありますが、redisのキューに誤差を許容する速い案を作る必要はないと思いますが、キューには速度、安定性、成熟したmq kafkaやrabbitを直接使う必要があります.