rabbitmq実装RPCインスタンス

3517 ワード

rpc(remote procedure callリモートプロシージャ呼び出し)の実装について、RabbitMq実戦ガイドを最近読みました.テストの例を次に示します.
サービス側
/**
 * 

* * rpc ,1、 ,2、 ,3、 response 。 * *

* @author hz16092620 * @date 2018 9 16 10:08:23 * @version */ public class RpcServer { public static void main(String[] args) { consumerMessage(); } /** * * */ public static void consumerMessage() { Connection conn = RabbitConnection.createConnection();// try { String queneName = "rpc_liuhp_quene"; final Channel channel = conn.createChannel(); channel.queueDeclare(queneName, false, true, false, null); // , channel.basicQos(10);// Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { StringBuilder sb = new StringBuilder(); for (byte b : body) { sb.append((char) b); } System.out.println(sb.toString()); BasicProperties props = new BasicProperties().builder().correlationId(properties.getCorrelationId()).build(); channel.basicPublish("", properties.getReplyTo(), props, "result".getBytes());// result } }; channel.basicConsume(queneName, true, consumer); } catch (IOException e) { e.printStackTrace(); } finally { // , } } }

クライアント:
/**
 * 

* * , , * *

* @author hz16092620 * @date 2018 9 16 10:27:08 * @version */ public class RpcClient { public static void main(String[] args) { createClient(); } /** * * */ public static void createClient() { Connection conn = RabbitConnection.createConnection(); Channel channel = null; try { channel = conn.createChannel(); String queneName = channel.queueDeclare().getQueue(); final String uuid = UUID.randomUUID().toString(); BasicProperties props = new BasicProperties().builder().correlationId(uuid).replyTo(queneName).build(); channel.basicPublish("", "rpc_liuhp_quene", props, String.valueOf(new Random().nextInt(100)).getBytes());// // // queneingConsumer , 3.0 。 // QueueingConsumer consume = new QueueingConsumer(channel); /* * while (true) { QueueingConsumer.Delivery delivery = * consume.nextDelivery(); if * (delivery.getProperties().getCorrelationId().equals(uuid)) { * System.out.println(new String(delivery.getBody())); } break; } */ // defaultConsumer Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { while (true) { if (properties.getCorrelationId().equals(uuid)) { StringBuilder sb = new StringBuilder(); for (byte b : body) { sb.append((char) b); } System.out.println(sb.toString()); } break; } } }; channel.basicConsume(queneName, true, consumer); Thread.sleep(30000L); } catch (IOException | InterruptedException e) { e.printStackTrace(); } finally { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } }