這篇文章主要介紹“怎么使用Netty實現類似Dubbo的遠程接口調用”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“怎么使用Netty實現類似Dubbo的遠程接口調用”文章能幫助大家解決問題。
Netty 是一個基于NIO的客戶、服務器端的編程框架,使用Netty 可以確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶、服務端應用。Netty相當于簡化和流線化了網絡應用的編程開發過程,例如:基于TCP和UDP的socket服務開發。
實現步驟:
創建接口和實現類
創建客戶端代碼
通過動態代理模式,封裝netty遠程接口調用
通過異步線程等待/通知,實現異步轉同步
創建服務端代碼
自定義編碼解碼器
編寫測試客戶端發送請求代碼
工程依賴引入
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.75.Final</version> </dependency>
定義簡單接口和實現類。通過注解定義接口和服務實現,在后續代碼中解析注解。
@ServiceEntry定義接口serviceId
@MyService定義服務實現類
public interface IHelloService { @ServiceEntry(serviceId = "001", name = "hello") String hello(String msg); } @MyService public class HelloServiceImpl implements IHelloService { @Override public String hello(String msg) { return "re:這里是服務端,已收到客戶端消息:" + msg.hashCode(); } }
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface ServiceEntry { /** * 服務Id */ String serviceId(); /** * 服務名稱 */ String name() default ""; } @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface MyService { String value() default ""; }
1)創建客戶端Handler,MyClientHandler,繼承ChannelInboundHandlerAdapter,并實現Callable接口
客戶端發送請求時,會調用call方法,在這里將異步轉同步
將請求context放入map,并等待線程,在收到服務端返回時,異步通知線程執行,返回結果數據
收到服務端返回時,設置返回結果數據,并通知線程執行
public class MyClientHandler extends ChannelInboundHandlerAdapter implements Callable<String> { private ChannelHandlerContext ctx; private ConcurrentHashMap<String, SyncSendContext> syncSendContextMap = new ConcurrentHashMap<>(); private Object[] param; private String serviceId; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客戶端和服務端鏈接成功"); this.ctx = ctx; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("客戶端收到服務端回復: " + msg); ResponseData data = (ResponseData) msg; String id = data.getId(); // 收到服務端返回時,設置返回結果數據,并通知線程執行 SyncSendContext context = syncSendContextMap.get(id); context.setResp(data); synchronized (context) { context.notify(); } } @Override public String call() throws Exception { System.out.println("客戶端向服務端發送消息: " + param[0].toString()); String id = UUID.randomUUID().toString(); RequestData data = new RequestData(); data.setId(id); //強制設置參數1 data.setData(param[0].toString()); data.setServiceId(serviceId); SyncSendContext context = new SyncSendContext(); context.setRequest(data); // 將請求context放入map,并等待線程,在收到服務端返回時,異步通知線程執行,返回結果數據 syncSendContextMap.put(id, context); synchronized (context) { ctx.writeAndFlush(data); context.wait(); return (String) context.getResp().getData(); } } public void setParam(Object[] param) { this.param = param; } public void setServiceId(String serviceId) { this.serviceId = serviceId; } }
2)創建客戶端代碼,MyClient
通過動態代理,包裝遠程服務請求
初始化服務端鏈接,通過雙檢鎖確保clientHandler是單例實現
發送請求時,通過線程池異步發送clientHandler
public class MyClient { private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private MyClientHandler clientHandler; // 通過動態代理,包裝遠程服務請求 public <T> T getServie(final Class<T> service) { return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{service}, (proxy, method, args) -> { if (clientHandler == null) { init("127.0.0.1", 7000); } ServiceEntry annotation = method.getAnnotation(ServiceEntry.class); if (annotation == null) { return null; } clientHandler.setParam(args); clientHandler.setServiceId(annotation.serviceId()); return executor.submit(clientHandler).get(); }); } // 初始化服務端鏈接,通過雙檢鎖確保clientHandler是單例實現 private synchronized void init(String hostname, int port) { if (clientHandler != null) { return; } clientHandler = new MyClientHandler(); EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap().group(group) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new ResponseMessageCodec()); pipeline.addLast(new RequestMessageCodec()); pipeline.addLast(clientHandler); } }); bootstrap.connect(hostname, port).sync(); } catch (Exception e) { e.printStackTrace(); } } }
1)創建服務工程類ServiceFacatory,解析注解保存服務接口和實現類,調用的時候從Map直接獲取
public class ServiceFacatory { private static final Map<String, Method> methodMap = new HashMap<>(); private static final Map<String, Object> serviceMap = new HashMap<>(); public static void init() throws Exception { // 要掃描的包 String packages = "com.hj.netty.dubbo.api"; Set<MethodInfo> methods = PackageUtils.findClassAnnotationMethods(packages, ServiceEntry.class); for (MethodInfo info : methods) { ServiceEntry serviceEntry = (ServiceEntry) info.getAnnotation(); methodMap.put(serviceEntry.serviceId(), info.getMethod()); String serviceName = info.getMethod().getDeclaringClass().getName(); if (!serviceMap.containsKey(serviceName)) { Object instance = info.getMethod().getDeclaringClass().newInstance(); serviceMap.put(serviceName, instance); } } } public static Object invoke(String serviceId, Object args) throws Exception { Method method = methodMap.get(serviceId); String serviceName = method.getDeclaringClass().getName(); Object instance = serviceMap.get(serviceName); Object result = method.invoke(instance, args); return result; } } @Data @AllArgsConstructor public class MethodInfo { private Annotation annotation; private Method method; }
2)包解析工具類,解析指定目錄下的所有service類
import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.io.Resource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.core.io.support.ResourcePatternResolver; import org.springframework.core.type.classreading.CachingMetadataReaderFactory; import org.springframework.core.type.classreading.MetadataReader; import org.springframework.core.type.classreading.MetadataReaderFactory; import org.springframework.util.SystemPropertyUtils; import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.lang.reflect.Parameter; import java.util.*; public class PackageUtils { private final static Logger log = LoggerFactory.getLogger(PackageUtils.class); //掃描 scanPackages 下的文件的匹配符 protected static final String DEFAULT_RESOURCE_PATTERN = "**/*.class"; /** * 結合spring的類掃描方式 * 根據需要掃描的包路徑及相應的注解,獲取最終測method集合 * 僅返回public方法,如果方法是非public類型的,不會被返回 * 可以掃描工程下的class文件及jar中的class文件 * * @param scanPackages * @param annotation * @return */ public static Set<MethodInfo> findClassAnnotationMethods(String scanPackages, Class<? extends Annotation> annotation) { //獲取所有的類 Set<String> clazzSet = findPackageClass(scanPackages); Set<MethodInfo> methods = new HashSet<>(); //遍歷類,查詢相應的annotation方法 for (String clazz : clazzSet) { try { Set<MethodInfo> ms = findAnnotationMethods(clazz, annotation); methods.addAll(ms); } catch (ClassNotFoundException ignore) { } } return methods; } public static Set<MethodInfo> findAnnotationMethods(String fullClassName, Class<? extends Annotation> anno) throws ClassNotFoundException { Set<MethodInfo> methodSet = new HashSet<>(); Class<?> clz = Class.forName(fullClassName); // 存儲接口中定義的方法 Map<String, Method> mapMethodInf = new HashMap<>(); for (int i = 0; i < clz.getInterfaces().length; i++) { Class<?> inf = clz.getInterfaces()[i]; Method[] methods = inf.getDeclaredMethods(); for (Method method : methods) { String key = getMethodKey(method); mapMethodInf.put(key, method); } } Method[] methods = clz.getDeclaredMethods(); for (Method method : methods) { if (method.getModifiers() != Modifier.PUBLIC) { continue; } Annotation annotation = method.getAnnotation(anno); if (annotation != null) { methodSet.add(new MethodInfo(annotation,method)); } else { // 從接口中讀取對應的方法 String key = getMethodKey(method); Method methodInf = mapMethodInf.get(key); annotation = methodInf.getAnnotation(anno); if (annotation != null) { methodSet.add(new MethodInfo(annotation,method)); } } } return methodSet; } /** * 根據掃描包的,查詢下面的所有類 * * @param scanPackages 掃描的package路徑 * @return */ private static Set<String> findPackageClass(String scanPackages) { if (StringUtils.isBlank(scanPackages)) { return Collections.EMPTY_SET; } //驗證及排重包路徑,避免父子路徑多次掃描 Set<String> packages = checkPackage(scanPackages); ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver(); MetadataReaderFactory metadataReaderFactory = new CachingMetadataReaderFactory(resourcePatternResolver); Set<String> clazzSet = new HashSet<String>(); for (String basePackage : packages) { if (StringUtils.isBlank(basePackage)) { continue; } String packageSearchPath = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX + org.springframework.util.ClassUtils.convertClassNameToResourcePath(SystemPropertyUtils.resolvePlaceholders(basePackage)) + "/" + DEFAULT_RESOURCE_PATTERN; try { Resource[] resources = resourcePatternResolver.getResources(packageSearchPath); for (Resource resource : resources) { //檢查resource,這里的resource都是class String clazz = loadClassName(metadataReaderFactory, resource); clazzSet.add(clazz); } } catch (Exception e) { log.error("獲取包下面的類信息失敗,package:" + basePackage, e); } } return clazzSet; } /** * 排重、檢測package父子關系,避免多次掃描 * * @param scanPackages * @return 返回檢查后有效的路徑集合 */ private static Set<String> checkPackage(String scanPackages) { if (StringUtils.isBlank(scanPackages)) { return Collections.EMPTY_SET; } Set<String> packages = new HashSet<>(); //排重路徑 Collections.addAll(packages, scanPackages.split(",")); String[] strings = packages.toArray(new String[packages.size()]); for (String pInArr : strings) { if (StringUtils.isBlank(pInArr) || pInArr.equals(".") || pInArr.startsWith(".")) { continue; } if (pInArr.endsWith(".")) { pInArr = pInArr.substring(0, pInArr.length() - 1); } Iterator<String> packageIte = packages.iterator(); boolean needAdd = true; while (packageIte.hasNext()) { String pack = packageIte.next(); if (pInArr.startsWith(pack + ".")) { //如果待加入的路徑是已經加入的pack的子集,不加入 needAdd = false; } else if (pack.startsWith(pInArr + ".")) { //如果待加入的路徑是已經加入的pack的父集,刪除已加入的pack packageIte.remove(); } } if (needAdd) { packages.add(pInArr); } } return packages; } /** * 加載資源,根據resource獲取className * * @param metadataReaderFactory spring中用來讀取resource為class的工具 * @param resource 這里的資源就是一個Class */ private static String loadClassName(MetadataReaderFactory metadataReaderFactory, Resource resource) { try { if (resource.isReadable()) { MetadataReader metadataReader = metadataReaderFactory.getMetadataReader(resource); if (metadataReader != null) { return metadataReader.getClassMetadata().getClassName(); } } } catch (Exception e) { log.error("根據resource獲取類名稱失敗", e); } return null; } private static String getMethodKey(Method method) { StringBuilder key = new StringBuilder(method.getName()); for (Parameter parameter : method.getParameters()) { key.append(parameter.getType().getName()) .append(parameter.getName()); } return key.toString(); } }
3)創建服務端Handler類,接收客戶端請求,并調用服務實現類執行接口
public class MyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客戶端接入"); super.channelActive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("收到客戶端消息:" + msg); RequestData req = (RequestData) msg; if (req != null) { String args = req.getData(); String serviceId = req.getServiceId(); // 調用服務實現類 Object res = ServiceFacatory.invoke(serviceId, args); ResponseData resp = new ResponseData(); resp.setData(res); resp.setId(req.getId()); ctx.writeAndFlush(resp); } System.out.println("----------響應結束----------" + req.getData()); } }
4)創建服務端啟動類MyServer、ServerApp,啟動端口監聽;加入編解碼器和服務端MyServerHandler
public class MyServer { public static void start(String hostname, int port) throws Exception { ServiceFacatory.init(); EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new RequestMessageCodec()); pipeline.addLast(new ResponseMessageCodec()); pipeline.addLast(new MyServerHandler()); } }); ChannelFuture future = bootstrap.bind(hostname, port).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } } public class ServerApp { public static void main(String[] args) throws Exception { MyServer.start("127.0.0.1", 7000); } }
1)創建請求數據編解碼器RequestMessageCodec,實現String和請求參數對象RequestData之間互相轉換
public class RequestMessageCodec extends MessageToMessageCodec<String, RequestData> { @Override protected void encode(ChannelHandlerContext ctx, RequestData msg, List<Object> out) throws Exception { System.out.println("RequestMessageCodec.encode 被調用 " + msg); String json = JSONObject.toJSONString(msg); out.add(json); } @Override protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception { System.out.println("RequestMessageCodec.decode 被調用 " + msg); RequestData po = JSONObject.parseObject(msg, RequestData.class); out.add(po); } }
2)創建服務響應數據編解碼器ResponseMessageCodec,實現String和響應數據對象ResponseData之間互相轉換
public class ResponseMessageCodec extends MessageToMessageCodec<String, ResponseData> { @Override protected void encode(ChannelHandlerContext ctx, ResponseData msg, List<Object> out) throws Exception { System.out.println("ResponseMessageCodec.encode 被調用 " + msg); String json = JSONObject.toJSONString(msg); out.add(json); } @Override protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception { System.out.println("ResponseMessageCodec.decode 被調用 " + msg); ResponseData po = JSONObject.parseObject(msg, ResponseData.class); out.add(po); } }
3)創建請求、響應VO
@Data public class RequestData { private String id; private String serviceId; private String data; } @Data public class ResponseData { private String id; private Object data; } @Data public class SyncSendContext { private ResponseData resp; private RequestData request; }
1)創建客戶端請求類ClientTest,模擬發送數據
public class ClientTest { public static void main(String[] args) throws Exception { MyClient client = new MyClient(); IHelloService servie = client.getServie(IHelloService.class); for (int i = 0; i < 5; i++) { Thread.sleep(2 * 1000); String res = servie.hello("你好 服務端 ~~ " + i); System.out.println("service 得到服務端返回消息: " + res); System.out.println("-----------------------------" + i +" 結束"); } } }
2)運行ServerApp,啟動服務端,運行ClientTest模擬客戶端發送數據
客戶端日志:
客戶端和服務端鏈接成功
客戶端向服務端發送消息: 你好 服務端 ~~ 0
RequestMessageCodec.encode 被調用 RequestData(id=d081f840-4367-42d3-909c-32f6a1654c60, serviceId=001, data=你好 服務端 ~~ 0)
ResponseMessageCodec.decode 被調用 {"data":"re:這里是服務端,已收到客戶端消息:1845339960","id":"d081f840-4367-42d3-909c-32f6a1654c60"}
客戶端收到服務端回復: ResponseData(id=d081f840-4367-42d3-909c-32f6a1654c60, data=re:這里是服務端,已收到客戶端消息:1845339960)
service 得到服務端返回消息: re:這里是服務端,已收到客戶端消息:1845339960
-----------------------------0 結束
客戶端向服務端發送消息: 你好 服務端 ~~ 1
RequestMessageCodec.encode 被調用 RequestData(id=d49105b0-2624-43c2-bb19-c826987133f1, serviceId=001, data=你好 服務端 ~~ 1)
ResponseMessageCodec.decode 被調用 {"data":"re:這里是服務端,已收到客戶端消息:1845339961","id":"d49105b0-2624-43c2-bb19-c826987133f1"}
客戶端收到服務端回復: ResponseData(id=d49105b0-2624-43c2-bb19-c826987133f1, data=re:這里是服務端,已收到客戶端消息:1845339961)
service 得到服務端返回消息: re:這里是服務端,已收到客戶端消息:1845339961
-----------------------------1 結束
客戶端向服務端發送消息: 你好 服務端 ~~ 2
RequestMessageCodec.encode 被調用 RequestData(id=13f82f4a-0a2f-41cc-8420-38ab20fab2d2, serviceId=001, data=你好 服務端 ~~ 2)
ResponseMessageCodec.decode 被調用 {"data":"re:這里是服務端,已收到客戶端消息:1845339962","id":"13f82f4a-0a2f-41cc-8420-38ab20fab2d2"}
客戶端收到服務端回復: ResponseData(id=13f82f4a-0a2f-41cc-8420-38ab20fab2d2, data=re:這里是服務端,已收到客戶端消息:1845339962)
service 得到服務端返回消息: re:這里是服務端,已收到客戶端消息:1845339962
-----------------------------2 結束
客戶端向服務端發送消息: 你好 服務端 ~~ 3
RequestMessageCodec.encode 被調用 RequestData(id=f4576cbd-8ee5-438c-ae6d-810b836c177a, serviceId=001, data=你好 服務端 ~~ 3)
ResponseMessageCodec.decode 被調用 {"data":"re:這里是服務端,已收到客戶端消息:1845339963","id":"f4576cbd-8ee5-438c-ae6d-810b836c177a"}
客戶端收到服務端回復: ResponseData(id=f4576cbd-8ee5-438c-ae6d-810b836c177a, data=re:這里是服務端,已收到客戶端消息:1845339963)
service 得到服務端返回消息: re:這里是服務端,已收到客戶端消息:1845339963
-----------------------------3 結束
客戶端向服務端發送消息: 你好 服務端 ~~ 4
RequestMessageCodec.encode 被調用 RequestData(id=68e67b0f-0c35-4ead-915e-e1890a0c0b53, serviceId=001, data=你好 服務端 ~~ 4)
ResponseMessageCodec.decode 被調用 {"data":"re:這里是服務端,已收到客戶端消息:1845339964","id":"68e67b0f-0c35-4ead-915e-e1890a0c0b53"}
客戶端收到服務端回復: ResponseData(id=68e67b0f-0c35-4ead-915e-e1890a0c0b53, data=re:這里是服務端,已收到客戶端消息:1845339964)
service 得到服務端返回消息: re:這里是服務端,已收到客戶端消息:1845339964
-----------------------------4 結束
服務端日志:
RequestMessageCodec.decode 被調用 {"data":"你好 服務端 ~~ 0","id":"f876eccf-a034-467a-8b5a-4c6dba80cee2","serviceId":"001"}
收到客戶端消息:RequestData(id=f876eccf-a034-467a-8b5a-4c6dba80cee2, serviceId=001, data=你好 服務端 ~~ 0)
ResponseMessageCodec.encode 被調用 ResponseData(id=f876eccf-a034-467a-8b5a-4c6dba80cee2, data=re:這里是服務端,已收到客戶端消息:1845339960)
----------響應結束----------你好 服務端 ~~ 0
RequestMessageCodec.decode 被調用 {"data":"你好 服務端 ~~ 1","id":"bcceaa9b-09be-4dcc-9135-ac14caa365d1","serviceId":"001"}
收到客戶端消息:RequestData(id=bcceaa9b-09be-4dcc-9135-ac14caa365d1, serviceId=001, data=你好 服務端 ~~ 1)
ResponseMessageCodec.encode 被調用 ResponseData(id=bcceaa9b-09be-4dcc-9135-ac14caa365d1, data=re:這里是服務端,已收到客戶端消息:1845339961)
----------響應結束----------你好 服務端 ~~ 1
RequestMessageCodec.decode 被調用 {"data":"你好 服務端 ~~ 2","id":"ab0181b1-b3fe-42b7-ae17-d2a533c56098","serviceId":"001"}
收到客戶端消息:RequestData(id=ab0181b1-b3fe-42b7-ae17-d2a533c56098, serviceId=001, data=你好 服務端 ~~ 2)
ResponseMessageCodec.encode 被調用 ResponseData(id=ab0181b1-b3fe-42b7-ae17-d2a533c56098, data=re:這里是服務端,已收到客戶端消息:1845339962)
----------響應結束----------你好 服務端 ~~ 2
RequestMessageCodec.decode 被調用 {"data":"你好 服務端 ~~ 3","id":"6a4e6061-9ebe-4250-b939-2e5f314096fc","serviceId":"001"}
收到客戶端消息:RequestData(id=6a4e6061-9ebe-4250-b939-2e5f314096fc, serviceId=001, data=你好 服務端 ~~ 3)
ResponseMessageCodec.encode 被調用 ResponseData(id=6a4e6061-9ebe-4250-b939-2e5f314096fc, data=re:這里是服務端,已收到客戶端消息:1845339963)
----------響應結束----------你好 服務端 ~~ 3
RequestMessageCodec.decode 被調用 {"data":"你好 服務端 ~~ 4","id":"69c726e6-a3f1-487a-8455-ada02b4e97ed","serviceId":"001"}
收到客戶端消息:RequestData(id=69c726e6-a3f1-487a-8455-ada02b4e97ed, serviceId=001, data=你好 服務端 ~~ 4)
ResponseMessageCodec.encode 被調用 ResponseData(id=69c726e6-a3f1-487a-8455-ada02b4e97ed, data=re:這里是服務端,已收到客戶端消息:1845339964)
----------響應結束----------你好 服務端 ~~ 4
關于“怎么使用Netty實現類似Dubbo的遠程接口調用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。