RabbitMQによるRPCの実装例
3517 ワード
RPC(Remote Procedure Call、リモートプロシージャ呼び出し)の実装について、最近『RabbitMQ実戦ガイド』を読みました。テスト用のサンプルを以下に示します。
サーバー側
クライアント側
サーバー側:
/**
 * Minimal RabbitMQ RPC server example.
 *
 * <p>Flow: (1) consume a request from the request queue, (2) handle it (this example only
 * prints the payload), (3) publish a response to the queue named in the request's
 * {@code replyTo} property, echoing the request's {@code correlationId}.
 *
 * @author hz16092620
 * @date 2018-09-16 10:08:23
 */
public class RpcServer {

    /** Name of the queue on which RPC requests arrive. */
    private static final String REQUEST_QUEUE = "rpc_liuhp_quene";

    /** Maximum number of unacknowledged requests delivered to this consumer at once. */
    private static final int PREFETCH_COUNT = 10;

    public static void main(String[] args) {
        consumerMessage();
    }

    /**
     * Declares the request queue and registers a consumer that answers every request with the
     * fixed payload {@code "result"}.
     *
     * <p>The connection is intentionally left open on success so the consumer keeps receiving
     * requests; it is closed only when the setup itself fails.
     */
    public static void consumerMessage() {
        Connection conn = RabbitConnection.createConnection();
        try {
            final Channel channel = conn.createChannel();
            channel.queueDeclare(REQUEST_QUEUE, false, true, false, null);
            // The prefetch limit only applies to consumers that acknowledge manually,
            // so the consumer below is registered with autoAck = false.
            channel.basicQos(PREFETCH_COUNT);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                    // Decode with an explicit charset; casting each byte to char corrupts non-ASCII data.
                    System.out.println(new String(body, java.nio.charset.StandardCharsets.UTF_8));

                    String replyTo = properties.getReplyTo();
                    if (replyTo != null) {
                        // Echo the correlation id so the caller can match the response to its request.
                        BasicProperties props = new BasicProperties().builder().correlationId(properties.getCorrelationId()).build();
                        channel.basicPublish("", replyTo, props, "result".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    }
                    // Acknowledge only after the reply has been handed to the channel.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(REQUEST_QUEUE, false, consumer);
        } catch (IOException e) {
            e.printStackTrace();
            // Setup failed: nothing will ever be consumed, so release the connection.
            closeQuietly(conn);
        }
    }

    /** Closes the connection if it is still open, reporting (not propagating) any failure. */
    private static void closeQuietly(Connection conn) {
        if (conn == null || !conn.isOpen()) {
            return;
        }
        try {
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
クライアント:
/**
 * Minimal RabbitMQ RPC client example.
 *
 * <p>Publishes one request carrying a {@code correlationId} and a {@code replyTo} queue, then
 * waits on that reply queue for the response with the matching correlation id and prints it.
 *
 * @author hz16092620
 * @date 2018-09-16 10:27:08
 */
public class RpcClient {

    /** Name of the queue the RPC server consumes requests from. */
    private static final String REQUEST_QUEUE = "rpc_liuhp_quene";

    /** Upper bound on how long the client waits for the reply. */
    private static final long REPLY_TIMEOUT_MILLIS = 30000L;

    public static void main(String[] args) {
        createClient();
    }

    /**
     * Sends a single request (a random integer in {@code [0, 100)} as text) and prints the
     * matching reply. Returns as soon as the reply arrives, or after
     * {@link #REPLY_TIMEOUT_MILLIS} at the latest. The channel and connection are always closed
     * before returning.
     */
    public static void createClient() {
        Connection conn = RabbitConnection.createConnection();
        Channel channel = null;
        try {
            channel = conn.createChannel();
            // Server-named queue that receives the reply for this client.
            String replyQueue = channel.queueDeclare().getQueue();
            final String uuid = UUID.randomUUID().toString();
            BasicProperties props = new BasicProperties().builder().correlationId(uuid).replyTo(replyQueue).build();
            byte[] request = String.valueOf(new Random().nextInt(100)).getBytes(java.nio.charset.StandardCharsets.UTF_8);
            channel.basicPublish("", REQUEST_QUEUE, props, request);

            // Released by the consumer once the matching reply has been printed.
            final java.util.concurrent.CountDownLatch replied = new java.util.concurrent.CountDownLatch(1);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                    // uuid is never null, so this comparison is safe even if the reply has no correlation id.
                    if (uuid.equals(properties.getCorrelationId())) {
                        System.out.println(new String(body, java.nio.charset.StandardCharsets.UTF_8));
                        replied.countDown();
                    }
                }
            };
            channel.basicConsume(replyQueue, true, consumer);

            if (!replied.await(REPLY_TIMEOUT_MILLIS, java.util.concurrent.TimeUnit.MILLISECONDS)) {
                System.err.println("No reply received within " + REPLY_TIMEOUT_MILLIS + " ms");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // channel is still null when createChannel() itself failed.
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null && conn.isOpen()) {
                try {
                    conn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}