溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java實現RPC框架

發布時間:2020-06-19 12:58:09 來源:億速云 閱讀:195 作者:鴿子 欄目:編程語言

RPC框架稱為遠程調用框架,其實現的核心原理就是消費者端使用動態代理來代理一個接口的方法(基于JDK的動態代理,當然如果使用CGLib可以直接使用無接口類的方法),通過加入網絡傳輸編程,傳輸調用接口方法名稱,方法參數來給提供者獲取,再通過反射,來執行該接口的方法,再將反射執行的結果通過網絡編程傳回消費者端。

現在我們來依次實現這些概念。這里我們做最簡單的實現,網絡編程使用的是BIO,大家可以使用Reactor模式的Netty來改寫性能更好的方式。而網絡傳輸中使用的序列化和反序列化也是Java自帶的,當然這樣的傳輸字節比較大,可以使用google的protoBuffer或者kryo來處理。這里只為了方便說明原理。

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.guanjian</groupId>
    <artifactId>rpc-framework</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

首先當然是我們要進行遠程調用的接口以及接口的方法。

public interface HelloService {
    String sayHello(String content);}

接口實現類

public class HelloServiceImpl implements HelloService {    public String sayHello(String content) {        return "hello," + content;    }
}

消費者端的動態代理,如果你是把提供者和消費者寫在兩個工程中,則提供者端需要上面的接口和實現類,而消費者端只需要上面的接口。

public class ConsumerProxy {
    /**
     * 消費者端的動態代理
     * @param interfaceClass 代理的接口類
     * @param host 遠程主機IP
     * @param port 遠程主機端口
     * @param <T>
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> T consume(final Class<T> interfaceClass,final String host,final int port) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class[]{interfaceClass}, (proxy,method,args) -> {
                    //創建一個客戶端套接字
                    Socket socket = new Socket(host, port);
                    try {
                        //創建一個對外傳輸的對象流,綁定套接字
                        ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                        try {
                            //將動態代理的方法名寫入對外傳輸的對象流中
                            output.writeUTF(method.getName());
                            //將動態代理的方法的參數寫入對外傳輸的對象流中
                            output.writeObject(args);
                            //創建一個對內傳輸的對象流,綁定套接字
                            //這里是為了獲取提供者端傳回的結果
                            ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                            try {
                                //從對內傳輸的對象流中獲取結果
                                Object result = input.readObject();
                                if (result instanceof Throwable) {
                                    throw (Throwable) result;
                                }
                                return result;
                            } finally {
                                input.close();
                            }
                        } finally {
                            output.close();
                        }
                    } finally {
                        socket.close();
                    }
                }
        );
    }
}

有關JDK動態代理的內容可以參考AOP原理與自實現 ,BIO的部分可以參考傳統IO與NIO比較

提供者端的網絡傳輸和遠程方式調用服務

public class ProviderReflect {
    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * RPC監聽和遠程方法調用
     * @param service RPC遠程方法調用的接口實例
     * @param port 監聽的端口
     * @throws Exception
     */
    public static void provider(final Object service,int port) throws Exception {
        //創建服務端的套接字,綁定端口port
        ServerSocket serverSocket = new ServerSocket(port);
        while (true) {
            //開始接收客戶端的消息,并以此創建套接字
            final Socket socket = serverSocket.accept();
            //多線程執行,這里的問題是連接數過大,線程池的線程數會耗盡
            executorService.execute(() -> {
                try {
                    //創建呢一個對內傳輸的對象流,并綁定套接字
                    ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                    try {
                        try {
                            //從對象流中讀取接口方法的方法名
                            String methodName = input.readUTF();
                            //從對象流中讀取接口方法的所有參數
                            Object[] args = (Object[]) input.readObject();
                            Class[] argsTypes = new Class[args.length];
                            for (int i = 0;i < args.length;i++) {
                                argsTypes[i] = args[i].getClass();

                            }
                            //創建一個對外傳輸的對象流,并綁定套接字
                            //這里是為了將反射執行結果傳遞回消費者端
                            ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                            try {
                                Class<?>[] interfaces = service.getClass().getInterfaces();
                                Method method = null;
                                for (int i = 0;i < interfaces.length;i++) {
                                    method = interfaces[i].getDeclaredMethod(methodName,argsTypes);
                                    if (method != null) {
                                        break;
                                    }
                                }
                                Object result = method.invoke(service, args);
                                //將反射執行結果寫入對外傳輸的對象流中
                                output.writeObject(result);
                            } catch (Throwable t) {
                                output.writeObject(t);
                            } finally {
                                output.close();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            input.close();
                        }
                    } finally {
                        socket.close();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

啟動提供者端的網絡偵聽和遠程調用

public class RPCProviderMain {
    public static void main(String[] args) throws Exception {
        HelloService service = new HelloServiceImpl();
        ProviderReflect.provider(service,8083);
    }
}

啟動消費者的動態代理調用

public class RPCConsumerMain {
    public static void main(String[] args) throws InterruptedException {
        HelloService service = ConsumerProxy.consume(HelloService.class,"127.0.0.1",8083);
        for (int i = 0;i < 1000;i++) {
            String hello = service.sayHello("你好_" + i);
            System.out.println(hello);
            Thread.sleep(1000);
        }
    }
}

運行結果

hello,你好_0
hello,你好_1
hello,你好_2
hello,你好_3
hello,你好_4
hello,你好_5

.....

如果你要擴展成一個Netty+ProtoBuffer的高性能RPC框架可以參考Netty整合Protobuffer 的相關寫法。

以上就是Java 手寫一個RPC框架的詳細內容,更多請關注億速云其它相關文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女