這篇文章主要介紹了RabbitMQ如何實現RPC遠程調用消息隊列,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
客戶端接口
我們創建一個客戶端類來說明如何使用RPC服務,暴露一個call方法來發送RPC請求和數據獲取結果。
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);盡管RPC是編程中一種常見的模式,但其也常常飽受批評。因為程序員常常不知道調用的方法是本地方法還是一個RPC方法,這在調試中常常增加一些不必要的復雜性。我們應該簡化代碼,而不是濫用RPC導致代碼變的臃腫。
回調隊列
一般來說,通過RabbitMQ實現RPC非常簡單,客戶端發送一個請求消息,服務端響應消息就完成了。為了接收到響應內容,我們在請求中發送”callback“隊列地址,也可以使用默認的隊列。
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties .Builder().replyTo(callbackQueueName) .build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());AMQP協議中預定了14個消息屬性,除了下面幾個,其它的都很少使用:
deliveryMode : 標識消息是持久化還是瞬態的。
contentType : 描述 mime-type的編碼類型,如JSON編碼為”application/json“。
replyTo : 通常在回調隊列中使用。
correlationId : 在請求中關聯RPC響應時使用。
關聯Id(Correlation Id)
在前面的方法中,要求在每個RPC請求創建回調隊列,這可真是一件繁瑣的事情,但幸運的是我們有個好方法-在每個客戶端創建一個簡單的回調隊列。
這樣問題又來了,隊列如何知道這些響應來自哪個請求呢?這時候correlationId就出場了。我們在每個請求中都設置一個唯一的值,這樣我們在回調隊列中接收消息的時候就能知道是哪個請求發送的。如果收到未知的correlationId,就廢棄該消息,因為它不是我們發出的請求。
你可能會問,為什么拋棄未知消息而不是拋出錯誤呢?這是由服務器競爭資源所導致的。盡管這不太可能,試想一下,如果RPC服務器在發送完響應后而在發送應答消息前死掉了,重啟RPC服務器會重新發送請求。這就是我們在客戶機上優雅地處理重復的反應,RPC應該是等同的。

?。?)客戶端啟動,創建一個匿名且唯一的回調隊列。
?。?)對每個RPC請求,客戶端發送一個包含replyTo和correlationId兩個屬性的消息。
?。?)請求發送到rpc_queue隊列。
?。?)RPC服務在隊列中等待請求,當請求出現時,根據replyTo字段使用隊列將結果發送到客戶端。
?。?)客戶端在回調隊列中等待數據。當消息出現時,它會檢查correlationId屬性,如果該值匹配的話,就會返回響應結果給應用。
示例代碼
RPCServer.java
package com.favccxx.favrabbit;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n == 0)
return 0;
if (n == 1)
return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
String response = null;
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId())
.build();
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response = "" + fib(n);
} catch (Exception e) {
System.out.println(" [.] " + e.toString());
response = "";
} finally {
channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
} RPCClient.java
package com.favccxx.favrabbit;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.util.UUID;
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody(), "UTF-8");
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
public static void main(String[] argv) {
RPCClient fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close();
} catch (Exception ignore) {
}
}
}
}
}先啟動RPCServer,然后運行RPCClient,控制臺輸出如下內容
RPCClient[x] Requesting fib(30) RPCClient[.] Got '832040' RPCServer[x] Awaiting RPC requests RPCServer[.] fib(30) |
感謝你能夠認真閱讀完這篇文章,希望小編分享的“RabbitMQ如何實現RPC遠程調用消息隊列”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。