Netty 是一個在 Java 生態里應用非常廣泛的的網絡編程工具包,它在 2004 年誕生到現在依然是火的一塌糊涂,光在 github 上就有 30000 多個項目在用它。所以要想更好地掌握網絡編程,我想就繞不開 Netty。所以今天我們就來分析分析 Netty 內部網絡模塊的工作原理。
友情提示,本文算上代碼將近有兩三萬字,比較長,如果時間緊迫中間部分可以跳著看。第一節和最后的第六節建議必讀。當然直接拖到尾部收藏點贊點轉發,也是 ok 的,哈哈!
另外,今天又給大家申請到了贊助,在文末給大家申請了5本冰河的新書《深入理解高并發編程》,抽獎送給大家。
(相關資料圖)
我們首先找一個 Netty 的例子,本篇文章整體都是圍繞這個例子來展開敘述的。我們下載 Netty 的源碼,并在 examples 中找到 echo 這個 demo。同時,為了防止代碼更新導致對本文敘述的影響,我們切到 4.1 分支上來。
# git checkout https://github.com/netty/netty.git# git checkout -b 4.1# cd example/src/main/java/io/netty/example/echo
在這個 demo 的 EchoServer 中,展示了使用 Netty 寫 Server 的經典用法。(飛哥在文章中會在不影響核心邏輯的表達上,對原始代碼盡心適當的精簡,比如下面代碼中的 try 就被我丟了)
public final class EchoServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer 如果你是一個 Java 新手,或者干脆像飛哥一樣沒用 Netty 寫過服務,相信上述代碼基本是看不懂的。究其根本原因是相比 C/C++ ,Java 的封裝程度比較高。Java 語言本身的 JVM 中 NIO 對網絡的封裝就已經屏蔽了很多底層的概念了,再加上 Netty 又封裝了一層,所以 Java 開發者常用的一些術語和概念和其它語言出入很大。 比如上面代碼中的 Channel、NioEventLoopGroup 等都是其它語言中所沒見過的。不過你也不用感到害怕,因為這其中的每一個概念都是 socket、進程等底層概念穿了一身不同的衣服而已。接下來我們分別細了解一下這些概念。 如果你沒接觸過 Netty,可以簡單把 NioEventLoopGroup 理解為一個線程池就可以。每一個 NioEventLoopGroup 內部包含一個或者多個 NioEventLoop。 其中 NioEventLoop 是對線程、epoll 等概念進行了一個集中的封裝。 首先,EventLoop 本身就是一個線程。為什么這么說,我們通過看 NioEventLoop 的繼承關系就能看出來。NioEventLoop 繼承于 SingleThreadEventLoop,而 SingleThreadEventLoop 又繼承于 SingleThreadEventExecutor。SingleThreadEventExecutor 實現了在 Netty 中對本地線程的抽象。 public abstract class SingleThreadEventExecutor extends ... { private volatile Thread thread; private final Queue 在 SingleThreadEventExecutor 中不但封裝了線程對象 Thread,而且還配置了一個任務隊列 taskQueue,用于其它線程向它來放置待處理的任務。 另外 NioEventLoopEventLoop 以 selector 的名義封裝了 epoll(在 Linux 操作系統下)。 在 NioEventLoop 對象內部,會有 selector 成員定義。這其實就是封裝的 epoll 而來的。我們來看具體的封裝過程。以及 selectedKeys,這是從 selector 上發現的待處理的事件列表。 public final class NioEventLoop extends SingleThreadEventLoop{ // selector private Selector selector; private Selector unwrappedSelector; // selector 上發現的各種待處理事件 private SelectedSelectionKeySet selectedKeys;} NioEventLoopGroup 在構造的時候,會調用 SelectorProvider#provider 來生成 provider,在默認情況下會調用 sun.nio.ch.DefaultSelectorProvider.create 來創建。 //file:java/nio/channels/spi/SelectorProvider.javapublic abstract class SelectorProvider { public static SelectorProvider provider() { // 1. java.nio.channels.spi.SelectorProvider 屬性指定實現類 // 2. SPI 指定實現類 ...... // 3. 默認實現,Windows 和 Linux 下不同 provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; }} 在 Linux 下,默認創建的 provider 使用的就是 epoll。 //file:sun/nio/ch/DefaultSelectorProvider.javapublic class DefaultSelectorProvider { public static SelectorProvider create() { String osname = AccessController .doPrivileged(new GetPropertyAction("os.name")); if (osname.equals("Linux")) return createProvider("sun.nio.ch.EPollSelectorProvider"); }}1.3 Channel Channel 是 JavaNIO 里的一個概念。大家把它理解成 socket,以及在 socket 之上的一系列操作方法的封裝就可以了。 Java 在 Channel 中把 connect、bind、read、write 等方法都以成員方法的形式給封裝起來了。 public interface Channel extends ... { Channel read(); Channel flush(); ...... interface Unsafe { void bind(SocketAddress localAddress, ...); void connect(SocketAddress remoteAddress, ...); void write(Object msg, ...); ...... }} 另外在 Java 中,習慣把 listen socket 叫做父 channel,客戶端握手請求到達以后創建出來的新連接叫做子 channel,方便區分。 在每個 Channel 對象的內部,除了封裝了 socket 以外,還都一個特殊的數據結構 DefaultChannelPipeline pipeline。在這個 pipeline 里是各種時機里注冊的 handler。 Channel 上的讀寫操作都會走到這個 DefaultChannelPipeline 中,當 channel 上完成 register、active、read、readComplete 等操作時,會觸發 pipeline 中的相應方法。 這個 ChannelPipeline 其實就是一個雙向鏈表,以及鏈表上的各式各樣的操作方法。 public interface ChannelPipeline { ChannelPipeline addFirst(String name, ChannelHandler handler); ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler); ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler); ChannelPipeline addLast(String name, ChannelHandler handler); ChannelPipeline fireChannelRead(Object msg);}1.5 EchoServer 解讀 現在我們具備了對 Java、對 Netty 的初步理解以后,我們再會后來看一下開篇提到的 EchoServer 源碼。 public final class EchoServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer 在該代碼一開頭,bossGroup = new NioEventLoopGroup(1)這一行是創建了一個只有一個線程的線程池。workerGroup = new NioEventLoopGroup又創建了 worker 線程池,沒有指定數量,Netty 內部會根據當前機器的 CPU 核數來靈活決定。 ServerBootstrap 這是一個腳手架類,是為了讓我們寫起服務器程序來更方便一些。 b.group(bossGroup, workerGroup)這一行是將兩個線程池傳入,第一個作為 boss 只處理 accept 接收新的客戶端連接請求。第二個參數作為 worker 線程池,來處理連接上的請求接收、處理以及結果發送發送。 我們注意下 childHandler是傳入了一個 ChannelInitializer,這是當有新的客戶端連接到達時會回調的一個方法。在這個方法內部,我們給這個新的 chaneel 的 pipeline 上添加了一個處理器 serverHandler,以便收到數據的時候執行該處理器進行請求處理。 上面的幾個方法都是定義,在b.bind方法中真正開始啟動服務,創建父 channel(listen socket),創建 boss 線程。當有新連接到達的時候 boss 線程再創建子 channel,為其 pipeline 添加處理器,并啟動 worker 線程來進行處理。 簡言之 bootstrap.group() .channel() .childHandler() .childOption() 就是在構建 Netty Server 的各種參數。 ServerBootstrap 和其父類 AbstractBootstrap 內部分別定義了兩個 EventLoopGroup group 成員。父類 AbstractBootstrap 的 group 是用來處理 accpet 事件的,ServerBootstrap 下的 childGroup 用來處理其它所有的讀寫等事件。 group() 方法就是把 EventLoopGroup 參數設置到自己的成員上完事。其中如果調用 group() 只傳入了一個線程池,那么將來本服務下的所有事件都由這個線程池來處理。詳情查看飛哥精簡后的源碼。 //file:io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap 2.2 channel 設置 再看 ServerBootstrap#channel 方法 是用來定義一個工廠方法,將來需要創建 channel 的時候都調用該工廠進行創建。 //file:io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap 回頭看本文開頭 demo,.channel(NioServerSocketChannel.class)指的是將來需要創建 channel 的時候,創建 NioServerSocketChannel 這個類型的。 再看 option 方法,只是設置到了 options 成員中而已 //file:io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap 本文 demo 設置了兩處 handler,一處是 handler,另一處是 childHandler。他們都是分別設置到自己的成員上就完事,看源碼。 //file:io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends ...... { public B handler(ChannelHandler handler) { this.handler = ObjectUtil.checkNotNull(handler, "handler"); return self(); } public ServerBootstrap childHandler(ChannelHandler childHandler) { this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler"); return this; }}三、Netty bootstrap 啟動服務 ServerBootstrap 下的 bind 方法是服務啟動過程中非常重要的一個方法。創建父 channel(listen socket),創建 boss 線程,為 boss 線程綁定 Acceptor 處理器,調用系統調用 bind 進行綁定和監聽都是在這里完成的。 先來直接看一下 bind 相關的入口源碼。 //file:io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap ... { ......}//file:io/netty/bootstrap/AbstractBootstrap.javapublic abstract class AbstractBootstrap ... { public ChannelFuture bind(SocketAddress localAddress) { validate(); return doBind(...); } private ChannelFuture doBind(final SocketAddress localAddress) { //創建父 channel、初始化并且注冊 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); ...... //如果 Register 已經完成,則直接 doBind0 if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; //否則就注冊一個 listener(回調),等 register 完成的時候調用 } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } return promise; } } //創建 channel,對其初始化,并且 register(會創建 parent 線程) final ChannelFuture initAndRegister() { //3.1 創建父 channel(listen socket) channel = channelFactory.newChannel(); //3.2 對父 channel(listen socket)進行初始化 init(channel); //3.3 注冊并啟動 boss 線程 ChannelFuture regFuture = config().group().register(channel); ...... } //3.4 真正的bind private static void doBind0(...) { channel.eventLoop().execute(new Runnable() { @Override public void run() { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); ...... } }); }} 在這個過程中,做了如下幾件重要的事情 接下來我們分開來看。 在 initAndRegister() 方法中創建 channel(socket),它調用了 channelFactory.newChannel()。 public abstract class AbstractBootstrap //創建 channel,對其初始化,并且 register(會創建 parent 線程) final ChannelFuture initAndRegister() { //3.1 創建 listen socket channel = channelFactory.newChannel(); ...... }} 回想下 2.2 節的channel 方法,返回的是一個反射 ReflectiveChannelFactory。沒錯這里的 newChannel 就是調用這個工廠方法來創建出來一個 NioServerSocketChannel 對象。 在 initAndRegister 創建除了 channel 之后,需要調用 init 對其進行初始化。 public abstract class AbstractBootstrap final ChannelFuture initAndRegister() { //3.1 創建父 channel(listen socket) //3.2 對父 channel(listen socket)進行初始化 init(channel); ...... }} 在 init() 中對 channel 進行初始化,一是給 options 和 attrs 賦值,二是構建了父 channel 的 pipeline。 //file:src/main/java/io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap 在 setChannelOptions 中對 channel 的各種 option 進行設置?;貞浳覀冊谑褂?ServerBootstrap 時可以傳入 SO_BACKLOG,這就是其中的一個 option。在這里會真正設置到 channel(socket)上。 ServerBootstrap b = new ServerBootstrap();b.option(ChannelOption.SO_BACKLOG, 100) 在 init 中,稍微難理解一點是 p.addLast(new ChannelInitializer...)。這一段代碼只是給父 channel 添加一個 handler 而已。其真正的執行要等到 register 后,我們待會再看。 父 channel 在創建完,并且初始化之后,需要注冊到 boss 線程上才可用。 public abstract class AbstractBootstrap final ChannelFuture initAndRegister() { //3.1 創建父 channel(listen socket) //3.2 對父 channel(listen socket)進行初始化 //3.3 注冊并啟動 boss 線程 ChannelFuture regFuture = config().group().register(channel); ...... }} 其中 config().group() 最終會調用到 AbstractBootstrap#group,在這個方法里獲取的是我們傳入進來的 bossGroup。 public abstract class AbstractBootstrap volatile EventLoopGroup group; public final EventLoopGroup group() { return group; }} 其中 bossGroup 是一個 NioEventLoopGroup 實例,所以代碼會進入到 NioEventLoopGroup#register 方法。 public class NioEventLoopGroup extends MultithreadEventLoopGroup {}public abstract class MultithreadEventLoopGroup extends ... { @Override public ChannelFuture register(Channel channel) { return next().register(channel); } @Override public EventLoop next() { return (EventLoop) super.next(); }} 在 NioEventLoopGroup 里包含一個或多個 EventLoop。上面的 next 方法就是從中選擇一個出來,然后將 channel 注冊到其上。 對于本文來講,我們使用的是 NioEventLoopGroup,其內部包含的自然也就是 NioEventLoop,我們繼續查找其 register 方法。 public final class NioEventLoop extends SingleThreadEventLoop //在 eventloop 里注冊一個 channle(socket) public void register(final SelectableChannel ch, ...) { ...... register0(ch, interestOps, task); } //最終調用 channel 的 register private void register0(SelectableChannel ch, int interestOps, NioTask> task) { ch.register(unwrappedSelector, interestOps, task); }} 可見,NioEventLoop 的 register 最后又調用到 channel 的 register 上了。在我們本文中,我們創建的 channel 是 NioServerSocketChannel,我們就依照這條線索來查。 //file:src/main/java/io/netty/channel/AbstractChannel.javapublic abstract class AbstractChannel extends DefaultAttributeMap implements Channel { public final void register(EventLoop eventLoop, final ChannelPromise promise) { ...... //關聯自己到 eventLoop AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } ...... } }} 在 channel 的父類 AbstractChannel 中的 register 中,先是把自己關聯到傳入的 eventLoop 上。接著調用 inEventLoop 來判斷線程當前運行的線程是否是 EventExecutor的支撐線程,是則返回直接 register0。 一般來說,服務在啟動的時候都是主線程在運行。這個時候很可能 boss 線程還沒有啟動。所以如果發現當前不是 boss 線程的話,就調用 eventLoop.execute 來啟動 boss 線程。 NioEventLoop 的父類是 SingleThreadEventExecutor, 找到 execute 方法。 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { public void execute(Runnable task) { execute0(task); } private void execute0(@Schedule Runnable task) { execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task)); } private void execute(Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); } if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } }} 我們先來看 addTask(task),它是將 task 添加到任務隊列中。等待線程起來以后再運行。 public abstract class SingleThreadEventExecutor extends ... { private final Queue inEventLoop() 是判斷當前線程是不是自己綁定的線程,這時還在主線程中運行,所以 inEventLoop 為假,會進入 startThread 開始為 EventLoop 創建線程。 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { private void startThread() { doStartThread(); ...... } private void doStartThread() { executor.execute(new Runnable() { @Override public void run() { SingleThreadEventExecutor.this.run(); ...... } } } } 在 doStartThread 中調用 Java 線程管理工具 Executor 來啟動 boss 線程。 當線程起來以后就進入了自己的線程循環中了,會遍歷自己的任務隊列,然后開始處理自己的任務。 public final class NioEventLoop extends SingleThreadEventLoop { protected void run() { for (;;) { if (!hasTasks()) { strategy = select(curDeadlineNanos); } //如果有任務的話就開始處理 runAllTasks(0); //任務處理完畢就調用 epoll_wait 等待事件發生 processSelectedKeys(); } }} 前面我們在 3.3 節看到 eventLoop.execute 把一個 Runnable 任務添加到了任務隊列里。當 EventLoop 線程啟動后,它會遍歷自己的任務隊列并開始處理。這時會進入到 AbstractChannel#register0 方法開始運行。 //file:src/main/java/io/netty/channel/AbstractChannel.javapublic abstract class AbstractChannel extends ... { public final void register(...) { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); ...... } private void register0(ChannelPromise promise) { doRegister(); ...... }} 函數 doRegister 是在 AbstractNioChannel 類下。 //file:io/netty/channel/nio/AbstractNioChannel.javapublic abstract class AbstractNioChannel extends AbstractChannel { private final SelectableChannel ch; protected SelectableChannel javaChannel() { return ch; } public NioEventLoop eventLoop() { return (NioEventLoop) super.eventLoop(); } protected void doRegister() throws Exception { boolean selected = false; for (;;) { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); } }} 上面最關鍵的一句是selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);。這一句就相當于在 C 語言下調用 epoll_ctl 把 listen socket 添加到了 epoll 對象下。 其中 javaChannel 獲取父 channel,相當于 listen socket。unwrappedSelector 獲取 selector,相當于 epoll 對象。register 相當于使用 epoll_ctl 執行 add 操作。 當 channel 注冊完后,前面 init 時注冊的 ChannelInitializer 回調就會被執行。再回頭看它的 回調定義。 //file:src/main/java/io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap 在 ChannelInitializer#initChannel 里,又給 boss 線程的 pipeline 里添加了一個任務。該任務是讓其在自己的 pipeline 上注冊一個 ServerBootstrapAcceptor handler。將來有新連接到達的時候,ServerBootstrapAcceptor 將會被執行。 再看 doBind0 方法,調用 channel.bind 完成綁定。 private static void doBind0(...) { channel.eventLoop().execute(new Runnable() { @Override public void run() { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); ...... } }); }四、新連接到達 我們再回到 boss 線程的主循環中。 public final class NioEventLoop extends SingleThreadEventLoop { protected void run() { for (;;) { strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); //任務隊列都處理完就開始 select if (!hasTasks()) { strategy = select(curDeadlineNanos); } //處理各種事件 if (strategy > 0) { processSelectedKeys(); } } } private int select(long deadlineNanos) throws IOException { if (deadlineNanos == NONE) { return selector.select(); } // Timeout will only be 0 if deadline is within 5 microsecs long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L; return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis); }} 假如線程任務隊列中的任務都處理干凈了的情況下,boss 線程會調用 select 來發現其 selector 上的各種事件。相當于 C 語言中的 epoll_wait。 當發現有事件發生的時候,例如 OP_WRITE、OP_ACCEPT、OP_READ 等的時候,會進入相應的處理 public final class NioEventLoop extends SingleThreadEventLoop { private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { ...... if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } }} 對于服務端的 Unsafe.read() 這里會執行 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read() 方法,它會調用 JDK 底層的 ServerSocketChannel.accept() 接收到客戶端的連接后,將其封裝成 Netty 的 NioSocketChannel,再通過 Pipeline 將 ChannelRead 事件傳播出去,這樣 ServerBootstrapAcceptor 就可以在 ChannelRead 回調里處理新的客戶端連接了。 我們直接看 ServerBootstrapAcceptor#ChannelRead。 //file:public class ServerBootstrap extends AbstractBootstrap 在 channelRead 先是獲取到了新創建出來的子 channel,并為其 pipeline 添加 childHandler?;仡^看 1.5 節,childHandler 是我們自定義的。 緊接著調用 childGroup.register(child) 將子 channel 注冊到 workerGroup 上。這個 register 過程和 3.3節、3.5節過程一樣。區別就是前面是父 channel 注冊到 bossGroup 上,這里是子 channel 注冊到 workerGroup上。 在 register 完成后,子 channel 被掛到了 workerGroup 其中一個線程上,相應的線程如果沒有創建也會被創建出來并進入到自己的線程循環中。 當子 channel 注冊完畢的時候,childHandler 中 ChannelInitializer#initChannel 會被執行 public final class EchoServer { public static void main(String[] args) throws Exception { ... ServerBootstrap b = new ServerBootstrap(); b.childHandler(new ChannelInitializer 在 initChannel 把子 channel 的處理類 serverHandler 添加上來了。Netty demo 中對這個處理類的定義非常的簡單,僅僅只是打印出來而已。 public class EchoServerHandler extends ChannelInboundHandlerAdapter { public void channelRead(......) { ctx.write(msg); } ......}五、用戶請求到達 當 worker 線程起來以后,會進入線程循環(boss 線程和 worker 線程的 run 函數是一個)。在循環中會遍歷自己的任務隊列,如果沒有任務可處理,便 select 來觀察自己所負責的 channel 上是否有事件發生。 public final class NioEventLoop extends SingleThreadEventLoop { protected void run() { for (;;) { if (!hasTasks()) { strategy = select(curDeadlineNanos); } //如果有任務的話就開始處理 runAllTasks(0); //任務處理完畢就調用 epoll_wait 等待事件發生 processSelectedKeys(); } } private int select(long deadlineNanos) throws IOException { selector.selectNow(); ...... }} worker 線程會調用 select 發現自己所管理的所有子 channel 上的可讀可寫事件。在發現有可讀事件后,會調用 processSelectedKeys,最后觸發 pipeline 使得 EchoServerHandler 方法開始執行。 public class EchoServerHandler extends ChannelInboundHandlerAdapter { public void channelRead(......) { ctx.write(msg); } ......}六、總結 事實上,Netty 對網絡封裝的比較靈活。既支持單線程 Reactor,也支持多線程 Reactor、還支持主從多線程 Reactor。三種模型對應的用法如下: public static void main(String[] args) throws Exception { //單線程 Reactor EventLoopGroup eventGroup = new NioEventLoopGroup(1); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(eventGroup); ...... //多線程 Reactor EventLoopGroup eventGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(eventGroup); ...... //主從多線程 Reactor EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); ......} 為了表述的更全面,本文飛哥選擇的是最為經典的 主從多線程 Reactor 模式。本文中所描述的內容可以用下面一幅圖來表示。 在 Netty 中的 boss 線程中負責對父 channel(listen socket)上事件的監聽和處理,當有新連接到達的時候,選擇一個 worker 線程把這個子 channel(連接 socket )叫給 worker 線程來處理。 其中 Worker 線程就是等待其管理的所有子 channel(連接 socket)上的事件的監聽和處理。當發現有事件發生的時候,回調用戶設置的 handler 進行處理。在本文的例子中,這個用戶 handler 就是 EchoServerHandler#channelRead。 至此,Netty 網絡模塊的工作核心原理咱們就介紹完了。飛哥一直“鼓吹”內功的好處。只要你具備了堅實的內功,各種語言里看似風牛馬不相及的東西,在底層實際上原理是想通的。我本人從來沒用 Java 開發過服務器程序,更沒碰過 Netty。但是當你多epoll有了深入理解的時候,再看Netty也能很容易看懂,很快就能理解它的核心。這就是鍛煉內功的好處!
X 關閉
X 關閉
- 1轉轉集團發布2022年二季度手機行情報告:二手市場“飄香”
- 2充電寶100Wh等于多少毫安?鐵路旅客禁止、限制攜帶和托運物品目錄
- 3好消息!京東與騰訊續簽三年戰略合作協議 加強技術創新與供應鏈服務
- 4名創優品擬通過香港IPO全球發售4100萬股 全球發售所得款項有什么用處?
- 5亞馬遜云科技成立量子網絡中心致力解決量子計算領域的挑戰
- 6京東綠色建材線上平臺上線 新增用戶70%來自下沉市場
- 7網紅淘品牌“七格格”chuu在北京又開一家店 潮人新寵chuu能紅多久
- 8市場競爭加劇,有車企因經營不善出現破產、退網、退市
- 9北京市市場監管局為企業紓困減負保護經濟韌性
- 10市場監管總局發布限制商品過度包裝標準和第1號修改單