redis非同期メッセージ処理の実装


詳細
 
メッセージのサブスクリプションとパブリケーションをredisで実装する方法
 
テストクラス
/** *redis */ @Controller @RequestMapping(value="/queue") publicclassTestControlller{
  • @Autowired
  • privateRedisTemplate redisService;
  • static{
  • newTestControlller().initThread("msgQueue"); }
  • /** - **/
  • public void initThread(String key){ newThread(new blpopMessageHandler(key)).start(); }
  • /** -- **/
  • @RequestMapping(value="/brpush") @UnSession @ResponseBody publicboolean brpush(String key,String name,int age)throwsException{ boolean flag = redisService.rpush(key,JSON.toJSONString(newEdu(name,age))); return flag; }
  • /** **/
  • class blpopMessageHandler implementsRunnable{
  • privateString key;
  • privateRedisTemplate redisTemplate=(RedisTemplate)SpringContextUtil.getBean("redisTemplate");
  • publicString getKey(){
  • return key; }
  • publicvoid setKey(String key){
  • this.key = key; }
  • public blpopMessageHandler(String key){
  • this.key = key; }
  • @Override
  • publicvoid run(){
  • do{
  • //
  • String result = redisTemplate.blpop(1000,key);
  • if(EmptyUtil.isNotEmpty(result)){
  • //do something
  • Edu edu = JSON.parseObject(result,Edu.class);
  • System.out.println("【 】"+edu.getName()+"---->"+edu.getAge());
  • }
  • }while(true);
  • }
  • }
  • }
  • model

    1.  
    2. publicclassEduimplementsSerializable{
    3. privatestaticfinallong serialVersionUID =-6336530413316596246L;
    4. privateString name;
    5. privateint age;
    6. publicString getName(){
    7. return name; }
    8. publicvoid setName(String name){
    9. this.name = name; }
    10. publicint getAge(){
    11. return age; }
    12. publicvoid setAge(int age){
    13. this.age = age; }
    14. publicEdu(String name,int age){
    15. this.name = name; this.age = age; }
    16. @Override
    17. publicString toString(){ return"Edu{"+ "name='"+ name +'\''+ ", age="+ age + '}'; } }

      redis

      1.  
      2. /** * Jedis , */ @Component publicclassRedisTemplate{ privatestaticLogger logger =LoggerFactory.getLogger(RedisTemplate.class);
      3. // pool jedis
      4. privatePool<Jedis> jedisPool;
      5. publicRedisTemplate(Pool<Jedis> jedisPool){
      6. this.jedisPool = jedisPool; }
      7. /**
      8. * REDIS * @param key reids * @param value */ publicboolean rpush(String key,String value){
      9. Jedis jedis =null;
      10. boolean flag =false;
      11. try{ jedis = jedisPool.getResource(); jedis.rpush(key.getBytes("utf-8"),value.getBytes("utf-8")); flag =true; }catch(Exception e){ // redis jedisPool.returnBrokenResource(jedis); e.printStackTrace(); }finally{ // close(jedis); }
      12. return flag;
      13. }
      14. /**
      15. * * @param key * @return */ publicString lpop(String key){
      16. byte[] bytes =null;
      17. Jedis jedis =null; try{
      18. jedis = jedisPool.getResource();
      19. bytes = jedis.lpop(key.getBytes("utf-8"));
      20. }catch(Exception e){
      21. // redis
      22. jedisPool.returnBrokenResource(jedis); e.printStackTrace();
      23. }finally{
      24. //
      25. close(jedis);
      26. }
      27. if(EmptyUtil.isNotEmpty(bytes)){ try{ returnnewString(bytes,"utf-8"); }catch(UnsupportedEncodingException e){ e.printStackTrace(); } }
      28. returnnull;
      29. }
      30. /**
      31. * list * @param timeout * @param key * @return */ publicString blpop(int timeout,String key){
      32. List<String> list =newArrayList<>();
      33. Jedis jedis =null;
      34. try{
      35. jedis = jedisPool.getResource();
      36. list = jedis.blpop(timeout,key);
      37. }catch(Exception e){
      38. // redis
      39. jedisPool.returnBrokenResource(jedis); e.printStackTrace();
      40. }finally{
      41. //
      42. close(jedis);
      43. }
      44. returnEmptyUtil.isNotEmpty(list)&& list.size()>1? list.get(1):null; }
      45. privatevoid close(Jedis jedis){
      46. try{ jedisPool.returnResource(jedis); }catch(Exception e){ if(jedis.isConnected()){ jedis.quit(); jedis.disconnect(); } } }
      47. }