博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty框架学习之路(五)—— EventLoop及事件循环机制
阅读量:4223 次
发布时间:2019-05-26

本文共 10929 字,大约阅读时间需要 36 分钟。

在前面的博文中,我们大致分析了解了Channel及其相关概念。在Netty的线程模型中,每个channel都有唯一的一个eventLoop与之相绑定,那么在这篇博文中我们来看一下EvenLoop及其相关概念。

在传统的Java NIO编程中,我们经常使用到如下代码:

public static void main(String[] args) {        try {            //创建选择器            Selector selector = Selector.open();            //打开通道            ServerSocketChannel channel = ServerSocketChannel.open();            //开启非阻塞模式            channel.configureBlocking(false);            //服务端socket监听指定端口            channel.socket().bind(new InetSocketAddress(port), 1024);            // 将 channel 注册到 selector 中,            // 通常我们都是先注册一个 OP_ACCEPT 事件,             // 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中。            channel.register(selector, SelectionKey.OP_ACCEPT);            while (true){                // 通过调用 select 方法, 阻塞地等待 channel I/O 可操作                selector.select(500);                // 获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪.                Set
keys = selector.selectedKeys(); Iterator
it = keys.iterator(); while (it.hasNext()){ SelectionKey key = it.next(); // 当获取一个 SelectionKey 后, 就要将它删除, 表示我们已经对这个 IO 事件进行了处理。 it.remove(); try { if(key.isAcceptable()) { //处理新的请求 三次握手 建立连接 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); //在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中. sc.register(selector, SelectionKey.OP_READ); } ……………… }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } } }catch (IOException e){ e.printStackTrace(); } }

上述操作中的第一步通过Selector.open() 打开一个 Selector,我们以NioServerSocketChannel为例,当创建NioServerSocketChannel时,Netty通过反射调用NioServerSocketChannel的无参数构造方法(具体过程后面专门介绍):

channel = this.channelFactory.newChannel();

NioSocketChannel的无参数构造方法如下:

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();public NioServerSocketChannel() {    this(DEFAULT_SELECTOR_PROVIDER);}public NioServerSocketChannel(SelectorProvider provider) {    this(newSocket(provider));}private static NioServerSocketChannel newSocket(SelectorProvider provider) {    try {        //调用 SelectorProvider.openSocketChannel() 来打开一个新的 Java NIO SocketChannel:        return provider.openSocketChannel();    } catch (IOException var2) {        throw new ChannelException("Failed to open a socket.", var2);    }}

第二步 将 Channel 注册到 Selector 中, 并设置需要监听的事件。在channel的注册过程中(具体过程后面专门介绍),会调用AbstractUnsafe.register0方法:

private void register0(ChannelPromise promise) {    ……    boolean firstRegistration = neverRegistered;    doRegister();    neverRegistered = false;    registered = true;    safeSetSuccess(promise);    pipeline.fireChannelRegistered();    // Only fire a channelActive if the channel has never been registered. This prevents firing    // multiple channel actives if the channel is deregistered and re-registered.    if (firstRegistration && isActive()) {        pipeline.fireChannelActive();    }}

register0 又调用了 AbstractNioChannel.doRegister方法:

protected void doRegister() throws Exception {    // 省略错误处理    selectionKey = javaChannel().register(eventLoop().selector, 0, this);}

此处的参数0说明仅仅将 Channel 注册到 Selector 中, 但是不设置interest set。那到底在哪里设置的呢?其实在NioServerSocketChannel的构造方法中:

public NioServerSocketChannel(ServerSocketChannel channel) {    //表示关注OP_ACCEPT事件    super(null, channel, SelectionKey.OP_ACCEPT);    config = new NioServerSocketChannelConfig(this, javaChannel().socket());}

第一、二步都完成了,那么第三步循环部分在哪呢?事实上 NioEventLoop 本身就是一个 SingleThreadEventExecutor,因此 NioEventLoop 的启动 其实就是 NioEventLoop 所绑定的本地 Java 线程的启动。在SingleThreadEventExecutor.doStartThread方法中创建线程并调用SingleThreadEventExecutor.this.run()方法,而run方法为抽象方法,具体实现在NioEventLoop的run方法中。

protected void run() {        for (;;) {            try {                //通过hasTasks方法判断当前taskQueue是否为空                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {                    case SelectStrategy.CONTINUE:                        continue;                    case SelectStrategy.SELECT:                        select(wakenUp.getAndSet(false));                        if (wakenUp.get()) {                            selector.wakeup();                        }                    default:                        // fallthrough                }                cancelledKeys = 0;                needsToSelectAgain = false;                final int ioRatio = this.ioRatio;                if (ioRatio == 100) {                    try {                        processSelectedKeys();                    } finally {                        // Ensure we always run tasks.                        runAllTasks();                    }                } else {                    final long ioStartTime = System.nanoTime();                    try {                        processSelectedKeys();                    }                     ……                }           }    }    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;    }    private final IntSupplier selectNowSupplier = new IntSupplier() {        @Override        public int get() throws Exception {            return selectNow();        }    };

此处for(;;) 所构成的死循环构成了NioEventLoop事件循环的核心。这里有两个方法需要注意,selector.selectNow()会检查当前是否有就绪的 IO 事件,如果有,则返回就绪 IO 事件的个数,如果没有,则返回0。selectNow() 是立即返回的,不会阻塞当前线程;selector.select(timeoutMillis)会阻塞住当前线程的,timeoutMillis 是阻塞的超时时间。

代码中有个名为ioRatio的属性,它表示的是此线程分配给 IO 操作所占的时间比(即运行 processSelectedKeys 耗时在整个循环中所占用的时间)。计算公式:

设 IO 操作耗时为 ioTime, ioTime 占的时间比例为 ioRatio, 则:    ioTime / ioRatio = taskTime / taskRatio    taskRatio = 100 - ioRatio    => taskTime = ioTime * (100 - ioRatio) / ioRatio

再来看IO处理过程,即processSelectedKeys方法,

private void processSelectedKeys() {        if (selectedKeys != null) {            processSelectedKeysOptimized();        } else {            processSelectedKeysPlain(selector.selectedKeys());        }    }

这个方法中会根据 selectedKeys 字段是否为空,而分别调用 processSelectedKeysOptimized 或 processSelectedKeysPlain。 其实两者没有太大的区别,此处以 processSelectedKeysOptimized 为例分析一下工作流程。

private void processSelectedKeysOptimized() {        for (int i = 0; i < selectedKeys.size; ++i) {            final SelectionKey k = selectedKeys.keys[i];            // null out entry in the array to allow to have it GC'ed once the Channel close            // See https://github.com/netty/netty/issues/2363            selectedKeys.keys[i] = null;            final Object a = k.attachment();            if (a instanceof AbstractNioChannel) {                processSelectedKey(k, (AbstractNioChannel) a);            } else {                @SuppressWarnings("unchecked")                NioTask
task = (NioTask
) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }

代码中k.attachment()返回值是什么呢?其实我们可以猜测一下应该是附着在SelectionKey的事物,联想到在selector上注册channel时候指定了SelectionKey,可以想到返回值其实就是channel自身。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {        ……        try {            int readyOps = k.readyOps();            //OP_CONNECT事件            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking                // See https://github.com/netty/netty/issues/924                int ops = k.interestOps();                ops &= ~SelectionKey.OP_CONNECT;                k.interestOps(ops);                unsafe.finishConnect();            }            //OP_WRITE事件            if ((readyOps & SelectionKey.OP_WRITE) != 0) {                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write                ch.unsafe().forceFlush();            }            //OP_READ事件            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {                unsafe.read();            }        } catch (CancelledKeyException ignored) {            unsafe.close(unsafe.voidPromise());        }    }

OP_WRITE 可写事件比较简单,没有详细分析的必要了。这里写代码片

OP_READ事件处理过程有点长,具体可以看一下read方法:

public final void read() {            final ChannelConfig config = config();            final ChannelPipeline pipeline = pipeline();            final ByteBufAllocator allocator = config.getAllocator();            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();            allocHandle.reset(config);            ByteBuf byteBuf = null;            boolean close = false;            try {                do {                    byteBuf = allocHandle.allocate(allocator);                    allocHandle.lastBytesRead(doReadBytes(byteBuf));                    if (allocHandle.lastBytesRead() <= 0) {                        // nothing was read. release the buffer.                        byteBuf.release();                        byteBuf = null;                        close = allocHandle.lastBytesRead() < 0;                        break;                    }                    allocHandle.incMessagesRead(1);                    readPending = false;                    pipeline.fireChannelRead(byteBuf);                    byteBuf = null;                } while (allocHandle.continueReading());                allocHandle.readComplete();                pipeline.fireChannelReadComplete();                if (close) {                    closeOnRead(pipeline);                }            } catch (Throwable t) {                handleReadException(pipeline, byteBuf, t, close, allocHandle);            } finally {                ……            }        }

归纳一下大概做了三件事情:分配 ByteBuf;从 SocketChannel 中读取数据;调用 pipeline.fireChannelRead 发送一个 inbound 事件。如果了解过channel相关内容,产生inbound事件之后便是channelPipeline的事了,具体如何处理请翻阅之前的博文。

OP_CONNECT 事件处理过程:将 OP_CONNECT 从就绪事件集中清除;调用 unsafe.finishConnect() 通知上层连接已建立。

unsafe.finishConnect方法最后会调用到 pipeline().fireChannelActive(),产生一个 inbound 事件,通知 pipeline 中的各个 handler TCP 通道已建立(即 ChannelInboundHandler.channelActive 方法会被调用)。

到了这里, 我们整个 NioEventLoop 的 IO 操作部分已经了解完了

转载地址:http://ppgmi.baihongyu.com/

你可能感兴趣的文章
隔离太无聊?每天一个数据科学项目,数据集都准备好了!
查看>>
干货 | 国家信息中心杜平谈关于数字化的几点思考
查看>>
收藏!机器学习算法优缺点综述
查看>>
新冠肺炎数据里学到的四个数据分析和机器学习知识
查看>>
11个问题助你彻底搞懂工业互联网
查看>>
疫情之下,职场人自降薪资期待求自保 | 2020Q1报告
查看>>
线上讲座丨罗杰:前沿——NISQ时代下的工程超导量子计算机
查看>>
如何评估一项技术是否值得长期投入?
查看>>
洞悉2020年数据团队建设,我们和清华、领英一起搞了个大事情,你也可以参与!...
查看>>
独家 | 教你使用torchlayers 来构建PyTorch 模型(附链接)
查看>>
如何在远程会议的时候静音吃薯片?微软团队用AI去除视频噪声
查看>>
刷B站的年轻人,到底在刷什么?
查看>>
独家 | 简单三步实现Python脚本超参数调优(附代码)
查看>>
一位中国博士把整个 CNN 都给可视化了,可交互有细节,每次卷积 ReLU 池化都清清楚楚...
查看>>
教你动手推导Self-Attention!(附代码)
查看>>
机器学习丨15个最流行的GitHub机器学习项目
查看>>
数据=新生产要素,数据安全之墙如何建?
查看>>
2020年全国信息安全标准化技术委员会大数据安全标准特别工作组全体会议即将召开...
查看>>
50位全球专家畅谈人工智能治理进程——结伴前行,合作共赢(附下载)
查看>>
独家 | 强化学习必知二要素——计算效率和样本效率
查看>>