Spring-data-redis:分散キュー

7902 ワード

Redisにおけるlistデータ構造は、「両端キュー」の特性を有し、redisは永続的なデータの能力を有するため、redisが分散キューを実現するのは非常に安全で信頼性が高い.JMSの「Queue」に似ていますが、機能と信頼性(トランザクション)はJMSほど厳しくありません.Redis自体の高性能と「便利な」分散設計(replicas,sharding)は、「分散キュー」を実現するために良好な基礎を提供することができる.
Redis内のキューがブロックされている場合、connection全体が他の操作を続行できないため、接続プールベースの設計には注意が必要です.
我々はspring-data-redisを通じて「同期キュー」を実現し、設計スタイルはJMSと似ている.しかし、この例では、キュー消費後のメッセージ確認メカニズムは提供されていません.興味があれば、自分で実現してみてください.
1)Redisにおける「キュー」は両端キューであり、listデータ構造に基づいて実現され、「キューブロック」機能を提供する.
2)redisを「分散キュー」serverとして使用し、データアクセスが密集している場合は、listデータ構造に関する制限を必ず構成してください.
// list          ,      linkedlist
//      /       ,      
list-max-ziplist-entries 5120
list-max-ziplist-value 1024

3)Redisは「キュー」の持続性を提供しており、追加の技術サポートを必要としない
4)Redisは、JMSの意味における「queue」メッセージの消費確認機能を提供していない.すなわち、キュー内のメッセージがredis-clientによって受信された後、「メッセージが到着したことを確認する」動作は実行されない.分散キューの場合は、厳格なメッセージ確認が必要であり、追加のテクニカルサポートが必要です.
5)RedisはJMSのように高度に中心化された「キュー」サービスクラスタを提供することができず、「高速/コンパクト/タイムリー消費」のシナリオに適している.
6)本例では、メッセージの受信は、1つのバックグラウンドスレッド(以下RedisQueueを参照)で行うが、実際にはスレッドプール方式で行うことで、パフォーマンスを向上させることができる.ただし、このシナリオでは、次の2つの前提に基づいています.
A)単一queueのメッセージが多く、各メッセージの処理時間が長い場合(すなわち、消費速度が受信速度より遅い)
B)このスレッドプールが複数のqueue共通スレッドリソースによって利用可能である場合、1つのqueueでスレッドプールが作成される場合、実に浪費的で安全でない問題がある.
C)マルチスレッド環境におけるqueueに対する操作は,クライアントレベルでキューの順序を乱すことにより異常が生じる可能性があることを確認する必要がある.例えばスレッド1がqueueからdata 1を取得する、スレッド2がqueueからdata 2を取得すると、スレッドスケジューリングの問題によりdata 2が優先的に実行する可能性がある.
 
一.プロファイル:

	
		
		
		
		
		
		
		
	
	
		
		
		
		
		
		
	
	
		
		
			
		
	
	
	
		
		
		
	

二.プログラムの例:
1)QueueListener:キューにデータがある場合、JMSと同様のコールバック操作を行うことができる.
public interface RedisQueueListener {

	public void onMessage(T value);
}
public class QueueListener implements RedisQueueListener {

	@Override
	public void onMessage(String value) {
		System.out.println(value);
		
	}

}

2)RedisQueue:キュー操作、内部パッケージredisTemplateインスタンス;「listener」が構成されている場合、queueは「メッセージコールバック」方式で実行され、listenerThreadは「キュー情報」を自動的に処理するバックグラウンドスレッドです.「listener」を構成しない場合は、redisQueueを他のspring beanに注入し、手動で「take」データを削除できます.
public class RedisQueue implements InitializingBean,DisposableBean{
	private RedisTemplate redisTemplate;
	private String key;
	private int cap = Short.MAX_VALUE;//       ,             
	private byte[] rawKey;
	private RedisConnectionFactory factory;
	private RedisConnection connection;//for blocking
	private BoundListOperations listOperations;//noblocking
	
	private Lock lock = new ReentrantLock();//    IO    
	
	private RedisQueueListener listener;//    
	private Thread listenerThread;
	
	private boolean isClosed;
	
	public void setRedisTemplate(RedisTemplate redisTemplate) {
		this.redisTemplate = redisTemplate;
	}

	public void setListener(RedisQueueListener listener) {
		this.listener = listener;
	}

	public void setKey(String key) {
		this.key = key;
	}
	

	@Override
	public void afterPropertiesSet() throws Exception {
		factory = redisTemplate.getConnectionFactory();
		connection = RedisConnectionUtils.getConnection(factory);
		rawKey = redisTemplate.getKeySerializer().serialize(key);
		listOperations = redisTemplate.boundListOps(key);
		if(listener != null){
			listenerThread = new ListenerThread();
			listenerThread.setDaemon(true);
			listenerThread.start();
		}
	}
	
	
	/**
	 * blocking
	 * remove and get last item from queue:BRPOP
	 * @return
	 */
	public T takeFromTail(int timeout) throws InterruptedException{ 
		lock.lockInterruptibly();
		try{
			List results = connection.bRPop(timeout, rawKey);
			if(CollectionUtils.isEmpty(results)){
				return null;
			}
			return (T)redisTemplate.getValueSerializer().deserialize(results.get(1));
		}finally{
			lock.unlock();
		}
	}
	
	public T takeFromTail() throws InterruptedException{
		return takeFromTail(0);
	}
	
	/**
	 *      ,  
	 */
	public void pushFromHead(T value){
		listOperations.leftPush(value);
	}
	
	public void pushFromTail(T value){
		listOperations.rightPush(value);
	}
	
	/**
	 * noblocking
	 * @return null if no item in queue
	 */
	public T removeFromHead(){
		return listOperations.leftPop();
	}
	
	public T removeFromTail(){
		return listOperations.rightPop();
	}
	
	/**
	 * blocking
	 * remove and get first item from queue:BLPOP
	 * @return
	 */
	public T takeFromHead(int timeout) throws InterruptedException{
		lock.lockInterruptibly();
		try{
			List results = connection.bLPop(timeout, rawKey);
			if(CollectionUtils.isEmpty(results)){
				return null;
			}
			return (T)redisTemplate.getValueSerializer().deserialize(results.get(1));
		}finally{
			lock.unlock();
		}
	}
	
	public T takeFromHead() throws InterruptedException{
		return takeFromHead(0);
	}

	@Override
	public void destroy() throws Exception {
		if(isClosed){
			return;
		}
		shutdown();
		RedisConnectionUtils.releaseConnection(connection, factory);
	}
	
	private void shutdown(){
		try{
			listenerThread.interrupt();
		}catch(Exception e){
			//
		}
	}
	
	class ListenerThread extends Thread {
		
		@Override
		public void run(){
			try{
				while(true){
					T value = takeFromHead();//cast exception? you should check.
					//    
					if(value != null){
						try{
							listener.onMessage(value);
						}catch(Exception e){
							//
						}
					}
				}
			}catch(InterruptedException e){
				//
			}
		}
	}
	
}

3)使用とテスト:
public static void main(String[] args) throws Exception{
	ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-redis-beans.xml");
	RedisQueue redisQueue = (RedisQueue)context.getBean("jedisQueue");
	redisQueue.pushFromHead("test:app");
	Thread.sleep(15000);
	redisQueue.pushFromHead("test:app");
	Thread.sleep(15000);
	redisQueue.destroy();
}

プログラムの実行中にredis-cli(クライアントウィンドウ)でlpushを実行すると、プログラムのコンソールがキュー情報を正常に印刷できることがわかります.