真的夠可以的,基於Netty實現了RPC框架

RPC全稱Remote Procedure Call,即遠端過程呼叫,對於呼叫者無感知這是一個遠端呼叫功能。目前流行的開源RPC 框架有阿里的Dubbo、Google 的 gRPC、Twitter 的Finagle 等。本次RPC框架的設計主要參考的是阿里的Dubbo,這裡Netty 基本上是作為架構的技術底層而存在的,主要完成高效能的網路通訊,從而實現高效的遠端呼叫。

Dubbo的架構與Spring

其實在之前的文章中《談談京東的服務框架》,探討過Dubbo的組成和架構。

真的夠可以的,基於Netty實現了RPC框架

真的夠可以的,基於Netty實現了RPC框架

另外使用Dubbo最方便的地方在於它可以和Spring非常方便的整合,Dubbo對於配置的最佳化也是隨著Spring一脈相承的,從最早的XML形式到後來的註解方式以及自動裝配,都是在不斷地簡化開發過程來提高開發效率。

Dubbo在Spring框架中的工作流程:

1、Spring的IOC容器啟動

2、把服務註冊到註冊中心(zookeeper軟體)中

3、消費者啟動時會把它需要用到的服務從註冊中心拉取下來

4、提供者的地址發生改變時,註冊中心會馬上通知消費者

5、根據註冊中心中的服務地址直接就可以呼叫提供者了,如果呼叫了提供者,就會把提供者的地址主動快取起來

6、監控消費者呼叫提供者的次數

RPC實現的關鍵

1、序列化與反序列化

在遠端過程呼叫時,客戶端跟服務端是不同的程序,甚至有時候客戶端用Java,服務端用C++。這時候就需要客戶端把引數先轉成一個位元組流,傳給服務端後,再把位元組流轉成自己能讀取的格式,這個過程叫序列化和反序列化,同理,從服務端返回的值也需要序列化反序列化的過程。在序列化的時候,我們選擇Netty自身的物件序列化器。

真的夠可以的,基於Netty實現了RPC框架

2、資料網路傳輸

解決了序列化的問題,那麼剩下的就是如何把資料引數傳到生產者,網路傳輸層需要把序列化後的引數位元組流傳給服務端,然後再把序列化後的呼叫結果傳回客戶端,雖然大部分RPC框架都採用了TCP作為傳輸協議,其實UDP也可以作為傳輸協議的,基於TCP和UDP我們可以自定義任意規則的協議,加之我們要使用NIO通訊方式作為高效能網路服務的前提,於是Netty似乎更符合我們Java程式設計師的口味,Netty真香!

3、告訴註冊中心我要調誰

現在呼叫引數的序列化和網路傳輸都已經具備,但是還有個問題,那就是消費者要呼叫誰的問題,一個函式或者方法,我們可以理解為一個服務,這些服務註冊在註冊中心上面,只有當消費者告訴註冊中心要呼叫誰,才可以進行遠端呼叫。所以不但要把將要呼叫的服務的引數傳過去,也要把要呼叫的服務資訊傳過去。

簡易RPC框架的架構

真的夠可以的,基於Netty實現了RPC框架

Dubbo 核心模組主要有四個:Registry 註冊中心、Provider 服務提供者、Consumer 服務消費者、Monitor監控,為了方便直接砍掉了監控模組,同時把服務提供者模組與註冊中心模組寫在一起,透過實現自己的簡易IOC容器,完成對服務提供者的例項化。

關於使用Netty進行Socket程式設計的部分可以參考Netty的官網 或者我之前的部落格《Netty編碼實戰與Channel生命週期》,在這裡Netty的編碼技巧和方式不作為本文的重點。

RPC框架編碼實現

首先需要引入的依賴如下(Netty + Lombok):

io。netty netty-all 4。1。6。Final org。projectlombok lombok 1。16。8

1、Registry與Provider

目錄結構如下:

