Netty4服务端启动源码分析-NioEventLoop实现的线程运行逻辑
在netty服务端启动源码分析-线程创建一文中已分析SingleThreadEventExecutor所持有的线程的运行逻辑由NioEventLoop实现,那么本文就着手分析NioEventLoop实现的线程运行逻辑:
?
// NioEventLoopprotected void run() { for (;;) { oldWakenUp = wakenUp.getAndSet(false); try { if (hasTasks()) { selectNow(); } else { select(); if (wakenUp.get()) { selector.wakeup(); } } cancelledKeys = 0; final long ioStartTime = System.nanoTime(); needsToSelectAgain = false; if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } final long ioTime = System.nanoTime() - ioStartTime; final int ioRatio = this.ioRatio; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to // excessive CPU consumption. try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore. } } } }
分析如下:
?
// NioEventLoop void selectNow() throws IOException { try { selector.selectNow(); } finally { // restore wakup state if needed if (wakenUp.get()) { selector.wakeup(); } } }?否则,执行阻塞的select()方法
// NioEventLoop private void select() throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } int selectedKeys = selector.select(timeoutMillis); selectCnt ++; if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) { // Selected something, // waken up by user, or // the task queue has a pending task. break; } if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding selector.", selectCnt); rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = System.nanoTime(); } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e); } // Harmless exception - log anyway } }?下面分析阻塞的select方法:
//SingleThreadEventExecutorprotected long delayNanos(long currentTimeNanos) { ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek(); if (delayedTask == null) { return SCHEDULE_PURGE_INTERVAL; } return delayedTask.delayNanos(currentTimeNanos);}//ScheduledFutureTaskpublic long delayNanos(long currentTimeNanos) { return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME)); }public long deadlineNanos() { return deadlineNanos; }
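若第一次调用selector.select(timeoutMillis1)阻塞满timeoutMillis1毫秒后因超时返回0,则第二次循环时currentTimeNanos已前进约timeoutMillis1毫秒,重新计算得到:timeoutMillis2 ≈ timeoutMillis1 - timeoutMillis1 = 0
即:timeoutMillis2 <= 0。因此第二次不会再进行select,直接跳出循环并返回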
?
否则,如果连续多次返回0,说明每次调用selector.select(timeoutMillis)后根本就没有阻塞timeoutMillis时间,而是立即就返回了,且结果为0。这说明触发了epoll cpu100%的bug(https://github.com/netty/netty/issues/327)。解决方案就是对selector重新rebuild
/**
 * Replaces the current selector with a newly opened one: every key of the old
 * selector is cancelled and its channel registered on the new selector with
 * the same interest ops and attachment, then the old selector is closed.
 * When called from outside the event loop, the call is re-submitted through
 * {@code execute(...)} and this invocation returns immediately.
 */
public void rebuildSelector() {
    if (!inEventLoop()) {
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector();
            }
        });
        return;
    }

    final Selector oldSelector = selector;
    final Selector newSelector;

    if (oldSelector == null) {
        return;
    }

    try {
        newSelector = openSelector();
    } catch (Exception e) {
        // Without a new selector nothing is migrated; the old one stays in use.
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    // Register all channels to the new Selector.
    int nChannels = 0;
    for (;;) {
        try {
            for (SelectionKey key: oldSelector.keys()) {
                Object a = key.attachment();
                try {
                    // Skip channels that already have a key on the new selector
                    // (relevant when the outer loop restarts the iteration).
                    if (key.channel().keyFor(newSelector) != null) {
                        continue;
                    }

                    int interestOps = key.interestOps();
                    key.cancel();
                    key.channel().register(newSelector, interestOps, a);
                    nChannels ++;
                } catch (Exception e) {
                    logger.warn("Failed to re-register a Channel to the new Selector.", e);
                    // Re-registration failed: close the channel, or notify the NioTask,
                    // depending on the attachment type.
                    if (a instanceof AbstractNioChannel) {
                        AbstractNioChannel ch = (AbstractNioChannel) a;
                        ch.unsafe().close(ch.unsafe().voidPromise());
                    } else {
                        @SuppressWarnings("unchecked")
                        NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                        invokeChannelUnregistered(task, key, e);
                    }
                }
            }
        } catch (ConcurrentModificationException e) {
            // Probably due to concurrent modification of the key set.
            continue;
        }

        break;
    }

    selector = newSelector;

    try {
        // time to close the old selector as everything else is registered to the new one
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }

    logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
Rebuild的本质:其实就是重新创建一个selector,然后将原来的那个selector中已注册的所有channel重新注册到新的selector中,并将老的selectionKey全部cancel掉,最后将老的selector关闭。对selector进行rebuild之后,还需要重新调用selectNow方法,检查是否有已ready的selectionKey.
// NioEventLoop
/**
 * Walks the given key array from index 0 until the first {@code null} entry
 * and dispatches each key according to the type of its attachment.
 *
 * @param selectedKeys array of selected keys; iteration stops at the first {@code null}
 */
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        // A null entry marks the end of the selected keys.
        if (k == null) {
            break;
        }

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            selectAgain();
            // Need to flip the optimized selectedKeys to get the right reference to the array
            // and reset the index to -1 which will then set to 0 on the for loop
            // to start over again.
            //
            // See https://github.com/netty/netty/issues/1523
            selectedKeys = this.selectedKeys.flip();
            i = -1;
        }
    }
}
/**
 * Iterates the given selected-key set, removing each key from the set before
 * dispatching it according to the type of its attachment.
 *
 * @param selectedKeys the selector's selected-key set
 */
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    // check if the set is empty and if so just return to not create garbage by
    // creating a new Iterator every time even if there is nothing to process.
    // See https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) {
        return;
    }

    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        // Remove the key from the selected set before handling it.
        i.remove();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }

        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();

            // Create the iterator again to avoid ConcurrentModificationException
            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}
此处仅分析processSelectedKeysOptimized方法,对于这两个方法的区别暂时放下,后续再分析吧。processSelectedKeysOptimized的执行逻辑基本上就是循环处理每个select出来的selectionKey,每个selectionKey的处理首先根据attachment的类型来进行分发处理:如果类型为AbstractNioChannel,则执行一种逻辑;其他,则执行另外一种逻辑。此处,本文仅分析类型为AbstractNioChannel的处理逻辑,另一种逻辑的分析暂时放下,后续再分析。
在判断attachment的类型前,首先需要弄清楚这个attachment是何时关联到selectionKey上的?还记得socket一文中分析的register0任务吗?AbstractNioChannel类中有如下代码:
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
此处将this(即AbstractNioChannel)作为attachment关联到selectionKey
//NioEventLoop
/**
 * Handles one selected key whose attachment is an {@code AbstractNioChannel},
 * dispatching on the key's ready-ops bits. An invalid key, or a
 * {@code CancelledKeyException} raised while handling, closes the channel.
 *
 * @param k  the selected key
 * @param ch the channel attached to the key
 */
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    // -1 means "ready ops not read yet"; the catch block below checks for it.
    int readyOps = -1;
    try {
        readyOps = k.readyOps();
        // Read/accept readiness - and also a ready set of 0 - is handled by unsafe.read().
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            processWritable(ch);
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException e) {
        // Only when readyOps was obtained and included OP_WRITE are the writable tasks unregistered.
        if (readyOps != -1 && (readyOps & SelectionKey.OP_WRITE) != 0) {
            unregisterWritableTasks(ch);
        }
        unsafe.close(unsafe.voidPromise());
    }
}
终于见到熟悉的nio处理代码了,它根据selectionKey的readyOps的值进行分发,下一篇文章将分析readyOps为accept时的处理逻辑。关于final NioUnsafe unsafe = ch.unsafe(),还记得socket一文中分析的:NioUnsafe由AbstractChannel的子类AbstractNioMessageChannel实例化,其类型为NioMessageUnsafe,它里面定义了read方法,即readyOps为accept的处理逻辑。
//NioEventLoop
/**
 * Moves due delayed tasks into the task queue, then runs queued tasks until
 * the queue is empty or the time budget is found to be exhausted.
 *
 * @param timeoutNanos time budget in nanoseconds, counted from the moment the first task is polled
 * @return {@code false} if there was no task to run, {@code true} otherwise
 */
protected boolean runAllTasks(long timeoutNanos) {
    fetchFromDelayedQueue();
    Runnable task = pollTask();
    if (task == null) {
        return false;
    }

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        try {
            task.run();
        } catch (Throwable t) {
            // A failing task is logged and does not stop the remaining tasks.
            logger.warn("A task raised an exception.", t);
        }

        runTasks ++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    // Both exits from the loop assign lastExecutionTime before breaking.
    this.lastExecutionTime = lastExecutionTime;
    return true;
}
首先分析fetchFromDelayedQueue()方法,由父类SingleThreadEventExecutor实现
// SingleThreadEventExecutorprivate void fetchFromDelayedQueue() { long nanoTime = 0L; for (;;) { ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek(); if (delayedTask == null) { break; } if (nanoTime == 0L) { nanoTime = ScheduledFutureTask.nanoTime(); } if (delayedTask.deadlineNanos() <= nanoTime) { delayedTaskQueue.remove(); taskQueue.add(delayedTask); } else { break; } } }
其功能是将延迟任务队列(delayedTaskQueue)中已经超过延迟执行时间的任务迁移到非IO任务队列(taskQueue)中。然后依次从taskQueue取出任务执行,每执行64个任务,就进行耗时检查,如果已执行时间超过预先设定的执行时间,则停止执行非IO任务,避免非IO任务太多,影响IO任务的执行
?
?
总结:NioEventLoop实现的线程执行逻辑做了以下事情
?