reids模倣メッセージキュー

16028 ワード

文書ディレクトリ
  • 一、メッセージキュー
  • 二、Jedisリソースプール生成例
  • 三、生産者
  • 4、消費者
  • 参照
  • 一、メッセージキュー
    メッセージキューは簡単に言えば消費者生産者モデルであり、消費者消費資源、生産者生産資源であり、どのようにredisを通じてメッセージキューを実現するかは、redis中のlist構造がこのような問題を処理するのに適していることに気づく.listタイプはショッピングモールシステムであるプロセスを示すために使用することができ、1つのプロセスをlpushでlistに入れ、終了したプロセスをrpopで現在の道のりを終了する.
    二、Jedis資源プールの生成例
    リソースプールの構成を追加する必要があることに注意してください.最初はリソースプールを作成しただけなので、リソースプールのプロパティはデフォルトのプロパティを採用し、生産者が8つのリソース数を生産し、単例モードでJedisオブジェクトを返します.
    //       jedis  
    	static class JedisPoolUtil {
    		private static JedisPool pool = null;
    		static {
    			JedisPoolConfig config = new JedisPoolConfig();
    			config.setMaxIdle(30);//      
    			config.setMaxWaitMillis(3000);//        
    			config.setMaxTotal(100);//     
    			pool = new JedisPool(config, "127.0.0.1");
    		}
    		public static Jedis getJedis() {
    			return pool.getResource();
    		}
    		
    	}
    

    三、生産者
    5つのスレッドで生産者をシミュレートし、redis:queueにリソースを追加し続けます.
    class Publish extends Thread {
    	private volatile int count = 0;
    	private String key = "redis:queue";
    	public void putMessage(String message) {
    		Jedis jedis = JedisPoolUtil.getJedis();
    		Long size = jedis.lpush(key, message);
    		System.out.println(Thread.currentThread().getName()+"put ,size:"+size+",count:"+count);
    		count++;
    	}
    	
    	@Override
    	public synchronized void run() {
    		//      5 
    		for (int i = 0; i < 5; i++) {
    			putMessage("message:"+count);
    		}
    	}
    	public static void main(String[] args) {
    		Publish pub = new Publish();
    		Thread t1 = new Thread(pub,"thread_1");
    		Thread t2 = new Thread(pub,"thread_2");
    		Thread t3 = new Thread(pub,"thread_3");
    		Thread t4 = new Thread(pub,"thread_4");
    		Thread t5 = new Thread(pub,"thread_5");
    		t1.start();
    		t2.start();
    		t3.start();
    		t4.start();
    		t5.start();
    	}
    }
    

    四、消費者
    rpopコマンドを使用してlistのリソースを取得し始めたばかりで、後で消費スレッドがアイドル(リソースが取得されていない)になる可能性があることがわかりましたので、brpopを使用して取得することを選択します.listにリソースが存在しないと、現在の実行スレッドがブロックされます.
    
    public class Consumer extends Thread {
    	private final String key = "redis:queue";
    	private volatile int count = 0;
    
    	public void consumer() {
    		Jedis jedis = JedisPoolUtil.getJedis();
    		List<String> message = jedis.brpop(0, key);
    		System.out.println(Thread.currentThread().getName() + "count:" + count + "get:" + message);
    		count++;
    	}
    
    	@Override
    	public void run() {
    		while (true) {
    			consumer();
    		}
    	}
    
    	public static void main(String[] args) {
    		Consumer consumer = new Consumer();
    		Thread t1 = new Thread(consumer, "Thread_1");
    		Thread t2 = new Thread(consumer, "Thread_2");
    		t1.start();
    		t2.start();
    		
    	}
    }
    

    リファレンス
    【1】https://www.cnblogs.com/qlqwjy/p/9763754.html