2022年了,Netty究竟是怎麼執行的-連線流程的深入剖析

2022年了,Netty究竟是怎麼執行的-連線流程的深入剖析

java nio非常難駕馭,就像我在上一篇文章中處理的檔案伺服器那樣,也只是考慮並處理了部分情況,然而可能還是要出錯,可擴充套件性也不好。

netty就是這樣的一種框架,讓Java nio變得:

網路伺服器程式設計變得容易

可用性變高

擴充套件性好

netty的基本工作方式

那麼,Netty究竟是怎麼執行的? Netty使用

多Reactor

多執行緒模型。

這種模型是把Reactor執行緒拆分了mainReactor和subReactor兩個部分,mainReactor只處理連線事件,讀寫事件交給subReactor來處理。mainRactor只處理連線事件,一個埠用一個執行緒來處理。處理讀寫事件的subReactor個數一般和作業系統核數相關,一個連線對應一個執行緒。業務邏輯由業務執行緒池處理。

本文會引用一個例子,先談談netty使用的基本資料結構,然後梳理清楚使用netty建立連線的過程。

從一個例子開始

maven包依賴

io。netty netty-all 4。1。72。Final複製程式碼

2022年了,Netty究竟是怎麼執行的-連線流程的深入剖析

server

server可以認為和我在nio的實現裡面的區別是:acceptor單獨一個執行緒池,其他io事件或者任務一個執行緒池。然而我當時沒有這麼實現,只是給業務流程一個執行緒池。

2022年了,Netty究竟是怎麼執行的-連線流程的深入剖析

netty server 的程式碼示意如下:

public class NettyServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup parentGroup = new NioEventLoopGroup(); //NettyRuntime。availableProcessors() * 2 執行緒數 EventLoopGroup childGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap。group(parentGroup, childGroup) 。channel(NioServerSocketChannel。class) 。childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch。pipeline(); pipeline。addLast(new StringDecoder());//解碼為字串 pipeline。addLast(new StringEncoder());//編碼為二進位制 pipeline。addLast(new DemoSocketServerHandler()); } }); ChannelFuture future = bootstrap。bind(8888)。sync(); System。out。println(“future。channel() = ” + future。channel()); System。out。println(“伺服器已啟動。。。”); future。channel()。closeFuture()。sync(); } finally { parentGroup。shutdownGracefully(); childGroup。shutdownGracefully(); } }}複製程式碼

server bind之後會啟動一個執行緒阻塞在select,等待著連線了。

client

netty client的編碼模型簡單很多,如下:

2022年了,Netty究竟是怎麼執行的-連線流程的深入剖析

netty client 的程式碼示意如下:

Bootstrap bootstrap = new Bootstrap();bootstrap。group(eventLoopGroup) 。channel(NioSocketChannel。class) 。handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch。pipeline(); pipeline。addLast(new StringDecoder(CharsetUtil。UTF_8)); pipeline。addLast(new DemoSocketClientHandler()); pipeline。addLast(new StringEncoder(CharsetUtil。UTF_8)); } });ChannelFuture future = bootstrap。connect(“localhost”, 8888)。sync();複製程式碼

客戶端也是一個連線池,每個連線一個執行緒,這裡只使用一個

有點浪費

了,但是這裡只是一個簡單的demo,暫且這樣處理吧。

demo總結

可以看到,總體編碼簡單易懂,但要明白具體的執行機制,卻要費一番功夫。下面先介紹demo中用到的基本資料結構,然後再試圖弄清楚netty用於連線的機理。

資料結構

如果已經比較瞭解這塊的資料結構,可直接跳到流程部分。

Bootstrap

ServerBootstrap是為netty伺服器設定服務的,像上面這樣透過ServerBootstrap就可以配置出一個完善的netty服務來

Bootstrap是為netty客戶端設定服務的,像上面這樣透過Bootstrap就可以配置出一個完善的netty客戶端

Channel

可以認為

channel是一個連線

