編者注:Netty是JAVA領(lǐng)域有名的開源網(wǎng)絡(luò)庫,特點(diǎn)是高性能和高擴(kuò)展性,因此很多流行的框架都是基于它來構(gòu)建的,比如我們熟知的Dubbo、Rocketmq、Hadoop等,針對高性能RPC,一般都是基于Netty來構(gòu)建,比如soft-bolt??傊痪湓挘琂ava小伙伴們需要且有必要學(xué)會使用Netty并理解其實(shí)現(xiàn)原理。
關(guān)于Netty的入門講解可參考:Netty 入門,這一篇文章就夠了
Netty的啟動流程(ServerBootstrap),就是創(chuàng)建NioEventLoopGroup(內(nèi)部可能包含多個NioEventLoop,每個eventLoop是一個線程,內(nèi)部包含一個FIFO的taskQueue和Selector)和ServerBootstrap實(shí)例,并進(jìn)行bind的過程(bind流程涉及到channel的創(chuàng)建和注冊),之后就可以對外提供服務(wù)了。
Netty的啟動流程中,涉及到多個操作,比如register、bind、注冊對應(yīng)事件等,為了不影響main線程執(zhí)行,這些工作以task的形式提交給NioEventLoop,由NioEventLoop來執(zhí)行這些task,也就是register、bind、注冊事件等操作。
NioEventLoop(準(zhǔn)確來說是SingleThreadEventExecutor)中包含了private volatile Thread thread,該thread變量的初始化是在new的線程第一次執(zhí)行run方式時(shí)才賦值的,這種形式挺新穎的。

Netty啟動流程圖如下所示:

大致了解了Netty啟動流程之后,下面就按照Netty啟動流程中涉及到的源碼來進(jìn)行分析。
netty啟動流程分為server端和client端,不同之處就是前者監(jiān)聽端口,對外提供服務(wù)(socket->bind->listen操作),對應(yīng)類ServerBootstrap;后者主動去連接遠(yuǎn)端端口(socket->connect),對應(yīng)類Bootstrap。
server端啟動流程
server端啟動流程可以理解成創(chuàng)建ServerBootstrap實(shí)例的過程,就以下面代碼為例進(jìn)行分析(echo服務(wù)):
public final class EchoServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // bossGroup處理connect事件 // workerGroup處理read/write事件 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NIOServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { // 當(dāng)連接建立后(register到childWorkerGroup前)初始化channel.pipeline ch.pipeline().addLast(serverHandler); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
EventLoopGroup創(chuàng)建
EventLoopGroup中可能包含了多個EventLoop,EventLoop是一個Reactor模型的事件處理器,一個EventLoop對應(yīng)一個線程,其內(nèi)部會維護(hù)一個selector和taskQueue,負(fù)責(zé)處理客戶端請求和內(nèi)部任務(wù),內(nèi)部任務(wù)如ServerSocketChannel注冊和ServerSocket綁定操作等。關(guān)于NioEventLoop,后續(xù)專門寫一篇文章分析,這里就不再展開,只需知道個大概即可,其架構(gòu)圖如下:

