netty服务端启动源码分析-socket
//AbstractBootstrappublic ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort));}public ChannelFuture bind(SocketAddress localAddress) { validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress); }
? ? ??validate()方法的作用为:校验:bossGroup、BootstrapChannelFactory、childHandler非空。如果childGroup为空,则复用bossGroup,将bossGroup赋值给childGroup。
?
//AbstractBootstrapprivate ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regPromise = initAndRegister(); final Channel channel = regPromise.channel(); final ChannelPromise promise = channel.newPromise(); if (regPromise.isDone()) { doBind0(regPromise, channel, localAddress, promise); } else { regPromise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doBind0(future, channel, localAddress, promise); } }); } return promise; }
? ? ?
? ? ? ? ?重点分析里面的initAndRegister()方法
//AbstractBootstrapfinal ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); return channel.newFailedFuture(t); } ChannelPromise regPromise = channel.newPromise(); group().register(channel, regPromise); if (regPromise.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regPromise; }
? ? ?
a)首先分析以下代码:
final Channel channel = channelFactory().newChannel()
? ? ? channelFactory()方法返回之前创建的BootstrapChannelFactory,里面的newChannel()方法会根据反射创建一个ServerSocketChannel
//BootstrapChannelFactory public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } }
? ? ? ??注:clazz是在服务端启动的这段代码(b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)中设置的。
// NioServerSocketChannelpublic NioServerSocketChannel() { super(null, newSocket(), SelectionKey.OP_ACCEPT); config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());}private static ServerSocketChannel newSocket() { try { return ServerSocketChannel.open(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } }
return ServerSocketChannel.open();
super(null, newSocket(), SelectionKey.OP_ACCEPT);
? ? ? ?它会对NioServerSocketChannel的父类进行初始化:NioServerSocketChannel的父类是AbstractNioMessageChannel,其构造方法仅仅初始化其父类AbstractNioChannel,父类构造方法如下:
//AbstractNioChannelprotected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
? ? ? ?ch.configureBlocking(false)此处就将之前创建的ServerSocketChannel设置为非阻塞模式。
? ? ? ?
? ? ? ?该方法里还有三点需要注意:
? ? ? ?1、super(parent)会调用AbstractNioChannel的父类AbstractChannel的构造方法
// AbstractChannel.javaprotected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); }
? ? ? ?newUnsafe()是由子类AbstractNioMessageChannel实现的,里面实例化了一个内部类NioMessageUnsafe(注:该类很重要,里面定义了read方法,会触发accept的调用,后面对其重点分析)。
// DefaultChannelPipelinepublic DefaultChannelPipeline(Channel channel) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; TailHandler tailHandler = new TailHandler(); tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler); HeadHandler headHandler = new HeadHandler(channel.unsafe()); head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler); head.next = tail; tail.prev = head; }
? ? ? ?DefaultChannelPipeline维护了一个以DefaultChannelHandlerContext为元素的双向链表结构,Head是一个Outbound处理器,而tail是一个Inbound处理器。经过此步骤后,管道中的处理器链表为:Head->tail。
?
b)再来分析以下代码
init(channel)
? ? ? ?该方法由子类ServerBootstrap实现
// ServerBootstrap.javavoid init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); if (handler() != null) { p.addLast(handler()); } final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
? ? ? ?该方法主要做了两件事:
group().register(channel, regPromise);
? ? ? ?实际是调用MultithreadEventLoopGroup的register方法
//MultithreadEventLoopGrouppublic ChannelFuture register(Channel channel, ChannelPromise promise) { return next().register(channel, promise); }
?? ?next方法从bossGroup中选择一个EventExecutor(它实际是一个SingleThreadEventLoop),然后执行register方法
//SingleThreadEventLooppublic ChannelFuture register(final Channel channel, final ChannelPromise promise) { if (channel == null) { throw new NullPointerException("channel"); } if (promise == null) { throw new NullPointerException("promise"); } channel.unsafe().register(this, promise); return promise; }
? ? ?channel.unsafe().register(this, promise)这里会调用AbstractChannel的内部类AbstractUnsafe的register方法
//AbstractUnsafepublic final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { closeForcibly(); promise.setFailure(t); } } }
? ? ? ??此处开启了eventloop中的线程(即启动了boss线程),并将register0任务加入到boss线程的队列中意:此时启动服务端的线程即main函数所在的线程正在等待回调AbstractBootstrap中的dobind0方法。
接着分析register0任务具体干了什么事情
//AbstractUnsafeprivate void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!ensureOpen(promise)) { return; } Runnable postRegisterTask = doRegister(); registered = true; promise.setSuccess(); pipeline.fireChannelRegistered(); if (postRegisterTask != null) { postRegisterTask.run(); } if (isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); if (!promise.tryFailure(t)) { logger.warn( "Tried to fail the registration promise, but it is complete already. " + "Swallowing the cause of the registration failure:", t); } closeFuture.setClosed(); } }protected Runnable doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return null; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true; } else { throw e; } } } }
? ? ? ?看一下doRegister代码,看到这行代码了吧
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
? ? ? ?它会调用java.nio.channels.spi. AbstractSelectableChannel的register方法,?将ServerSocketChannel、0、以及this注册到selector中并得到对应的selectionkey。
? ? ? ?接着再看register0方法中的promise.setSuccess(),将promise设置为success,就会触发异步回调,回调之前main函数所在的线程中为ChannelPromise添加的listner,即AbstractBootstrap的以下代码:
regPromise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doBind0(future, channel, localAddress, promise); } });
? ? ? ??该listner所作的事情就是bind了,在下一篇文章再进行阐述。
?
?
? ? ? ? 总结:initAndRegister()方法主要做了以下几件事情:
? ? ? ? 1、 创建服务端监听套接字ServerSocketChannel
? ? ? ? 2、 设置监听套接字为非阻塞
? ? ? ? 3、 设置channel当前感兴趣的事件为SelectionKey.OP_ACCEPT(值为16)
? ? ? ? 4、 创建作用于ServerSocketChannel的管道Pipeline,该管道中此时的处理器链表为:Head(outbound)->tail(inbound)。
? ? ? ? 5、 设置NioServerSocketChannel的options和attrs,并存储之后用于SocketChannel的options和attrs。
? ? ? ? 6、 为NioServerSocketChannel对应的管道增加一个Inbound处理器ChannelInitializer。经过此步骤后,管道中的处理器链表为:head(outbound)->ChannelInitializer(inbound)->tail(inbound)。注意ChannelInitializer的实现方法initChannel,里面会当某种事件发生时将ServerBootstrapAcceptor加入到管道中。
? ? ? ? 7、 启动了boss线程,并将register0任务加入到boss线程的队列中。而register0做的事情为:将ServerSocketChannel、0、注册到selector中并得到对应的selectionkey。然后触发绑定端口的操作,该内容在下一篇文章中分析。