,Channel聚合了一組功能,不但包括網路IO操作,還包括獲取該Channel的eventloop、以及獲取緩衝分配器allocator, 和pipeline等。所以channel是netty裡面最重要的資料結構。

NioEventLoop

NioEventLoopGroup,主要管理 eventLoop 的生命週期,可以理解為是一個執行緒池。

NioEventLoop 中維護了一個執行緒和任務佇列,支援非同步提交執行任務,執行緒啟動時會呼叫 NioEventLoop 的 run 方法,執行I/O任務和非 I/O 任務:

I/O 任務,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法觸發。

非IO 任務,新增到 taskQueue 中的任務,如 register0、bind0 等任務以及一些定時任務,由 runAllTasks 方法觸發。

兩種任務的執行時間比由變數 ioRatio 控制,預設為 50,則表示允許非 IO 任務執行的時間與 IO 任務的執行時間相等。

非同步的api

nio程式設計的時候討論到的是

非阻塞的api

,非阻塞是不夠方便的,往往要和迴圈放在一起操作,比如之前的檔案伺服器。

netty的設計卻不同,主要需要使用到

非同步的api

,這裡談到的非同步的api其實是一種

軟體設計

上的事情,引入這個,對於Java nio編碼帶來了極大的幫助。

下面先了解一下什麼是非同步的api

Future

Future可能大家已經非常熟知了,Future是JDK中的介面,當引入執行緒池的時候,Future也引入了,可以用來表示提交的任務的結果。 Future介面提供兩個方法:

get: 同步阻塞當前呼叫的執行緒,直到結果被設定

cancel:取消非同步操作,但是結果是未知的。如果操作已經完成,或者傳送其他未知的原因拒絕取消,取消操作將會失敗。

ChannelFuture和Promise介面

因為netty的操作和函式都和channel相關,故而netty裡面給自己的Future介面命名為ChannelFuture

netty的api是建立在future之上的。

比如這個操作bootstrap。connect(“localhost”, 8888)就是非同步的,bootstrap。bind(8888)是非同步的

netty提供的write flush read bind 等函式也都是非同步的

ChannelPromise繼承了ChannelFuture介面和Promise介面。

bind和connect呼叫之後會進入這段函式:

ChannelFuture regFuture = config()。group()。register(channel);if (regFuture。cause() != null) { if (channel。isRegistered()) { channel。close(); } else { channel。unsafe()。closeForcibly(); }}public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this));}複製程式碼

這個register函數里面生成了一個DefaultChannelPromise的例項new DefaultChannelPromise(channel, this),是實現了ChannelFuture介面的。

呼叫者可透過addListener系列設定毀掉,另一邊,非同步執行的地方透過setFail,setSuccess修改channelFuture的狀態,trySuccess函式會呼叫listener的函式執行。

Channel的大動脈——pipeline

channel例項有很多成員,包括parent,id,unsafe和pipeline。 其中pipeline是Channel的大動脈。

channel初始化的時候使用newChannelPipeline初始化了pipeline

protected AbstractChannel(Channel parent) { this。parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline();}複製程式碼

看看pipeline的初始化吧

protected DefaultChannelPipeline(Channel channel) { this。channel = ObjectUtil。checkNotNull(channel, “channel”); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head。next = tail; tail。prev = head;}複製程式碼

在pipeline初始化的過程中,包括:

tail的初始化,TailContext,繼承了AbstractChannelHandlerContext tail實現了ChannelInboundHandler

head 的初始化,HeadContext,繼承了AbstractChannelHandlerContext head實現了ChannelOutboundHandler, ChannelInboundHandler 所以不管是inbound還是outbound的事件都會經過head的處理。

這樣就構造了一個雙向連結串列 head。next=tail,tail。prev=head,然後,透過pipiline的呼叫就可以使用這個雙向連結串列繼續處理了。而且,可以看到head和tail不僅有context的功能,也有Handler的功能。

ChannelHandlerContext

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { private final ChannelHandler handler; DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, handler。getClass()); this。handler = handler; } @Override public ChannelHandler handler() { return handler; }}複製程式碼

