Redisメッセージキュー(MQ)のパブリケーション(publish)とメッセージリスニング(subscribe)をjedisを用いて実現する

4162 ワード

前言:
本論文はjedis 2.9.0に基づく.jar、commons-pool2-2.4.2.JArおよびjson-20160810.jar
ここでjedis接続プールはcommons-pool 2パケットに依存する必要があり、jsonパケットはオブジェクトインスタンスとjson文字列の相互変換に使用される.
1、jedisのメッセージキュー方法の簡単な説明
1.1、メッセージの発行方法
(チャネルは対応メッセージチャネル、messageは対応メッセージボディ)
jedis.publish(channel, message);
1.2.傍受メッセージ方法
(ここでjedisPubSubは、受信したメッセージを処理するために使用され、channelsは対応するチャネルである)
jedis.subscribe(jedisPubSub, channels);
2、発表メッセージ
/**
	 *  jedis     jedis    
	 * @return
	 */
	public static Jedis getJedis() {
		return RedisPoolManager.getJedis();
	}

	/**
	 *      redis    
	 * 
	 * @param String
	 *            channel
	 * @param String
	 *            message
	 */
	public static void publish(String channel, String message) {
		Jedis jedis = null;
		try {
			jedis = getJedis();
			jedis.publish(channel, message);
		} finally {
			jedis.close();
		}
	}

	/**
	 *      redis    
	 * 
	 * @param byte[]
	 *            channel
	 * @param byte[]
	 *            message
	 */
	public void publish(byte[] channel, byte[] message) {
		Jedis jedis = null;
		try {
			jedis = getJedis();
			jedis.publish(channel, message);
		} finally {
			jedis.close();
		}

	}

3、メッセージの傍受
3.1、傍受メッセージ主体方法
/**
	 *       
	 * @param jedisPubSub -     
	 * @param channels -         
	 */
	public static void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {
		Jedis jedis = null;
		try {
			jedis = getJedis();
			jedis.subscribe(jedisPubSub, channels);
		} finally {
			jedis.close();
		}
	}

	/**
	 *       
	 * @param jedisPubSub -     
	 * @param channels -         
	 */
	public static void subscribe(JedisPubSub jedisPubSub, String... channels) {
		Jedis jedis = null;
		try {
			jedis = getJedis();
			jedis.subscribe(jedisPubSub, channels);
		} finally {
			jedis.close();
		}
	}

3.2、傍受されたメッセージタスクの処理
class Tasker implements Runnable {
	private String[] channel = null;//       
	private JedisPubSub jedisPubSub = null;//      

	public Tasker(JedisPubSub jedisPubSub, String ...channel) {
		this.jedisPubSub = jedisPubSub;
		this.channel = channel;
	}

	@Override
	public void run() {
		//   channel     
		RedisMQ.subscribe(jedisPubSub, channel);
	}

}

3.3、傍受されたメッセージマスタークラスの処理の実現
package cn.eguid.livePushServer.redisManager;

import java.util.Map;

import org.json.JSONObject;

import cc.eguid.livepush.PushManager;
import redis.clients.jedis.JedisPubSub;

public class RedisMQHandler extends JedisPubSub{
	PushManager pushManager = null;

	public RedisMQHandler(PushManager pushManager) {
		super();
		this.pushManager = pushManager;
	}

	@Override
	//             
	public void onMessage(String channel, String message) {
		JSONObject jsonObj = new JSONObject(message);
		System.out.println(channel+","+message);
		if ("push".equals(channel)) {
			Map map=jsonObj.toMap();
			System.out.println("         ,    :"+map);
//			String appName=pushManager.push(map);
			//                     
			
		} else if ("close".equals(channel)) {
			String appName=jsonObj.getString("appName");
			System.out.println("         ,      :"+appName);
//			pushManager.closePush(appName);
		}
	}
}

4.テストメッセージキューの発行と傍受
public static void main(String[] args) throws InterruptedException {
		
		PushManager pushManager= new PushManagerImpl();
		Thread t1 = new Thread(new Tasker(new RedisMQHandler (pushManager), "push"));
		Thread t2 = new Thread(new Tasker(new RedisMQHandler (pushManager), "close"));
		t1.start();
		t2.start();
		
		LivePushEntity livePushInfo=new LivePushEntity();
		livePushInfo.setAppName("test1");
		JSONObject json=new JSONObject(livePushInfo);
		publish("push",json.toString());
		publish("close", json.toString());
		Thread.sleep(2000);
		publish("push", json.toString());
		publish("close",json.toString());
		Thread.sleep(2000);
		publish("push", json.toString());
		publish("close",json.toString());

	}

転載先:https://www.cnblogs.com/eguid/p/10195619.html