首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

Netty4服务端起动源码分析-NioEventLoop实现的线程运行逻辑

2013-08-27 
Netty4服务端启动源码分析-NioEventLoop实现的线程运行逻辑在netty服务端启动源码分析-线程创建一文中已分

Netty4服务端启动源码分析-NioEventLoop实现的线程运行逻辑
在netty服务端启动源码分析-线程创建一文中已分析SingleThreadEventExecutor所持有的线程的运行逻辑由NioEventLoop实现,那么本文就着手分析NioEventLoop实现的线程运行逻辑:

?

// NioEventLoopprotected void run() {        for (;;) {            oldWakenUp = wakenUp.getAndSet(false);            try {                if (hasTasks()) {                    selectNow();                } else {                    select();                    if (wakenUp.get()) {                        selector.wakeup();                    }                }                cancelledKeys = 0;                final long ioStartTime = System.nanoTime();                needsToSelectAgain = false;                if (selectedKeys != null) {                    processSelectedKeysOptimized(selectedKeys.flip());                } else {                    processSelectedKeysPlain(selector.selectedKeys());                }                final long ioTime = System.nanoTime() - ioStartTime;                final int ioRatio = this.ioRatio;                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                if (isShuttingDown()) {                    closeAll();                    if (confirmShutdown()) {                        break;                    }                }            } catch (Throwable t) {                logger.warn("Unexpected exception in the selector loop.", t);                // Prevent possible consecutive immediate failures that lead to                // excessive CPU consumption.                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    // Ignore.                }            }        }    }

?分析如下:

?

    ioEventLoop执行的任务分为两大类:IO任务和非IO任务。IO任务即selectionKey中ready的事件,譬如accept、connect、read、write等;非IO任务则为添加到taskQueue中的任务,譬如之前文章中分析到的register0、bind、channelActive等任务两类任务的执行先后顺序为:IO任务->非IO任务。IO任务由processSelectedKeysOptimized(selectedKeys.flip())或processSelectedKeysPlain(selector.selectedKeys())触发;非IO任务由runAllTasks(ioTime * (100 - ioRatio) / ioRatio)触发两类任务的执行时间比由变量ioRatio控制,譬如:ioRatio=50(该值为默认值),则表示允许非IO任务执行的时间与IO任务的执行时间相等执行IO任务前,需要先进行select,以判断之前注册过的channel是否已经有感兴趣的事件ready如果任务队列中存在非IO任务,则执行非阻塞的selectNow()方法
    // NioEventLoop  void selectNow() throws IOException {        try {            selector.selectNow();        } finally {            // restore wakup state if needed            if (wakenUp.get()) {                selector.wakeup();            }        }    }
    ?否则,执行阻塞的select()方法
    // NioEventLoop   private void select() throws IOException {        Selector selector = this.selector;        try {            int selectCnt = 0;            long currentTimeNanos = System.nanoTime();            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);            for (;;) {                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;                if (timeoutMillis <= 0) {                    if (selectCnt == 0) {                        selector.selectNow();                        selectCnt = 1;                    }                    break;                }                int selectedKeys = selector.select(timeoutMillis);                selectCnt ++;                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {                    // Selected something,                    // waken up by user, or                    // the task queue has a pending task.                    break;                }                if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {                    // The selector returned prematurely many times in a row.                    // Rebuild the selector to work around the problem.                    logger.warn(                            "Selector.select() returned prematurely {} times in a row; rebuilding selector.",                            selectCnt);                    rebuildSelector();                    selector = this.selector;                    // Select again to populate selectedKeys.                    selector.selectNow();                    selectCnt = 1;                    break;                }                currentTimeNanos = System.nanoTime();            }            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {                if (logger.isDebugEnabled()) {                    logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);                }            }        } catch (CancelledKeyException e) {            if (logger.isDebugEnabled()) {                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);            }            // Harmless exception - log anyway        }    }
    ?下面分析阻塞的select方法:
首先执行delayNanos(currentTimeNanos):计算延迟任务队列中第一个任务的到期执行时间(即最晚还能延迟执行的时间).注意:(每个SingleThreadEventExecutor都持有一个延迟执行任务的优先队列:final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>()),在启动线程的时候会往队列中加入一个任务)。最终的结果近似为:1秒钟-(当前时间-delayedTask创建的时间)。如果队列中没有任何任务,则默认返回1秒钟。
//SingleThreadEventExecutorprotected long delayNanos(long currentTimeNanos) {        ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();        if (delayedTask == null) {            return SCHEDULE_PURGE_INTERVAL;        }        return delayedTask.delayNanos(currentTimeNanos);}//ScheduledFutureTaskpublic long delayNanos(long currentTimeNanos) {        return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));    }public long deadlineNanos() {        return deadlineNanos;    }
    如果当前时间已经超过到期执行时间后的500000纳秒(这个数字是如何定的?),则说明被延迟执行的任务不能再延迟了:如果在进入这个方法后还没有执行过selectNow方法(由标记selectCnt是否为0来判断),则先执行非阻塞的selectNow方法,然后立即返回;否则,立即返回 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?如果当前时间没有超过到期执行时间后的500000L纳秒,则说明被延迟执行的任务还可以再延迟,所以可以让select的阻塞时间长一点(说不定多出的这点时间就能select到一个ready的IO任务),故执行阻塞的selector.select(timeoutMillis)方法如果已经存在ready的selectionKey,或者该selector被唤醒,或者此时非IO任务队列加入了新的任务,则立即返回否则,本次执行selector.select(timeoutMillis)方法后的结果selectedKeys肯定为0,如果连续返回0的select次数还没有超过SELECTOR_AUTO_REBUILD_THRESHOLD(默认值为512),则继续下一次for循环。注意,根据以下算法:long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L。随着currentTimeNanos的增大,在进入第二次for循环时,正常情况下(即:在没有selectionKey已ready的情况下,selector.select(timeoutMillis)确实阻塞了timeoutMillis毫秒才返回0)计算出的timeoutMillis肯定小于0,计算如下:? ? ? ?假设第一次和第二次进入for循环时的当前时间分currentTimeNanos1,currentTimeNanos2,由于在第一次循环中select阻塞了timeoutMillis1毫秒,所以currentTimeNanons2纳秒 > currentTimeNanos1纳秒+timeoutMillis1毫秒. ? ??那么,第二次的timeoutMillis2 = ?(selectDeadLineNanos – currentTimeNanos2 + 500000) / 1000000 < ?(selectDeadLineNanos – (currentTimeNanos1+timeoutMillis1*1000000)+ 500000) / 1000000 =

    timeoutMillis1- timeoutMillis1=0

    ? ? ?即:timeoutMillis2 < 0。因此第二次不会再进行select,直接跳出循环并返回

    ?

    否则,如果连续多次返回0,说明每次调用selector.select(timeoutMillis)后根本就没有阻塞timeoutMillis时间,而是立即就返回了,且结果为0.?这说明触发了epool cpu100%的bug(https://github.com/netty/netty/issues/327)。解决方案就是对selector重新rebuild
    public void rebuildSelector() {        if (!inEventLoop()) {            execute(new Runnable() {                @Override                public void run() {                    rebuildSelector();                }            });            return;        }        final Selector oldSelector = selector;        final Selector newSelector;        if (oldSelector == null) {            return;        }        try {            newSelector = openSelector();        } catch (Exception e) {            logger.warn("Failed to create a new Selector.", e);            return;        }        // Register all channels to the new Selector.        int nChannels = 0;        for (;;) {            try {                for (SelectionKey key: oldSelector.keys()) {                    Object a = key.attachment();                    try {                        if (key.channel().keyFor(newSelector) != null) {                            continue;                        }                        int interestOps = key.interestOps();                        key.cancel();                        key.channel().register(newSelector, interestOps, a);                        nChannels ++;                    } catch (Exception e) {                        logger.warn("Failed to re-register a Channel to the new Selector.", e);                        if (a instanceof AbstractNioChannel) {                            AbstractNioChannel ch = (AbstractNioChannel) a;                            ch.unsafe().close(ch.unsafe().voidPromise());                        } else {                            @SuppressWarnings("unchecked")                            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;                            invokeChannelUnregistered(task, key, e);                        }                    }                }            } catch (ConcurrentModificationException e) {                // Probably due to concurrent modification of the key set.                continue;            }            break;        }        selector = newSelector;        try {            // time to close the old selector as everything else is registered to the new one            oldSelector.close();        } catch (Throwable t) {            if (logger.isWarnEnabled()) {                logger.warn("Failed to close the old Selector.", t);            }        }        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");    }
    ? ? ? Rebuild的本质:其实就是重新创建一个selector,然后将原来的那个selector中已注册的所有channel重新注册到新的selector中,并将老的selectionKey全部cancel掉,最后将的selector关闭。对selector进行rebuild之后,还需要重新调用selectNow方法,检查是否有已ready的selectionKey.// NioEventLoop private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { selectAgain(); // Need to flip the optimized selectedKeys to get the right reference to the array // and reset the index to -1 which will then set to 0 on the for loop // to start over again. // // See https://github.com/netty/netty/issues/1523 selectedKeys = this.selectedKeys.flip(); i = -1; } } }?
    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {        // check if the set is empty and if so just return to not create garbage by        // creating a new Iterator every time even if there is nothing to process.        // See https://github.com/netty/netty/issues/597        if (selectedKeys.isEmpty()) {            return;        }        Iterator<SelectionKey> i = selectedKeys.iterator();        for (;;) {            final SelectionKey k = i.next();            final Object a = k.attachment();            i.remove();            if (a instanceof AbstractNioChannel) {                processSelectedKey(k, (AbstractNioChannel) a);            } else {                @SuppressWarnings("unchecked")                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;                processSelectedKey(k, task);            }            if (!i.hasNext()) {                break;            }            if (needsToSelectAgain) {                selectAgain();                selectedKeys = selector.selectedKeys();                // Create the iterator again to avoid ConcurrentModificationException                if (selectedKeys.isEmpty()) {                    break;                } else {                    i = selectedKeys.iterator();                }            }        }    }
    ? ? ??此处仅分析processSelectedKeysOptimized方法,对于这两个方法的区别暂时放下,后续再分析吧。processSelectedKeysOptimized的执行逻辑基本上就是循环处理每个select出来的selectionKey,每个selectionKey的处理首先根据attachment的类型来进行分发处理发:如果类型为AbstractNioChannel,则执行一种逻辑;其他,则执行另外一种逻辑。此处,本文仅分析类型为AbstractNioChannel的处理逻辑,另一种逻辑的分析暂时放下,后续再分析。

    ? ?在判断attachment的类型前,首先需要弄清楚这个attatchment是何时关联到selectionKey上的?还记得socket一文中分析的register0任务吗??AbstractNioChannel类中有如下代码:

    selectionKey?=?javaChannel().register(eventLoop().selector,?0,?this);?

    ? ?此处将this(即AbstractNioChannel)作为attachment关联到selectionKey

    //NioEventLoopprivate static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } int readyOps = -1; try { readyOps = k.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { processWritable(ch); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException e) { if (readyOps != -1 && (readyOps & SelectionKey.OP_WRITE) != 0) { unregisterWritableTasks(ch); } unsafe.close(unsafe.voidPromise()); } }? ? ? ?终于见到熟悉nio处理代码了,它根据selecionKey的readyOps的值进行分发,下一篇文章将分析readyOps为accept时的处理逻辑。关于final NioUnsafe unsafe = ch.unsafe(),还记得socket一文中分析的:NioUnsafe由AbstractChannel的子类AbstractNioMessageChannel实例化,其类型为NioMessageUnsafe,它里面定义了read方法,即readyOps为accept的处理逻辑。//NioEventLoopprotected boolean runAllTasks(long timeoutNanos) { fetchFromDelayedQueue(); Runnable task = pollTask(); if (task == null) { return false; } final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } this.lastExecutionTime = lastExecutionTime; return true; }

    ?首先分析fetchFromDelayedQueue()方法,由父类SingleThreadEventExecutor实现

    // SingleThreadEventExecutorprivate void fetchFromDelayedQueue() {        long nanoTime = 0L;        for (;;) {            ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();            if (delayedTask == null) {                break;            }            if (nanoTime == 0L) {                nanoTime = ScheduledFutureTask.nanoTime();            }            if (delayedTask.deadlineNanos() <= nanoTime) {                delayedTaskQueue.remove();                taskQueue.add(delayedTask);            } else {                break;            }        }    }

    ? ? ? ?其功能是将延迟任务队列(delayedTaskQueue)中已经超过延迟执行时间的任务迁移到非IO任务队列(taskQueue)中.然后依次从taskQueue取出任务执行,每执行64个任务,就进行耗时检查,如果已执行时间超过预先设定的执行时间,则停止执行非IO任务,避免非IO任务太多,影响IO任务的执行

    ?

    ?

    总结:NioEventLoop实现的线程执行逻辑做了以下事情

      先后执行IO任务和非IO任务,两类任务的执行时间比由变量ioRatio控制,默认是非IO任务允许执行和IO任务相同的时间如果taskQueue存在非IO任务,或者delayedTaskQueue存在已经超时的任务,则执行非阻塞的selectNow()方法,否则执行阻塞的select(time)方法如果阻塞的select(time)方法立即返回0的次数超过某个值(默认为512次),说明触发了epoll的cpu 100% bug,通过对selector进行rebuild解决:即重新创建一个selector,然后将原来的selector中已注册的所有channel重新注册到新的selector中,并将老的selectionKey全部cancel掉,最后将老的selector关闭如果select的结果不为0,则依次处理每个ready的selectionKey,根据readyOps的值,进行不同的分发处理,譬如accept、read、write、connect等执行完IO任务后,再执行非IO任务,其中会将delayedTaskQueue已超时的任务加入到taskQueue中。每执行64个任务,就进行耗时检查,如果已执行时间超过通过ioRatio和之前执行IO任务的耗时计算出来的非IO任务预计执行时间,则停止执行剩下的非IO任务

    ?

热点排行