redis非同期メッセージ処理の実装
詳細
メッセージのサブスクリプションとパブリケーションをredisで実装する方法
テストクラス
/**
メッセージのサブスクリプションとパブリケーションをredisで実装する方法
テストクラス
/**
*redis
*/
@Controller
@RequestMapping(value="/queue")
publicclassTestControlller{
@Autowired
privateRedisTemplate redisService;
static{
newTestControlller().initThread("msgQueue");
}
/** - **/
public void initThread(String key){
newThread(new blpopMessageHandler(key)).start();
}
/** -- **/
@RequestMapping(value="/brpush")
@UnSession
@ResponseBody
publicboolean brpush(String key,String name,int age)throwsException{
boolean flag = redisService.rpush(key,JSON.toJSONString(newEdu(name,age)));
return flag;
}
/** **/
class blpopMessageHandler implementsRunnable{
privateString key;
privateRedisTemplate redisTemplate=(RedisTemplate)SpringContextUtil.getBean("redisTemplate");
publicString getKey(){
return key;
}
publicvoid setKey(String key){
this.key = key;
}
public blpopMessageHandler(String key){
this.key = key;
}
@Override
publicvoid run(){
do{
//
String result = redisTemplate.blpop(1000,key);
if(EmptyUtil.isNotEmpty(result)){
//do something
Edu edu = JSON.parseObject(result,Edu.class);
System.out.println("【 】"+edu.getName()+"---->"+edu.getAge());
}
}while(true);
}
}
}
model
publicclassEduimplementsSerializable{
privatestaticfinallong serialVersionUID =-6336530413316596246L;
privateString name;
privateint age;
publicString getName(){
return name;
}
publicvoid setName(String name){
this.name = name;
}
publicint getAge(){
return age;
}
publicvoid setAge(int age){
this.age = age;
}
publicEdu(String name,int age){
this.name = name;
this.age = age;
}
@Override
publicString toString(){
return"Edu{"+
"name='"+ name +'\''+
", age="+ age +
'}';
}
}
redis
/**
* Jedis ,
*/
@Component
publicclassRedisTemplate{
privatestaticLogger logger =LoggerFactory.getLogger(RedisTemplate.class);
// pool jedis
privatePool<Jedis> jedisPool;
publicRedisTemplate(Pool<Jedis> jedisPool){
this.jedisPool = jedisPool;
}
/**
* REDIS
* @param key reids
* @param value
*/
publicboolean rpush(String key,String value){
Jedis jedis =null;
boolean flag =false;
try{
jedis = jedisPool.getResource();
jedis.rpush(key.getBytes("utf-8"),value.getBytes("utf-8"));
flag =true;
}catch(Exception e){
// redis
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//
close(jedis);
}
return flag;
}
/**
*
* @param key
* @return
*/
publicString lpop(String key){
byte[] bytes =null;
Jedis jedis =null;
try{
jedis = jedisPool.getResource();
bytes = jedis.lpop(key.getBytes("utf-8"));
}catch(Exception e){
// redis
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//
close(jedis);
}
if(EmptyUtil.isNotEmpty(bytes)){
try{
returnnewString(bytes,"utf-8");
}catch(UnsupportedEncodingException e){
e.printStackTrace();
}
}
returnnull;
}
/**
* list
* @param timeout
* @param key
* @return
*/
publicString blpop(int timeout,String key){
List<String> list =newArrayList<>();
Jedis jedis =null;
try{
jedis = jedisPool.getResource();
list = jedis.blpop(timeout,key);
}catch(Exception e){
// redis
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//
close(jedis);
}
returnEmptyUtil.isNotEmpty(list)&& list.size()>1? list.get(1):null;
}
privatevoid close(Jedis jedis){
try{
jedisPool.returnResource(jedis);
}catch(Exception e){
if(jedis.isConnected()){
jedis.quit();
jedis.disconnect();
}
}
}
}