SpringBoot統合Redisメッセージ配信サブスクリプション

9090 ワード

前言:
  • SpringBoot統合Redisここでは
  • については触れない.
  • は紙面を減らすため、一部のコード(使用に影響しない)
  • のみである.
  • 本文はlombokプラグインとlogbackログに基づいているので、いくつかの注釈が間違っているか、インポートできない場合は、基本的にこの2つの問題
  • です.
  • を噴霧しないでください.
    本文
  • Redis定数配置(個人の習慣、本類は需要によって決まる)
    public class RedisConstant {
        /**
       	 *         
       	 */
       	public static final String TOPIC_PRAISE = "TOPIC_PRAISE";
       	/**
       	 *         
       	 */
       	public static final String TOPIC_COLLECT = "TOPIC_COLLECT";
       	/**
       	 *         
       	 */
       	public static final String TOPIC_COMMENT = "TOPIC_COMMENT";
       	/**
       	 *         
       	 */
       	public static final String TOPIC_FOCUS = "TOPIC_FOCUS";
    }
    
  • .
  • メッセージプロセッサ
    package com.java.single.service.impl;
    import java.util.concurrent.CountDownLatch;
    import com.java.single.entity.res.MessageCollect;
    import com.java.single.entity.res.MessageComment;
    import com.java.single.entity.res.MessageFocus;
    import com.java.single.util.PushUtil;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    @Service
    public class ReceiverServiceImpl {
    
    	private CountDownLatch latch;
    
    	@Autowired
    	public ReceiverServiceImpl(CountDownLatch latch) {
    		this.latch = latch;
    	}
    
    	public void praiseReceive(MessagePraise messagePraise) {
    		log.info("      :[{}]", messagePraise);
    		PushUtil.pushOne(messagePraise.getToUserId(), "   ",
    				messagePraise.getUserName() + "    " + messagePraise.getTitle() + "   ");
    		latch.countDown();
    	}
    
    	public void collectReceive(MessageCollect messageCollect) {
    		log.info("      :[{}]", messageCollect);
    		PushUtil.pushOne(messageCollect.getToUserId(), "   ",
    				messageCollect.getUserName() + "     " + messageCollect.getTitle() + "   ");
    		latch.countDown();
    	}
    
    	public void commentReceive(MessageComment messageComment) {
    		log.info("      :[{}]", messageComment);
    		PushUtil.pushOne(messageComment.getToUserId(), "      ",
    				messageComment.getUserName() + " " + messageComment.getTitle() + "     :" + messageComment.getText());
    		latch.countDown();
    	}
    
    	public void focusReceive(MessageFocus messageFocus) {
    		log.info("      :[{}]", messageFocus);
    		PushUtil.pushOne(messageFocus.getToUserId(), "   ", messageFocus.getUserName() + "    ");
    		latch.countDown();
    	}
    }
    
    は、上述したメッセージプロセッサであり、現在は不要である.PushUtilは個人パッケージのプッシュメッセージプッシュツールクラスであり、削除し、自分の論理に基づいて関連操作を決定することができる.上記は全部で4つのテーマを書きましたが、そのうちの1つのテーマを例にとると、実際のプロジェクトでは自分のニーズに応じて
     package com.java.single.entity.res;
     import java.io.Serializable;
     import lombok.Data;
     import lombok.NoArgsConstructor;
     
     @Data
     @NoArgsConstructor
     public class MessageCollect implements Serializable {
     	/**
     	 * 
     	 */
     	private static final long serialVersionUID = 1L;
     	/**
     	 *       
     	 */
     	private String id;
     	/**
     	 *       
     	 */
     	private String userId;
     	/**
     	 *       
     	 */
     	private String userName;
     	/**
     	 *       
     	 */
     	private String title;
     	/**
     	 *       
     	 */
     	private String toUserId;
     	/**
     	 * 
     	 * @param id
     	 *                  
     	 * @param userId
     	 *                  
     	 * @param userName
     	 *                  
     	 * @param title
     	 *                  
     	 * @param toUserId
     	 *                  
     	 */
     	public MessageCollect(String id, String userId, String userName, String title, String toUserId) {
     		super();
     		this.id = id;
     		this.userId = userId;
     		this.userName = userName;
     		this.title = title;
     		this.toUserId = toUserId;
     	}
    }
    
    注意を変更し、このクラスには2つの注意点があります:1、シーケンス化を実現しなければなりません.2、パラメトリック構造方法が必要です.
  • 構成RedisMessageListener構成クラス
    import java.util.concurrent.CountDownLatch;
    import com.java.single.constant.RedisConstant;
    import com.java.single.service.impl.ReceiverServiceImpl;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    
    @Configuration
    public class RedisMessageListener {
    
    	/**
    	 * redis       
    	 * 
    	 * @param connectionFactory
    	 * @param praiseListenerAdapter
    	 *                     
    	 * @param collectListenerAdapter
    	 *                     
    	 * @param commentListenerAdapter
    	 *                     
    	 * @param focusListenerAdapter
    	 *                     
    	 * @return
    	 */
    	@Bean
    	RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
    			MessageListenerAdapter praiseListenerAdapter, MessageListenerAdapter collectListenerAdapter,
    			MessageListenerAdapter commentListenerAdapter, MessageListenerAdapter focusListenerAdapter) {
    		RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    		container.setConnectionFactory(connectionFactory);
    		
    		//              ,             String  ,             String  
    		Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(
    				Object.class);
    		ObjectMapper objectMapper = new ObjectMapper();
    		objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    		objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    		jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
    		
    		//                      
    		
    		praiseListenerAdapter.setSerializer(jackson2JsonRedisSerializer);
    		//               
    		container.addMessageListener(praiseListenerAdapter, new PatternTopic(RedisConstant.TOPIC_PRAISE));
    		//               
    		collectListenerAdapter.setSerializer(jackson2JsonRedisSerializer);
    		container.addMessageListener(collectListenerAdapter, new PatternTopic(RedisConstant.TOPIC_COLLECT));
    		//               
    		commentListenerAdapter.setSerializer(jackson2JsonRedisSerializer);
    		container.addMessageListener(commentListenerAdapter, new PatternTopic(RedisConstant.TOPIC_COMMENT));
    		//               
    		focusListenerAdapter.setSerializer(jackson2JsonRedisSerializer);
    		container.addMessageListener(focusListenerAdapter, new PatternTopic(RedisConstant.TOPIC_FOCUS));
    		return container;
    	}
    
    	/**
    	 *          ,       
    	 * 
    	 * @param receiver
    	 * @return
    	 */
    	@Bean
    	MessageListenerAdapter praiseListenerAdapter(ReceiverServiceImpl receiver) {
    		MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "praiseReceive");
    		return messageListenerAdapter;
    	}
    
    	/**
    	 *          ,       
    	 * 
    	 * @param receiver
    	 * @return
    	 */
    	@Bean
    	MessageListenerAdapter collectListenerAdapter(ReceiverServiceImpl receiver) {
    		MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "collectReceive");
    		return messageListenerAdapter;
    	}
    
    	/**
    	 *          ,       
    	 * 
    	 * @param receiver
    	 * @return
    	 */
    	@Bean
    	MessageListenerAdapter commentListenerAdapter(ReceiverServiceImpl receiver) {
    		MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "commentReceive");
    		return messageListenerAdapter;
    	}
    
    	/**
    	 *          ,       
    	 * 
    	 * @param receiver
    	 * @return
    	 */
    	@Bean
    	MessageListenerAdapter focusListenerAdapter(ReceiverServiceImpl receiver) {
    		MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "focusReceive");
    		return messageListenerAdapter;
    	}
    
    	@Bean
    	ReceiverServiceImpl receiver(CountDownLatch latch) {
    		return new ReceiverServiceImpl(latch);
    	}
    
    	@Bean
    	CountDownLatch latch() {
    		return new CountDownLatch(1);
    	}
    }
    
    注釈は十分であるはずです.シーケンス化メソッドを変更しなければならないことに注意してください.そうしないと、デフォルトでStringタイプのシーケンス化メソッドが使用されます.ReceiverServiceImpleメソッドはStringタイプのパラメータを受信しても、指定したクラスに再変換する必要があります!!!
  • パブリッシュメッセージ
    redisTemplate.convertAndSend(RedisConstant.TOPIC_COLLECT,
    					new MessageCollect(id, userPublish.getUserId(), userName, title, userId));
    
    は、使用時にコンストラクション関数のパラメータを独自に設定する必要があります.これにより、プロジェクト起動後に上記のメソッドを実行すると、ReceiverServiceImpleのcollectReceiveメソッドが自動的にこのメッセージを処理し、どのメソッドがどのような操作を実行するかを決定する構成がRedisMessageListenerで実現されます.

  • ログ印刷:
          :[MessageCollect(id=c8738aed3ef9421898ebb4be62a11111, userId=6e5c7e33cbd34453b3273ed775e11111, userName=single_cong, title=CSDN  , toUserId=d8e87f6615f64368a53bd6b4dbebc111)]
    

    まとめ
    ネット上のほとんどはStringタイプのメッセージ配信サブスクリプションで、現実的なニーズを満たすことはできないに違いないので、シーケンス化方式をカスタマイズし、テーマと対応する処理方法の構成を実現するには、ここで2点に注意する必要があります.1つ目はシーケンス化タイプの選択です.2つ目は、ここでシーケンス化されたクラスにパラメトリック構造法が必要である(MyBatisが結果セットを返すときに、時報構造関数が存在しない場合にパラメトリック構造関数を追加すれば解決できるような感じがする).