context是pipieline連結串列中的一個節點 context提供了一些方法

DefaultChannelHandlerContext初始化了handler,讓Handler處理真正的邏輯

AbstractChannelHandlerContext對應的pipeline、executor、prev和next

AbstractChannelHandlerContext 提供了使得鏈路運轉的方法

findContextOutbound/Inbound

inbound找下一個MASK_ONLY_INOUND的next

ounbound 找下一個MASK_ONLY_OUTBOUND的 prev

一系列的invoke和fire方法

fireChannelActive,透過findContextInbound找到下一個context,再透過invoke進行具體的呼叫

invokeChannelActive(ctx)是上一個ctx呼叫下一個ctx,ctx呼叫本身的方法是不帶引數的invokeChannelActive

static void invokeChannelActive(final AbstractChannelHandlerContext next) { EventExecutor executor = next。executor(); if (executor。inEventLoop()) { next。invokeChannelActive(); } else { executor。execute(new Runnable() { @Override public void run() { next。invokeChannelActive(); } }); }}private void invokeChannelActive() { if (invokeHandler()) { try { ((ChannelInboundHandler) handler())。channelActive(this); } catch (Throwable t) { invokeExceptionCaught(t); } } else { fireChannelActive(); }}複製程式碼

除了fire系列,還有bind/connect/disconnect/close/disregister/read/write/flush等方法,和fire系列功能類似,只是方向不同,透過findContextOutbound找到下一個context,invoke進行真正的呼叫。

ChannelHandler

每一個

ChannelHandlerContext組合了一個Handler

例項成員

ChannelHandler 是一個介面,處理 I/O 事件或攔截 I/O 操作,並將其轉發到其 ChannelPipeline(業務處理鏈)中的下一個處理程式。

ChannelHandler 本身並沒有提供很多方法,因為這個介面有許多的方法需要實現,方便使用期間,可以繼承它的子類:

ChannelInboundHandler 用於處理入站 I/O 事件。

ChannelOutboundHandler 用於處理出站 I/O 操作。

或者使用以下介面卡類:

ChannelInboundHandlerAdapter 用於處理入站 I/O 事件。

ChannelOutboundHandlerAdapter 用於處理出站 I/O 操作。

ChannelDuplexHandler 用於處理入站和出站事件。

Outbound和Inbound

通常來說,channelXXX,表示inbound事件,否則是outBound事件

比如說ChannelOutboundHandlerAdapter的connect read write flush bind close bind其實也不會向外寫資料,從tail開始處理。

ChannelInboundHandlerAdapter, channelRegistered是從head開始 channelUngistered也是從head開始開始處理起 channelRead,channeReadComplete channelWritabelChanged,這個應該需要通知handler

AbstractNioUnsafe

AbstractNioUnsafe提供了很多方法,一般是透過HeadContext的提供的io方法來呼叫的,主要是底層的nio方法處理,可能會註冊一些定時任務,比如是否連線成功啊,傳送成功啊,沒傳送成功則怎麼處理的。就像connect這個註冊了連線超時的事件。

開啟eventLoop

啟動新執行緒的入口

connect和bind、以及accept操作的時候都會呼叫group。register(channel),這個時候會啟動新執行緒。

bind流程的啟動

final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory。newChannel(); init(channel); } catch (Throwable t) { 。。。 } ChannelFuture regFuture = config()。group()。register(channel); if (regFuture。cause() != null) { 。。。 } return regFuture;}複製程式碼

初始了一個channel之後,group()。register(channel)選擇group裡的一個

eventLoop

,執行它的register函式

promise。channel()。unsafe()。register(this, promise);複製程式碼

if (eventLoop。inEventLoop()) { register0(promise);} else { try { eventLoop。execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger。warn( “Force-closing a channel whose registration task was not accepted by an event loop: {}”, AbstractChannel。this, t); closeForcibly(); closeFuture。setClosed(); safeSetFailure(promise, t); }}複製程式碼

