Redisメッセージキュー(MQ)のパブリケーション(publish)とメッセージリスニング(subscribe)をjedisを用いて実現する
4162 ワード
前言:
本論文はjedis 2.9.0に基づく.jar、commons-pool2-2.4.2.JArおよびjson-20160810.jar
ここでjedis接続プールはcommons-pool 2パケットに依存する必要があり、jsonパケットはオブジェクトインスタンスとjson文字列の相互変換に使用される.
1、jedisのメッセージキュー方法の簡単な説明
1.1、メッセージの発行方法
(チャネルは対応メッセージチャネル、messageは対応メッセージボディ)
jedis.publish(channel, message);
1.2.傍受メッセージ方法
(ここでjedisPubSubは、受信したメッセージを処理するために使用され、channelsは対応するチャネルである)
jedis.subscribe(jedisPubSub, channels);
2、発表メッセージ
3、メッセージの傍受
3.1、傍受メッセージ主体方法
3.2、傍受されたメッセージタスクの処理
3.3、傍受されたメッセージマスタークラスの処理の実現
4.テストメッセージキューの発行と傍受
転載先:https://www.cnblogs.com/eguid/p/10195619.html
本論文はjedis 2.9.0に基づく.jar、commons-pool2-2.4.2.JArおよびjson-20160810.jar
ここでjedis接続プールはcommons-pool 2パケットに依存する必要があり、jsonパケットはオブジェクトインスタンスとjson文字列の相互変換に使用される.
1、jedisのメッセージキュー方法の簡単な説明
1.1、メッセージの発行方法
(チャネルは対応メッセージチャネル、messageは対応メッセージボディ)
jedis.publish(channel, message);
1.2.傍受メッセージ方法
(ここでjedisPubSubは、受信したメッセージを処理するために使用され、channelsは対応するチャネルである)
jedis.subscribe(jedisPubSub, channels);
2、発表メッセージ
/**
* jedis jedis
* @return
*/
public static Jedis getJedis() {
return RedisPoolManager.getJedis();
}
/**
* redis
*
* @param String
* channel
* @param String
* message
*/
public static void publish(String channel, String message) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.publish(channel, message);
} finally {
jedis.close();
}
}
/**
* redis
*
* @param byte[]
* channel
* @param byte[]
* message
*/
public void publish(byte[] channel, byte[] message) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.publish(channel, message);
} finally {
jedis.close();
}
}
3、メッセージの傍受
3.1、傍受メッセージ主体方法
/**
*
* @param jedisPubSub -
* @param channels -
*/
public static void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.subscribe(jedisPubSub, channels);
} finally {
jedis.close();
}
}
/**
*
* @param jedisPubSub -
* @param channels -
*/
public static void subscribe(JedisPubSub jedisPubSub, String... channels) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.subscribe(jedisPubSub, channels);
} finally {
jedis.close();
}
}
3.2、傍受されたメッセージタスクの処理
class Tasker implements Runnable {
private String[] channel = null;//
private JedisPubSub jedisPubSub = null;//
public Tasker(JedisPubSub jedisPubSub, String ...channel) {
this.jedisPubSub = jedisPubSub;
this.channel = channel;
}
@Override
public void run() {
// channel
RedisMQ.subscribe(jedisPubSub, channel);
}
}
3.3、傍受されたメッセージマスタークラスの処理の実現
package cn.eguid.livePushServer.redisManager;
import java.util.Map;
import org.json.JSONObject;
import cc.eguid.livepush.PushManager;
import redis.clients.jedis.JedisPubSub;
public class RedisMQHandler extends JedisPubSub{
PushManager pushManager = null;
public RedisMQHandler(PushManager pushManager) {
super();
this.pushManager = pushManager;
}
@Override
//
public void onMessage(String channel, String message) {
JSONObject jsonObj = new JSONObject(message);
System.out.println(channel+","+message);
if ("push".equals(channel)) {
Map map=jsonObj.toMap();
System.out.println(" , :"+map);
// String appName=pushManager.push(map);
//
} else if ("close".equals(channel)) {
String appName=jsonObj.getString("appName");
System.out.println(" , :"+appName);
// pushManager.closePush(appName);
}
}
}
4.テストメッセージキューの発行と傍受
public static void main(String[] args) throws InterruptedException {
PushManager pushManager= new PushManagerImpl();
Thread t1 = new Thread(new Tasker(new RedisMQHandler (pushManager), "push"));
Thread t2 = new Thread(new Tasker(new RedisMQHandler (pushManager), "close"));
t1.start();
t2.start();
LivePushEntity livePushInfo=new LivePushEntity();
livePushInfo.setAppName("test1");
JSONObject json=new JSONObject(livePushInfo);
publish("push",json.toString());
publish("close", json.toString());
Thread.sleep(2000);
publish("push", json.toString());
publish("close",json.toString());
Thread.sleep(2000);
publish("push", json.toString());
publish("close",json.toString());
}
転載先:https://www.cnblogs.com/eguid/p/10195619.html