Springbootアプリケーションredisによる分散キュー購読傍受サービスの実現
redisはredisリストという軽量レベルのキュー購読傍受サービスを提供し、redisキューの特徴1はkafkaなどのキューメッセージサービスに対して、redisキューはメモリ操作に基づいているため、より高速である2処理されたデータ量がkafkaより大きい3は、いくつかの異常なシーンでメッセージを失う可能性がある.
Springboot redisキュー購読傍受を実現
pomファイル
redisキューメッセージパブリッシュクラスRedisSender
redisキューリスニングクラスRedisSubListenerConfigの購読
redisキューコンシューマクラスRedisConsumer
controller
プロファイルアプリケーション.properties
Springboot redisキュー購読傍受を実現
pomファイル
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
<version>1.4.7.RELEASE</version>
</dependency>
redisキューメッセージパブリッシュクラスRedisSender
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
/**
*
*/
@Service
public class RedisSender {
@Autowired
private StringRedisTemplate stringRedisTemplate;
//
public void sendChannelMess(String channel, String message) {
stringRedisTemplate.convertAndSend(channel, message);
}
}
redisキューリスニングクラスRedisSubListenerConfigの購読
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import java.util.concurrent.*;
@Configuration
public class RedisSubListenerConfig {
//
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("seckill"));
/**
* , , ,
*/
ThreadFactory factory = new ThreadFactoryBuilder()
.setNameFormat("redis-listener-pool-%d").build();
Executor executor = new ThreadPoolExecutor(
1,
1,
5L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
factory);
container.setTaskExecutor(executor);
container.setSubscriptionExecutor(Executors.newFixedThreadPool(1));
return container;
}
//
@Bean
MessageListenerAdapter listenerAdapter(RedisConsumer redisReceiver) {
return new MessageListenerAdapter(redisReceiver, "receiveMessage");
}
// redis
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
}
redisキューコンシューマクラスRedisConsumer
import com.seckilldemo.entity.Result;
import com.seckilldemo.redis.RedisUtil;
import com.seckilldemo.service.ISeckillService;
import com.seckilldemo.util.SeckillStatEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
*
*/
@Service
public class RedisConsumer {
private static final Logger logger = LoggerFactory.getLogger(RedisConsumer.class);
@Autowired
private ISeckillService seckillService;
@Autowired
private RedisUtil redisUtil;
public void receiveMessage(String message) {
Thread th=Thread.currentThread();
System.out.println("Tread name:"+th.getName());
// ( )
String[] array = message.split(";");
if(redisUtil.getValue(array[0])==null){//control ,
Result result = seckillService.startSeckil(Long.parseLong(array[0]), Long.parseLong(array[1]));
if(result.equals(Result.ok(SeckillStatEnum.SUCCESS))){
logger.info(" ");
}else{
logger.info(" ");
redisUtil.cacheValue(array[0], "ok");//
}
}else{
logger.info(" ");
}
}
}
controller
@PostMapping("/startRedisQueue")
public Result startRedisQueue(long seckillId,long userId){
try {
redisUtil.cacheValue(seckillId+"", null);//
if(redisUtil.getValue(seckillId+"")==null){
// ws
redisSender.sendChannelMess("seckill",seckillId+";"+userId);
return Result.ok();
}else{
return Result.error();
}
}catch (Exception e){
System.out.println(e);
}
return Result.error();
}
プロファイルアプリケーション.properties
# Redis
# ( 0)
spring.redis.database=0
#
spring.redis.host=111.229.94.187
#
spring.redis.port=6379
# ( )
spring.redis.password=redis123
# ( )
spring.redis.jedis.pool.max-active=-1
# ( )
spring.redis.jedis.pool.max-wait=-1
#
spring.redis.jedis.pool.max-idle=100
#
spring.redis.jedis.pool.min-idle=0
# ( )
spring.redis.timeout=3000ms