如果還沒有啟動執行緒,則執行execute函式,則會啟動一個新的執行緒。 NioEventLoop的父類SingleThreadEventExecutor的execute函式如下

private void execute(Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); 。。。}複製程式碼

先addtask,然後啟動執行緒,並執行了

ThreadPerTaskExecutor{@Overridepublic void execute(Runnable command) { threadFactory。newThread(command)。start();}}複製程式碼

在這裡啟動了一個執行緒。

可以倒著看這裡的初始化過程:

public static Executor apply(final Executor executor, final EventExecutor eventExecutor) { ObjectUtil。checkNotNull(executor, “executor”); ObjectUtil。checkNotNull(eventExecutor, “eventExecutor”); return new Executor() { @Override public void execute(final Runnable command) { executor。execute(apply(command, eventExecutor)); } };}複製程式碼

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this。addTaskWakesUp = addTaskWakesUp; this。maxPendingTasks = Math。max(16, maxPendingTasks); this。executor = ThreadExecutorMap。apply(executor, this); taskQueue = newTaskQueue(this。maxPendingTasks); rejectedExecutionHandler = ObjectUtil。checkNotNull(rejectedHandler, “rejectedHandler”);}複製程式碼

也就是說,NioEventLoop初始化的時候使用ThreadExecutorMap。apply(executor, this)初始化了executor成員,executor成員是一個ThreadPerTaskExecutor型別。

register沒有註冊SelectionKey

啟動新執行緒後,執行register0任務。 在connect和bind、以及accept的時候都會呼叫register,但是各自關心的SelectionKey並不是在register0任務裡面註冊的。SelectionKey是要用的時候才註冊。

@Overrideprotected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel()。register(eventLoop()。unwrappedSelector(), 0, this); return; } 複製程式碼

doRegister並沒有真的去註冊SelectionKey,因為這裡傳入的引數是0。

連線的流程

我們這裡認為連線有3個流程,server bind一個埠並監聽這個埠,client發起連線,然後服務端透過accept和客戶端建立連線。

server bind

如上面eventLoop真正開啟的時候,我們當時舉了bind的例子。bind的時候選擇了一個NIOEventloop去執行,這個start之後就會啟動NIOEventloop run的迴圈了。

2022年了,Netty究竟是怎麼執行的-連線流程的深入剖析

任務0:register0

在bind流程裡面,任務register0執行完之後加了一些任務,就是下面的任務1和任務2。

任務1: 新增handler-ServerBootstrapAcceptor

這個任務的新增點是ServerBootStrap的init方法,這是register0 裡面addhandler的一個方法設定的。

ch。eventLoop()。execute(new Runnable() { @Override public void run() { pipeline。addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); 複製程式碼

pipeline。addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs))給當前的pipeline加了一個handler ServerBootstrapAcceptor,這個就是後面講accept流程裡面要用到的。

任務2:pipeline。bind

這個任務的新增點是doBind0方。

private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered。 Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation。 channel。eventLoop()。execute(new Runnable() { @Override public void run() { if (regFuture。isSuccess()) { channel。bind(localAddress, promise)。addListener(ChannelFutureListener。CLOSE_ON_FAILURE); } else { promise。setFailure(regFuture。cause()); } } });}複製程式碼

bind是出站事件,channel會呼叫

pipeline。bind(localAddress, promise)複製程式碼

最終這裡是呼叫了unsafe的bind,然後又添加了一個任務。

bind0也會失敗,比如埠本身就被佔用,就會呼叫:

channel。bind(localAddress, promise)。addListener(ChannelFutureListener。CLOSE_ON_FAILURE)ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (!future。isSuccess()) { future。channel()。close(); } }};複製程式碼

任務3:fireChannelActive

unsafe。bind註冊的任務是:

new Runnable() { @Override public void run() { pipeline。fireChannelActive(); }}複製程式碼

這個任務會讓head HeadContext的channelActive方法去註冊accept的key。

public void channelActive(ChannelHandlerContext ctx) { ctx。fireChannelActive(); readIfIsAutoRead();}複製程式碼