───src └─main ├─java │ └─edu │ └─xpu │ └─rpc │ ├─api │ │ IRpcCalc。java │ │ IRpcHello。java │ │ │ ├─core │ │ InvokerMessage。java │ │ │ ├─provider │ │ RpcCalcProvider。java │ │ RpcHelloProvider。java │ │ │ └─registry │ MyRegistryHandler。java │ RpcRegistry。java │ └─resources───pom。xml

IRpcCalc。java與IRpcHello。java是兩個Service介面。IRpcCalc。java內容如下,完成模擬業務加、減、乘、除運算

public interface IRpcCalc { // 加 int add(int a, int b); // 減 int sub(int a, int b); // 乘 int mul(int a, int b); // 除 int div(int a, int b);}

IRpcHello。java,測試服務是否可用:

public interface IRpcHello { String hello(String name);}

至此API 模組就定義完成了,非常簡單的兩個介面。接下來,我們要確定傳輸規則,也就是傳輸協議,協議內容當然要自定義,才能體現出Netty 的優勢。

設計一個InvokerMessage類,裡面包含了服務名稱、呼叫方法、引數列表、引數值,這就是我們自定義協議的協議包:

@Datapublic class InvokerMessage implements Serializable { private String className; // 服務名稱 private String methodName; // 呼叫哪個方法 private Class<?>[] params; // 引數列表 private Object[] values; // 引數值}

透過定義這樣的協議類,就能知道我們需要呼叫哪個服務,服務中的哪個方法,方法需要傳遞的引數列表(引數型別+引數值),這些資訊正確傳遞過去了才能拿到正確的呼叫返回值。

接下來建立這兩個服務的具體實現類,IRpcHello的實現類如下:

public class RpcHelloProvider implements IRpcHello { public String hello(String name) { return “Hello, ” + name + “!”; }}

IRpcCalc的實現類如下:

public class RpcCalcProvider implements IRpcCalc { @Override public int add(int a, int b) { return a + b; } @Override public int sub(int a, int b) { return a - b; } @Override public int mul(int a, int b) { return a * b; } @Override public int div(int a, int b) { return a / b; }}

Registry 註冊中心主要功能就是負責將所有Provider的服務名稱和服務引用地址註冊到一個容器中(這裡為了方便直接使用介面類名作為服務名稱,前提是假定我們每個服務只有一個實現類),並對外發布。Registry 應該要啟動一個對外的服務,很顯然應該作為服務端,並提供一個對外可以訪問的埠。先啟動一個Netty服務,建立RpcRegistry 類,RpcRegistry。java的具體程式碼如下:

public class RpcRegistry { private final int port; public RpcRegistry(int port){ this。port = port; } public void start(){ NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap。group(bossGroup, workGroup) 。channel(NioServerSocketChannel。class) 。childHandler(new ChannelInitializer() { protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel。pipeline(); // 處理拆包、粘包的編解碼器 pipeline。addLast(new LengthFieldBasedFrameDecoder(Integer。MAX_VALUE, 0, 4, 0, 4)); pipeline。addLast(new LengthFieldPrepender(4)); // 處理序列化的編解碼器 pipeline。addLast(“encoder”, new ObjectEncoder()); pipeline。addLast(“decoder”, new ObjectDecoder(Integer。MAX_VALUE, ClassResolvers。cacheDisabled(null))); // 自己的業務邏輯 pipeline。addLast(new MyRegistryHandler()); } }) 。option(ChannelOption。SO_BACKLOG, 128) 。childOption(ChannelOption。SO_KEEPALIVE, true); // 設定長連線 ChannelFuture channelFuture = serverBootstrap。bind(this。port)。sync(); System。out。println(“RPC Registry start listen at ” + this。port); channelFuture。channel()。closeFuture()。sync(); } catch (Exception e){ e。printStackTrace(); } finally { bossGroup。shutdownGracefully(); workGroup。shutdownGracefully(); } } public static void main(String[] args) { new RpcRegistry(8080)。start(); }}

接下來只需要實現我們自己的Handler即可,建立MyRegistryHandler。java,內容如下:

public class MyRegistryHandler extends ChannelInboundHandlerAdapter { // 在註冊中心註冊服務需要有容器存放 public static ConcurrentHashMap registryMap = new ConcurrentHashMap<>(); // 類名的快取位置 private static final List classCache = new ArrayList<>(); // 約定,只要是寫在provider下所有的類都認為是一個可以對完提供服務的實現類 // edu。xpu。rpc。provider public MyRegistryHandler(){ scanClass(“edu。xpu。rpc。provider”); doRegister(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Object result = new Object(); // 客戶端傳過來的呼叫資訊 InvokerMessage request = (InvokerMessage)msg; // 先判斷有沒有這個服務 String serverClassName = request。getClassName(); if(registryMap。containsKey(serverClassName)){ // 獲取服務物件 Object clazz = registryMap。get(serverClassName); Method method = clazz。getClass()。getMethod(request。getMethodName(), request。getParams()); result = method。invoke(clazz, request。getValues()); System。out。println(“request=” + request); System。out。println(“result=” + result); } ctx。writeAndFlush(result); ctx。close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause。printStackTrace(); ctx。close(); } // 實現簡易IOC容器 // 掃描出包裡面所有的Class private void scanClass(String packageName){ ClassLoader classLoader = this。getClass()。getClassLoader(); URL url = classLoader。getResource(packageName。replaceAll(“\\。”, “/”)); File dir = new File(url。getFile()); File[] files = dir。listFiles(); for (File file: files){ if(file。isDirectory()){ scanClass(packageName + “。” + file。getName()); }else{ // 拿出類名 String className = packageName + “。” + file。getName()。replace(“。class”, “”)。trim(); classCache。add(className); } } } // 把掃描到的Class例項化,放到Map中 // 註冊的服務名稱就叫做介面的名字 [約定優於配置] private void doRegister(){ if(classCache。size() == 0) return; for (String className: classCache){ try { Class<?> clazz = Class。forName(className); // 服務名稱 Class<?> anInterface = clazz。getInterfaces()[0]; registryMap。put(anInterface。getName(), clazz。newInstance()); } catch (Exception e) { e。printStackTrace(); } } }}

在這裡還透過反射實現了簡易的IOC容器,先遞迴掃描provider包底下的類,把這些類的物件作為服務物件放到IOC容器中進行管理,由於IOC是一個Map實現的,所以將類名作為服務名稱,也就是Key,服務物件作為Value。根據消費者傳過來的服務名稱,就可以找到對應的服務,到此,Registry和Provider已經全部寫完了。

2、consumer

目錄結構如下:

└─src ├─main │ ├─java │ │ └─edu │ │ └─xpu │ │ └─rpc │ │ ├─api │ │ │ IRpcCalc。java │ │ │ IRpcHello。java │ │ │ │ │ ├─consumer │ │ │ │ RpcConsumer。java │ │ │ │ │ │ │ └─proxy │ │ │ RpcProxy。java │ │ │ RpcProxyHandler。java │ │ │ │ │ └─core │ │ InvokerMessage。java │ │ │ └─resources └─test └─java└─ pom。xml

在看客戶端的實現之前,先梳理一下RPC流程。API 模組中的介面只在服務端實現了。因此,客戶端呼叫API 中定義的某一個介面方法時,實際上是要發起一次網路請求去呼叫服務端的某一個服務。而這個網路請求首先被註冊中心接收,由註冊中心先確定需要呼叫的服務的位置,再將請求轉發至真實的服務實現,最終呼叫服務端程式碼,將返回值透過網路傳輸給客戶端。整個過程對於客戶端而言是完全無感知的,就像呼叫本地方法一樣,所以必定要對客戶端的API介面做代理,隱藏網路請求的細節。

真的夠可以的,基於Netty實現了RPC框架

由上圖的流程圖可知,要讓使用者呼叫無感知,必須創建出代理類來完成網路請求的操作。

RpcProxy。java如下:

public class RpcProxy { public static T create(Class<?> clazz) { //clazz傳進來本身就是interface MethodProxy proxy = new MethodProxy(clazz); T result = (T) Proxy。newProxyInstance(clazz。getClassLoader(), new Class[]{clazz} , proxy); return result; } private static class MethodProxy implements InvocationHandler { private Class<?> clazz; public MethodProxy(Class<?> clazz) { this。clazz = clazz; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 如果傳進來是一個已實現的具體類 if (Object。class。equals(method。getDeclaringClass())) { try { return method。invoke(this, args); } catch (Throwable t) { t。printStackTrace(); } // 如果傳進來的是一個介面(核心) } else { return rpcInvoke(method, args); } return null; } // 實現介面的核心方法 public Object rpcInvoke(Method method, Object[] args) { // 傳輸協議封裝 InvokerMessage invokerMessage = new InvokerMessage(); invokerMessage。setClassName(this。clazz。getName()); invokerMessage。setMethodName(method。getName()); invokerMessage。setValues(args); invokerMessage。setParams(method。getParameterTypes()); final RpcProxyHandler consumerHandler = new RpcProxyHandler(); EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap。group(group) 。channel(NioSocketChannel。class) 。option(ChannelOption。TCP_NODELAY, true) 。handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch。pipeline(); pipeline。addLast(“frameDecoder”, new LengthFieldBasedFrameDecoder(Integer。MAX_VALUE, 0, 4, 0, 4)); //自定義協議編碼器 pipeline。addLast(“frameEncoder”, new LengthFieldPrepender(4)); //物件引數型別編碼器 pipeline。addLast(“encoder”, new ObjectEncoder()); //物件引數型別解碼器 pipeline。addLast(“decoder”, new ObjectDecoder(Integer。MAX_VALUE, ClassResolvers。cacheDisabled(null))); pipeline。addLast(“handler”, consumerHandler); } }); ChannelFuture future = bootstrap。connect(“localhost”, 8080)。sync(); future。channel()。writeAndFlush(invokerMessage)。sync(); future。channel()。closeFuture()。sync(); } catch (Exception e) { e。printStackTrace(); } finally { group。shutdownGracefully(); } return consumerHandler。getResponse(); } }}

我們透過傳進來的介面物件,獲得了要呼叫的服務名,服務方法名,引數型別列表,引數列表,這樣就把自定義的RPC協議包封裝好了,只需要把協議包發出去等待結果返回即可,所以為了接收返回值資料還需要自定義一個接收用的Handler,RpcProxyHandlerdiamante如下:

public class RpcProxyHandler extends ChannelInboundHandlerAdapter { private Object result; public Object getResponse() { return result; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { result = msg; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System。out。println(“client exception is general”); }}

這樣就算是完成了整個流程,下面開始測試一下吧,測試的RpcConsumer。java程式碼如下:

public class RpcConsumer { public static void main(String[] args) { // 本機之間的正常呼叫 // IRpcHello iRpcHello = new RpcHelloProvider(); // iRpcHello。hello(“Tom”); // 肯定是用動態代理來實現的 // 傳給它介面,返回一個介面的例項,偽代理 IRpcHello rpcHello = RpcProxy。create(IRpcHello。class); System。out。println(rpcHello。hello(“ZouChangLin”)); int a = 10; int b = 5; IRpcCalc iRpcCalc = RpcProxy。create(IRpcCalc。class); System。out。println(String。format(“%d + %d = %d”, a, b, iRpcCalc。add(a, b))); System。out。println(String。format(“%d - %d = %d ”, a, b, iRpcCalc。sub(a, b))); System。out。println(String。format(“%d * %d = %d”, a, b, iRpcCalc。mul(a, b))); System。out。println(String。format(“%d / %d = %d”, a, b, iRpcCalc。div(a, b))); }}

3、效果測試

先開啟Registry,執行埠是8080:

真的夠可以的,基於Netty實現了RPC框架

開啟consumer開始呼叫

真的夠可以的,基於Netty實現了RPC框架

呼叫完成後可以看到呼叫結果正確,並且在Registry這邊也看到了日誌:

真的夠可以的,基於Netty實現了RPC框架

可以發現,簡易RPC框架順利完工!

作者:zchanglin

連結:https://juejin。cn/post/6948351262668636174