本文共 10929 字,大约阅读时间需要 36 分钟。
在前面的博文中,我们大致分析了解了Channel及其相关概念。在Netty的线程模型中,每个channel都有唯一的一个eventLoop与之相绑定,那么在这篇博文中我们来看一下EvenLoop及其相关概念。
在传统的Java NIO编程中,我们经常使用到如下代码:
public static void main(String[] args) { try { //创建选择器 Selector selector = Selector.open(); //打开通道 ServerSocketChannel channel = ServerSocketChannel.open(); //开启非阻塞模式 channel.configureBlocking(false); //服务端socket监听指定端口 channel.socket().bind(new InetSocketAddress(port), 1024); // 将 channel 注册到 selector 中, // 通常我们都是先注册一个 OP_ACCEPT 事件, // 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中。 channel.register(selector, SelectionKey.OP_ACCEPT); while (true){ // 通过调用 select 方法, 阻塞地等待 channel I/O 可操作 selector.select(500); // 获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪. Setkeys = selector.selectedKeys(); Iterator it = keys.iterator(); while (it.hasNext()){ SelectionKey key = it.next(); // 当获取一个 SelectionKey 后, 就要将它删除, 表示我们已经对这个 IO 事件进行了处理。 it.remove(); try { if(key.isAcceptable()) { //处理新的请求 三次握手 建立连接 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); //在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中. sc.register(selector, SelectionKey.OP_READ); } ……………… }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } } }catch (IOException e){ e.printStackTrace(); } }
上述操作中的第一步通过Selector.open() 打开一个 Selector,我们以NioServerSocketChannel为例,当创建NioServerSocketChannel时,Netty通过反射调用NioServerSocketChannel的无参数构造方法(具体过程后面专门介绍):
channel = this.channelFactory.newChannel();
NioSocketChannel的无参数构造方法如下:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();public NioServerSocketChannel() { this(DEFAULT_SELECTOR_PROVIDER);}public NioServerSocketChannel(SelectorProvider provider) { this(newSocket(provider));}private static NioServerSocketChannel newSocket(SelectorProvider provider) { try { //调用 SelectorProvider.openSocketChannel() 来打开一个新的 Java NIO SocketChannel: return provider.openSocketChannel(); } catch (IOException var2) { throw new ChannelException("Failed to open a socket.", var2); }}
第二步 将 Channel 注册到 Selector 中, 并设置需要监听的事件。在channel的注册过程中(具体过程后面专门介绍),会调用AbstractUnsafe.register0方法:
private void register0(ChannelPromise promise) { …… boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (firstRegistration && isActive()) { pipeline.fireChannelActive(); }}
register0 又调用了 AbstractNioChannel.doRegister方法:
protected void doRegister() throws Exception { // 省略错误处理 selectionKey = javaChannel().register(eventLoop().selector, 0, this);}
此处的参数0说明仅仅将 Channel 注册到 Selector 中, 但是不设置interest set。那到底在哪里设置的呢?其实在NioServerSocketChannel的构造方法中:
public NioServerSocketChannel(ServerSocketChannel channel) { //表示关注OP_ACCEPT事件 super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket());}
第一、二步都完成了,那么第三步循环部分在哪呢?事实上 NioEventLoop 本身就是一个 SingleThreadEventExecutor,因此 NioEventLoop 的启动 其实就是 NioEventLoop 所绑定的本地 Java 线程的启动。在SingleThreadEventExecutor.doStartThread方法中创建线程并调用SingleThreadEventExecutor.this.run()方法,而run方法为抽象方法,具体实现在NioEventLoop的run方法中。
protected void run() { for (;;) { try { //通过hasTasks方法判断当前taskQueue是否为空 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: // fallthrough } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } …… } } } public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; } private final IntSupplier selectNowSupplier = new IntSupplier() { @Override public int get() throws Exception { return selectNow(); } };
此处for(;;) 所构成的死循环构成了NioEventLoop事件循环的核心。这里有两个方法需要注意,selector.selectNow()会检查当前是否有就绪的 IO 事件,如果有,则返回就绪 IO 事件的个数,如果没有,则返回0。selectNow() 是立即返回的,不会阻塞当前线程;selector.select(timeoutMillis)会阻塞住当前线程的,timeoutMillis 是阻塞的超时时间。
代码中有个名为ioRatio的属性,它表示的是此线程分配给 IO 操作所占的时间比(即运行 processSelectedKeys 耗时在整个循环中所占用的时间)。计算公式:
设 IO 操作耗时为 ioTime, ioTime 占的时间比例为 ioRatio, 则: ioTime / ioRatio = taskTime / taskRatio taskRatio = 100 - ioRatio => taskTime = ioTime * (100 - ioRatio) / ioRatio
再来看IO处理过程,即processSelectedKeys方法,
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
这个方法中会根据 selectedKeys 字段是否为空,而分别调用 processSelectedKeysOptimized 或 processSelectedKeysPlain。 其实两者没有太大的区别,此处以 processSelectedKeysOptimized 为例分析一下工作流程。
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.keys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTasktask = (NioTask ) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }
代码中k.attachment()返回值是什么呢?其实我们可以猜测一下应该是附着在SelectionKey的事物,联想到在selector上注册channel时候指定了SelectionKey,可以想到返回值其实就是channel自身。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { …… try { int readyOps = k.readyOps(); //OP_CONNECT事件 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } //OP_WRITE事件 if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } //OP_READ事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
OP_WRITE 可写事件比较简单,没有详细分析的必要了。这里写代码片
public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; } allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { …… } }
归纳一下大概做了三件事情:分配 ByteBuf;从 SocketChannel 中读取数据;调用 pipeline.fireChannelRead 发送一个 inbound 事件。如果了解过channel相关内容,产生inbound事件之后便是channelPipeline的事了,具体如何处理请翻阅之前的博文。
OP_CONNECT 事件处理过程:将 OP_CONNECT 从就绪事件集中清除;调用 unsafe.finishConnect() 通知上层连接已建立。
unsafe.finishConnect方法最后会调用到 pipeline().fireChannelActive(),产生一个 inbound 事件,通知 pipeline 中的各个 handler TCP 通道已建立(即 ChannelInboundHandler.channelActive 方法会被调用)。
到了这里, 我们整个 NioEventLoop 的 IO 操作部分已经了解完了
转载地址:http://ppgmi.baihongyu.com/