readIfIsAutoRead呼叫了read這個outBound事件,一直觸發到head,也就是unsafe執行了doRead

在doRead這裡呼叫了 doBeginRead,註冊了SelectionKey。OP_ACCEPT

protected void doBeginRead() throws Exception { // Channel。read() or ChannelHandlerContext。read() was called final SelectionKey selectionKey = this。selectionKey; if (!selectionKey。isValid()) { return; } readPending = true; final int interestOps = selectionKey。interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey。interestOps(interestOps | readInterestOp); }}複製程式碼

在AbstractNioChannel裡面readInterestOp為SelectionKey。OP_ACCEPT,這是因為這個channel初始化的時候設定了。

public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey。OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel()。socket());}複製程式碼

bind流程總結

bind流程裡面觸發了四種任務,任務是先進先出的。觸發了一個inbound呼叫channelActive,觸發了兩個outbound呼叫bind和read。

client connect

上面整理了bind的流程,是從任務新增的角度來講的。現在可以看看bind的流程是怎樣的。

流程

connect的整體流程是:

2022年了,Netty究竟是怎麼執行的-連線流程的深入剖析

流程和bind基本相同,略有區別。這是因為connect呼叫之後添加了一些回撥函式:

public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { ObjectUtil。checkNotNull(remoteAddress, “remoteAddress”); validate(); return doResolveAndConnect(remoteAddress, localAddress);}複製程式碼

doResolveAndConnect新增的listener

final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture。addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // Directly obtain the cause and do a null check so we only need one volatile read in case of a // failure。 Throwable cause = future。cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel。 promise。setFailure(cause); } else { // Registration was successful, so set the correct executor to use。 // See https://github。com/netty/netty/issues/2586 promise。registered(); doResolveAndConnect0(channel, remoteAddress, localAddress, promise); } }});複製程式碼

在註冊完成之後,回撥這個函式doResolveAndConnect0。而doResolveAndConnect0添加了一個任務:

{ @Override public void run() { if (localAddress == null) { channel。connect(remoteAddress, connectPromise); } else { channel。connect(remoteAddress, localAddress, connectPromise); } connectPromise。addListener(ChannelFutureListener。CLOSE_ON_FAILURE); }}複製程式碼

也就是真正的連線事件channel。connect

觸發pipeline connect

connect是出站事件,整體的pipiline走向為:

2022年了,Netty究竟是怎麼執行的-連線流程的深入剖析

2022年了,Netty究竟是怎麼執行的-連線流程的深入剖析

上面的任務呼叫的就是pipeline。connect,透過這樣的呼叫鏈最後交給了unsafe的connect函式去處理,會呼叫Java NIO的connect處理。

告警和重連處理

unsafe的connect處理如下:

