Springboot統合redisメッセージパブリケーションサブスクリプションモード-マルチサーバ間

39458 ワード


環境:SpringBoot+jdk 1.8 
 
基本構成リファレンスhttps://blog.csdn.net/llll234/article/details/80966952
 
インフラストラクチャを確認すると、いくつかの問題が発生します.
1.実際のアプリケーションでは複数のチャネルを購読する可能性がありますが、この書き方はあまり一般的ではありません.addMessageListener(listenerAdapter(new RedisPmpSub()),new PatternTopic("pmp"));
2.使用中にnew RedisPmpSub()を使用してメッセージ受信オブジェクトを構成すると問題が発生します.RedisPmpSubがメッセージ受信クラスであり、メッセージ処理クラスでもある場合.では、Beanを注入する必要がある場合は、成功しますか?
3.後期の拡張性を考慮して、既存コードをできるだけ変更しないように拡張する
 
追加のプロファイル

    org.projectlombok
    lombok
    true



    commons-lang
    commons-lang
    RELEASE



    com.google.code.gson
    gson

GsonUtilはあるSDKに依存しているため、GsonUtil.toJson(this,BasePubMessage.class)はnew Gson()に置き換えることができる.toJson(this, BasePubMessage.class);lombokはプラグインをダウンロードする必要があります
 
パブリッシャ
 
列挙定義
保守性を考慮して、パイプRedisChannelEnumsを列挙して定義します.
 1 public enum RedisChannelEnums {
 2 
 3     /**redis  code           */
 4     LIVE_INFO_CHANGE("LIVE_INFO_CHANGE","      "),
 5 
 6     ;
 7     /**     +   */
 8     private String code;
 9     private String description;
10 
11     RedisChannelEnums(String code, String description) {
12         this.code = code;
13         this.description = description;
14     }
15 
16 
17     /**   code          */
18     public static RedisChannelEnums getEnum(String code) {
19         RedisChannelEnums[] values = RedisChannelEnums.values();
20         if (null != code && values.length > 0) {
21             for (RedisChannelEnums value : values) {
22                 if (value.code == code) {
23                     return value;
24                 }
25             }
26         }
27         return null;
28     }
29 
30     /**  code     code       */
31     public static boolean containsCode(String code) {
32         RedisChannelEnums anEnum = getEnum(code);
33         return anEnum != null;
34     }
35 
36     /**   code     code     */
37     public static boolean equals(String code, RedisChannelEnums calendarSourceEnum) {
38         return calendarSourceEnum.code == code;
39     }
40 
41 
42     public String getCode() {
43         return code;
44     }
45 
46     public String getDescription() {
47         return description;
48     }
49 
50 
51 }

 
メッセージテンプレート
異なるビジネスシーンを互換化するには、メッセージテンプレートオブジェクトBasePubMessageを定義する必要があります.ToStringメソッドの役割は、オブジェクトをJson文字に変換することです.
 1 @Data
 2 public abstract class BasePubMessage {
 3 
 4     /**        */
 5     protected String channel;
 6 
 7     protected String extra;
 8 
 9     @Override
10     public String toString() {
11         return GsonUtil.toJson(this, BasePubMessage.class);
12     }
13 
14 }

 
メッセージオブジェクトLiveChangeMessageでは、ToStringメソッドの役割は、オブジェクトをJson文字に変換することです.
 1 @Data
 2 public class LiveChangeMessage extends BasePubMessage {
 3 
 4 
 5     /**  Ids*/
 6     private String liveIds;
 7 
 8     @Override
 9     public String toString() {
10         return GsonUtil.toJson(this, LiveChangeMessage.class);
11     }
12 
13 }

 
パブリッシャサービス
public interface RedisPub {


    /**
     *   redis          -   
     * @param redisChannelEnums     
     * @param basePubMessage   
     */
    void sendMessage(RedisChannelEnums redisChannelEnums, BasePubMessage basePubMessage);

}

 
 1 @Service
 2 public class RedisPubImpl implements RedisPub {
 3 
 4     @Resource
 5     private StringRedisTemplate stringRedisTemplate;
 6 
 7     @Override
 8     public void sendMessage(RedisChannelEnums redisChannelEnums, BasePubMessage basePubMessage) {
 9 
10         if(redisChannelEnums ==null || basePubMessage ==null){
11             return;
12         }
13 
14         basePubMessage.setChannel(redisChannelEnums.getCode());
15         stringRedisTemplate.convertAndSend(redisChannelEnums.getCode(), basePubMessage.toString());
16         System.out.println("    !");
17     }
18 }

 
購読者
 