EventLoopGroup創(chuàng)建本質(zhì)就是創(chuàng)建多個NioEventLoop,這里創(chuàng)建NioEventLoop就是初始化一個Reactor,包括selector和taskQueue。主要邏輯如下:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { // 創(chuàng)建NioEventLoop實(shí)例 children = new EventExecutor[nThreads]; // 初始化NioEventLoop,實(shí)際調(diào)用的是NioEventLoopGroup.newChild方法 for (int i = 0; i < nThreads; i ++) { children[i] = newChild(executor, args); } // 多個NioEventLoop中選擇策略 chooser = chooserFactory.newChooser(children); } NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { // 創(chuàng)建taskQueue super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); // 是不是很熟悉,java nio selector操作 provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrAppedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; }
EventLoopGroup創(chuàng)建OK后,啟動的第一步就算完成了,接下來該進(jìn)行bind、listen操作了。
ServerBootstrap流程
bind操作
bind操作是ServerBootstrap流程重要的一環(huán),bind流程涉及到NioChannel的創(chuàng)建、初始化和注冊(到Selector),啟動NioEventLoop,之后就可以對外提供服務(wù)了。
public ChannelFuture bind(SocketAddress localAddress) { validate(); // 參數(shù)校驗(yàn) return doBind(localAddress); } private ChannelFuture doBind(final SocketAddress localAddress) { // 1. 初始化注冊操作 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } // 2. doBind0操作 if (regFuture.isDone()) { // register已完成,這里直接調(diào)用doBind0 ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // register還未完成,注冊listener回調(diào),在回調(diào)中調(diào)用doBind0 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { /** * channel register完成(注冊到Selector并且調(diào)用了invokeHandlerAddedIfNeeded)之后, * 會調(diào)用safeSetSuccess,觸發(fā)各個ChannelFutureListener,最終會調(diào)用到這里的operationComplete方法 */ @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { promise.setFailure(cause); } else { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
這里涉及到2個操作,一個是channel的創(chuàng)建、初始化、注冊操作,另一個是bind操作,下面兵分兩路,分別來講。
注意,這里如果main線程執(zhí)行到regFuture.isDone()時(shí),register還未完成,那么main線程是不會直接調(diào)用bind操作的,而是往regFuture上注冊一個Listenner,這樣channel register完成(注冊到Selector并且調(diào)用了invokeHandlerAddedIfNeeded)之后,會調(diào)用safeSetSuccess,觸發(fā)各個ChannelFutureListener,最終會調(diào)用到這里的operationComplete方法,進(jìn)而在執(zhí)行bind操作。
channel初始化、注冊操作
final ChannelFuture initAndRegister() { Channel channel = null; try { // 1.創(chuàng)建(netty自定義)Channel實(shí)例,并初始化 // channel為 NioServerSocketChannel 實(shí)例,NioServerSocketChannel的父類AbstractNioChannel保存有nio的ServerSocketChannel channel = channelFactory.newChannel(); // 2.初始化channel() init(channel); } catch (Throwable t) { } // 3.向Selector注冊channel ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
這里重點(diǎn)關(guān)注下初始化channel流程,主要操作是設(shè)置channel屬性、設(shè)置channel.pipeline的ChannelInitializer,注意,ChannelInitializer是在channel注冊到selector之后被回調(diào)的。
/** * 初始channel屬性,也就是ChannelOption對應(yīng)socket的各種屬性。 * 比如 SO_KEEPALIVE SO_RCVBUF ... 可以與linux中的setsockopt函數(shù)對應(yīng)起來。 * 最后將ServerBootstrapAcceptor添加到對應(yīng)channel的ChannelPipeline中。 */@Overridevoid init(Channel channel) throws Exception { final Map, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } ChannelPipeline p = channel.pipeline(); // 獲取childGroup和childHandler,傳遞給ServerBootstrapAcceptor final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry, Object>[] currentChildOptions; final Entry, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } p.addLast(new ChannelInitializer() { /** * 在register0中,將channel注冊到Selector之后,會調(diào)用invokeHandlerAddedIfNeeded, * 進(jìn)而調(diào)用到這里的initChannel方法 */ @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } // 這里注冊一個添加ServerBootstrapAcceptor的任務(wù) ch.eventLoop().execute(new Runnable() { @Override public void run() { // 添加ServerBootstrapAcceptor pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
channel初始化之后就該將其注冊到selector,即下面的register流程:
public ChannelFuture register(Channel channel) { // next()挑選一個EventLoop,默認(rèn)輪詢選擇某個NioEventLoop return next().register(channel); } public ChannelFuture register(final ChannelPromise promise) { promise.channel().unsafe().register(this, promise); return promise; } // AbstractChannel public final void register(EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this.eventLoop = eventLoop; // 直接執(zhí)行register0或者以任務(wù)方式提交執(zhí)行 // 啟動時(shí),首先執(zhí)行到這里的是main線程,所以是以任務(wù)的方式來提交執(zhí)行的。 // 也就是說,該任務(wù)是NioEventLoop第一次執(zhí)行的任務(wù),即調(diào)用register0 if (eventLoop.inEventLoop()) { register0(promise); } else { // 往NioEventLoop中(任務(wù)隊(duì)列)添加任務(wù)時(shí),如果NioEventLoop線程還未啟動,則啟動該線程 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } }
register操作
register操作之后伴隨著多個回調(diào)及l(fā)istener的觸發(fā):
// AbstractChannel$AbstractUnsafe private void register0(ChannelPromise promise) { boolean firstRegistration = neverRegistered; // 這里調(diào)用的是AbstractNioChannel.doRegister // 這里將channel注冊上去,并沒有關(guān)注對應(yīng)的事件(read/write事件) doRegister(); neverRegistered = false; registered = true; // 調(diào)用handlerAdd事件,這里就會調(diào)用initChannel方法,設(shè)置channel.pipeline,也就是添加 ServerBootstrapAcceptor pipeline.invokeHandlerAddedIfNeeded(); // 調(diào)用operationComplete回調(diào) safeSetSuccess(promise); // 回調(diào)fireChannelRegistered pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { // 回調(diào)fireChannelActive pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } }
上面代碼中的initChannel回調(diào)也就是設(shè)置對外監(jiān)聽channel的channelHanlder為ServerBootstrapAcceptor;operationComplete回調(diào)也就是觸發(fā)ChannelFutureListener.operationComplete,這里會進(jìn)行后續(xù)的doBind操作。
// AbstractBootstrap private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // doBind0向EventLoop任務(wù)隊(duì)列中添加一個bind任務(wù)來完成后續(xù)操作。 channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { // bind操作 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } }); }
bind操作
在回顧上面的bind操作代碼,bind操作是在register之后進(jìn)行的,因?yàn)閞egister0是由NioEventLoop執(zhí)行的,所以main線程需要先判斷下future是否完成,如果完成直接進(jìn)行doBind即可,否則添加listener回調(diào)進(jìn)行doBind。

bind操作及后續(xù)初始化操作(channelActive回調(diào)、設(shè)置監(jiān)聽事件)
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { boolean wasActive = isActive(); try { // 調(diào)用底層bind操作 doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); } // 最后底層bind邏輯bind入?yún)薭acklog,也就是底層會進(jìn)行l(wèi)isten操作 // DefaultChannelPipeline.headContext -> NioMessageUnsafe -> NioServerSocketChannel protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } } public void channelActive(ChannelHandlerContext ctx) throws Exception { // 回調(diào)fireChannelActive ctx.fireChannelActive(); // 設(shè)置selectKey監(jiān)聽事件,對于監(jiān)聽端口就是SelectionKey.OP_ACCEPT,對于新建連接就是SelectionKey.OP_READ readIfIsAutoRead(); }
到這里為止整個netty啟動流程就基本接近尾聲,可以對外提供服務(wù)了。