public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { try {。。。 if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } else { connectPromise = promise; requestedRemoteAddress = remoteAddress; // Schedule connect timeout。 int connectTimeoutMillis = config()。getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop()。schedule(new Runnable() { @Override public void run() { ChannelPromise connectPromise = AbstractNioChannel。this。connectPromise; if (connectPromise != null && !connectPromise。isDone() && connectPromise。tryFailure(new ConnectTimeoutException( “connection timed out: ” + remoteAddress))) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit。MILLISECONDS); } 。。。 } }。。。}複製程式碼

首先呼叫doConnect:

protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) { if (localAddress != null) { doBind0(localAddress); } boolean success = false; try { boolean connected = SocketUtils。connect(javaChannel(), remoteAddress); if (!connected) { selectionKey()。interestOps(SelectionKey。OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } }}複製程式碼

這裡的doConnect SocketUtils。connect(javaChannel(), remoteAddress)的返回結果connected false,呼叫selectionKey()。interestOps(SelectionKey。OP_CONNECT)

則在connect函數里面走的是else流程

啟動了一個定時任務,看是否超時,預設的超時時間是30s

也就是說這個時候連線還沒有成功,透過Selector監聽SelectionKey。OP_CONNECT,開始監聽IO事件來發起連線。

if ((readyOps & SelectionKey。OP_CONNECT) != 0) { int ops = k。interestOps(); ops &= ~SelectionKey。OP_CONNECT; k。interestOps(ops); unsafe。finishConnect();}複製程式碼

處理這個IO事件的時候,會先

取消註冊SelectionKey.OP_CONNECT

,然後:

doFinishConnect();fulfillConnectPromise(connectPromise, wasActive);複製程式碼

fulfillConnectPromise取消超時定時任務

@Overridepublic boolean cancel(boolean mayInterruptIfRunning) { boolean canceled = super。cancel(mayInterruptIfRunning); if (canceled) { scheduledExecutor()。removeScheduled(this); } return canceled;}複製程式碼

fulfillConnectPromise函數里面還會觸發pipeline。fireChannelActive,主要的功能是註冊讀操作。

channelActive

fireChannelActive執行流程如下:

2022年了,Netty究竟是怎麼執行的-連線流程的深入剖析

在active呼叫之後,透過pipeline的read註冊readInterestOp,對於serverSocketChannel是16:OP_ACCEPT,socketChannel是1 :OP_READ。

2022年了,Netty究竟是怎麼執行的-連線流程的深入剖析

connect流程總結

connect流程裡面觸發了oubound connect的呼叫,channelActive inbound的呼叫。並且進行了SelectionKey。OP_CONNECT的IO事件的處理:

連線成功會取消註冊SelectionKey。OP_CONNECT,移除延時任務,註冊SelectionKey。OP_READ事件

連線失敗會不停的重連,直到超時則觸發超時任務

server accept

客戶端connect的時候,server收到accept的event,parentGroup的reactor監聽epoll_wait的accept事件,連線建立完成之後,會觸發一個新的NioEventLoop執行緒去處理這條連線的任務。

bind後的讀事件

if ((readyOps & (SelectionKey。OP_READ | SelectionKey。OP_ACCEPT)) != 0 || readyOps == 0) { unsafe。read();}複製程式碼

監聽到 SelectionKey。OP_ACCEPT於是呼叫unsafe。read()處理。

unsafe。read和pipeline

對於NioServerSocketChannel,繫結的unsafe型別是NioMessageUnsafe。NioMessageUnsafe裡的read函式會呼叫到doReadMessages:

protected int doReadMessages(List buf) throws Exception { SocketChannel ch = SocketUtils。accept(javaChannel()); try { if (ch != null) { buf。add(new NioSocketChannel(this, ch)); return 1; } }。。。}複製程式碼

呼叫doReadMessages之後,會觸發

pipeline。fireChannelRead(readBuf。get(i));複製程式碼

accept整體流程如下:

2022年了,Netty究竟是怎麼執行的-連線流程的深入剖析

我們知道NioMessageChannel已經在bind流程將

ServerBootstrapAcceptor

新增進來作為handler,最終呼叫到這個Handler的channelRead方法

public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child。pipeline()。addLast(childHandler); setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); try { childGroup。register(child)。addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future。isSuccess()) { forceClose(child, future。cause()); } } }); } catch (Throwable t) { forceClose(child, t); }}複製程式碼

在這裡,將msg物件轉為Channel型別,將bootstrap設定的childHandler新增到child channel的pipeline。最後 又回到那個熟悉的register:childGroup。register(child)

這樣會在childGroup啟動一個新的NIOEventLoop執行緒,呼叫register0,register0任務裡面觸發了,

pipeline。fireChannelActive();複製程式碼

就像上面提到的ChannelActive反應鏈,裡面觸發了read op的註冊,後面的流程也已經很熟悉了。

accept流程總結

accept流程裡面進行了SelectionKey。OP_ACCEPT的IO事件的處理,觸發了inbound channelRead的呼叫,使用bind過程繫結的ServerBootstrapAcceptor來處理read事件,給新連線分配了新的處理執行緒,並且監聽了可讀事件。

總結

netty客戶端和伺服器透過

bind-connect-accept

這樣的互動建立了一條連線,於是可以進行資料傳輸了。