注記の構成
RedisConfigはサブスクライバの構成クラスとして、主な役割は、Redisメッセージリスナーコンテナ、メッセージ受信処理クラスの構成と同時に新たに追加された機能が、上記のいくつかの問題を解決することである.
 1 @Service
 2 @Configuration
 3 @EnableCaching
 4 public class RedisConfig {
 5 
 6 
 7     /**
 8      *       
 9      * classInstanceMap : key-beanName value-       
10      */
11     private ConcurrentHashMap classInstanceMap = new ConcurrentHashMap<>(20);
12 
13     /**
14      *        Strategy   Bean
15      *
16      * @param strategyMap
17      *             
18      */
19     @Autowired
20     public RedisConfig(Map strategyMap) {
21         this.classInstanceMap.clear();
22         strategyMap.forEach((k, v) ->
23                 this.classInstanceMap.put(k.toLowerCase(), v)
24         );
25     }
26 
27 
28     /**
29      * Redis       
30      *
31      * @param connectionFactory
32      *
33      * @return
34      */
35     @Bean
36     RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
37 
38         RedisMessageListenerContainer container = new RedisMessageListenerContainer();
39         container.setConnectionFactory(connectionFactory);
40 
41         RedisChannelEnums[] redisChannelEnums = RedisChannelEnums.values();
42         if (redisChannelEnums.length > 0) {
43             for (RedisChannelEnums redisChannelEnum : redisChannelEnums) {
44                 if (redisChannelEnum == null || StringUtils.isEmpty(redisChannelEnum.getCode()) || redisChannelEnum.getClassName()==null) {
45                     continue;
46                 }
47                 //      pmp channel    ,   
48                 //             ,         RedisChannelEnums  +BaseSub   
49 
50                 String toLowerCase = redisChannelEnum.getClassName().getSimpleName().toLowerCase();
51                 BaseSub baseSub = classInstanceMap.get(toLowerCase);
52                 container.addMessageListener(listenerAdapter(baseSub), new PatternTopic(redisChannelEnum.getCode()));
53             }
54         }
55         return container;
56     }
57 
58     /**
59      *          
60      *
61      * @param baseSub
62      *                 
63      *
64      * @return MessageListenerAdapter
65      */
66     @Bean()
67     @Scope("prototype")
68     MessageListenerAdapter listenerAdapter(BaseSub baseSub) {
69         //       messageListenerAdapter             ,         “receiveMessage”
70         //         ,              handleMessage           
71         //  2           receiveMessage
72         return new MessageListenerAdapter(baseSub, "receiveMessage");
73     }
74 
75 }

  
@Autowired
public RedisConfig(Map strategyMap)                        ,                      。
             

 String toLowerCase = redisChannelEnum.getClassName().getSimpleName().toLowerCase();
BaseSub baseSub = classInstanceMap.get(toLowerCase);
container.addMessageListener(listenerAdapter(baseSub), new PatternTopic(redisChannelEnum.getCode()));
                ,              。               。
           redisChannelEnum    ,             。
    ,           

  
列挙
RedisChannelEnumsの役割:異なるパイプに対応する購読者を定義し、後でパイプタイプを追加するには列挙を1つ追加するだけでよい
 1 public enum RedisChannelEnums {
 2 
 3     /**redis               */
 4     LIVE_INFO_CHANGE("LIVE_INFO_CHANGE", LiveChangeSub.class, "      "),
 5 
 6     ;
 7     /**     +   */
 8     private String code;
 9     private Class extends BaseSub> className;
10     private String description;
11 
12     RedisChannelEnums(String code, Class extends BaseSub> className, String description) {
13         this.code = code;
14         this.className=className;
15         this.description = description;
16     }
17 
18 
19     /**   code          */
20     public static RedisChannelEnums getEnum(String code) {
21         RedisChannelEnums[] values = RedisChannelEnums.values();
22         if (null != code && values.length > 0) {
23             for (RedisChannelEnums value : values) {
24                 if (value.code == code) {
25                     return value;
26                 }
27             }
28         }
29         return null;
30     }
31 
32     /**  code     code       */
33     public static boolean containsCode(String code) {
34         RedisChannelEnums anEnum = getEnum(code);
35         return anEnum != null;
36     }
37 
38     /**   code     code     */
39     public static boolean equals(String code, RedisChannelEnums calendarSourceEnum) {
40         return calendarSourceEnum.code == code;
41     }
42 
43 
44     public String getCode() {
45         return code;
46     }
47 
48     public String getDescription() {
49         return description;
50     }
51 
52     public Class extends BaseSub> getClassName() {
53         return className;
54     }
55 }

 
メッセージテンプレート
BaseSubMessageは共通のフィールドを定義し、json文字との共通変換
 1 @Data
 2 abstract class BaseSubMessage {
 3 
 4     /**          */
 5     private String channel;
 6 
 7     private String extra;
 8 
 9     private String json;
10 
11     BaseSubMessage(String json) {
12         if(StringUtils.isEmpty(json)){
13             return;
14         }
15 
16         this.json = json;
17         Map map = new Gson().fromJson(this.json, Map.class);
18         BeanHelper.populate(this, map);
19     }
20 
21 }

 
LiveChangeMessage現在のビジネスシーンを定義するフィールド
 1 @Data
 2 @ToString(callSuper = true)
 3 public class LiveChangeMessage extends BaseSubMessage {
 4 
 5     /**   Ids */
 6     private String liveIds;
 7 
 8     public LiveChangeMessage(String json) {
 9         super(json);
10     }
11 
12 }

 
 
購読者サービス
BaseSubは、メッセージを受信するための一般的な方法を定義します.
1 public interface BaseSub {
2 
3     /**
4      *     
5      * @param jsonMessage  json  
6      */
7     void receiveMessage(String jsonMessage);
8 }

 
LiveChangeSubメッセージ受信オブジェクト
 1 @Component
 2 public class LiveChangeSub implements BaseSub {
 3 
 4     /***/
 5     @Autowired
 6     private CategoryMapper categoryMapper;
 7     
 8     @Override
 9     public void receiveMessage(String jsonMessage) {
10 
11         System.out.println("  aries-server.....................");
12         //            RedisConfig2 listenerAdapter MessageListenerAdapter  2  
13         System.out.println("   LiveChangeSub" + "-----" + jsonMessage);
14 
15         LiveChangeMessage liveChangeMessage = new LiveChangeMessage(jsonMessage);
16         System.out.println(liveChangeMessage);
17         
18         Category category = categoryMapper.get(1L);
19         System.out.println("category:" + category);
20 
21 
22     }
23 }

 
 
まとめ
       :      ,     ,A redis     
       :             ,     ,A redis     
    :     、         。         ,         。           ,  
             。                             。

  :    ,   
  :    redis  ,   redis       。        ,      。