Springbootアプリケーションredisによる分散キュー購読傍受サービスの実現


redisはredisリストという軽量レベルのキュー購読傍受サービスを提供し、redisキューの特徴1はkafkaなどのキューメッセージサービスに対して、redisキューはメモリ操作に基づいているため、より高速である2処理されたデータ量がkafkaより大きい3は、いくつかの異常なシーンでメッセージを失う可能性がある.
Springboot redisキュー購読傍受を実現
pomファイル
  <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-redis</artifactId>
            <version>1.4.7.RELEASE</version>
        </dependency>

redisキューメッセージパブリッシュクラスRedisSender
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
/**
 *    
 */
@Service
public class RedisSender {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    //          
    public void sendChannelMess(String channel, String message) {
        stringRedisTemplate.convertAndSend(channel, message);
    }
}

redisキューリスニングクラスRedisSubListenerConfigの購読
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

import java.util.concurrent.*;

@Configuration
public class RedisSubListenerConfig {
    //      
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic("seckill"));
        /**
         *         ,             ,          ,        
         */
        ThreadFactory factory = new ThreadFactoryBuilder()
                .setNameFormat("redis-listener-pool-%d").build();
        Executor executor = new ThreadPoolExecutor(
                1,
                1,
                5L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                factory);
        container.setTaskExecutor(executor);
        container.setSubscriptionExecutor(Executors.newFixedThreadPool(1));
        return container;
    }
    //                   
    @Bean
    MessageListenerAdapter listenerAdapter(RedisConsumer redisReceiver) {
        return new MessageListenerAdapter(redisReceiver, "receiveMessage");
    }
    //          redis    
    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
}


redisキューコンシューマクラスRedisConsumer
import com.seckilldemo.entity.Result;
import com.seckilldemo.redis.RedisUtil;
import com.seckilldemo.service.ISeckillService;
import com.seckilldemo.util.SeckillStatEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 *    
 */
@Service
public class RedisConsumer {
    private static final Logger logger = LoggerFactory.getLogger(RedisConsumer.class);
    @Autowired
    private ISeckillService seckillService;
    @Autowired
    private RedisUtil redisUtil;

    public void receiveMessage(String message) {
        Thread th=Thread.currentThread();
        System.out.println("Tread name:"+th.getName());
        //               (  )
        String[] array = message.split(";");
        if(redisUtil.getValue(array[0])==null){//control      ,           
            Result result = seckillService.startSeckil(Long.parseLong(array[0]), Long.parseLong(array[1]));
            if(result.equals(Result.ok(SeckillStatEnum.SUCCESS))){
                logger.info("    ");

            }else{
                logger.info("    ");
                redisUtil.cacheValue(array[0], "ok");//    
            }
        }else{
            logger.info("    ");
        }
    }
}


controller
  @PostMapping("/startRedisQueue")
    public Result startRedisQueue(long seckillId,long userId){
        try {
            redisUtil.cacheValue(seckillId+"", null);//    
            if(redisUtil.getValue(seckillId+"")==null){
                //           ws
                redisSender.sendChannelMess("seckill",seckillId+";"+userId);
                return Result.ok();
            }else{
                return Result.error();
            }

        }catch (Exception e){
            System.out.println(e);
        }
        return Result.error();
    }

プロファイルアプリケーション.properties
# Redis
#      (   0)
spring.redis.database=0  
#             
spring.redis.host=111.229.94.187
#        
spring.redis.port=6379
#        (    )         
spring.redis.password=redis123
#         (          )
spring.redis.jedis.pool.max-active=-1
#            (          )
spring.redis.jedis.pool.max-wait=-1
#            
spring.redis.jedis.pool.max-idle=100
#            
spring.redis.jedis.pool.min-idle=0
#       (  )
spring.redis.timeout=3000ms