Netty源码学习-ServerBootstrap启动及事件处理过程
Netty是采用了Reactor模式的多线程版本,建议先看下面这篇文章了解一下Reactor模式:
http://bylijinnan.iteye.com/blog/1992325
Netty的启动及事件处理的流程,基本上是按照上面这篇文章来走的
文章里面提到的操作,每一步都能在Netty里面找到对应的代码
其中Reactor里面的Acceptor就对应Netty里的boss(即下文NioServerSocketPipelineSink的内部类Boss);
而Reactor里面的Handler就对应Netty里面各ChannelHandler(在worker里面跑)
由于流程涉及到比较多的类和方法,我提取一下Netty的骨架:
ServerBootstrap.bind(localAddress)
|--> newServerSocketChannel & fireChannelOpen(得到ServerSocketChannel[server])
     --> Binder.channelOpen
|--> Channels.bind(that is: sendDownstream of ChannelState.BOUND)
     --> In DefaultChannelPipeline, no downstreamHandler, jump to NioServerSocketPipelineSink.bind(关键)
|--> 1. do the REAL java.net.ServerSocket.bind(server绑定端口)
     2. start bossThread in bossExecutor
     3. do "accept & dispatch" in an endless loop of bossThread(得到SocketChannel[client])
|--> registerAcceptedChannel, start worker in workerPool
|--> worker.run
|--> processSelectedKeys(selector.selectedKeys())
|--> read & fireMessageReceived(开始调用各Handler)
/*
 * Abridged excerpt of Netty's ServerBootstrap, showing only the bind() path
 * and the Binder handler that reacts to the channelOpen event.
 */
public class ServerBootstrap extends Bootstrap {

    /*
     * Creates the server channel through the factory with a pipeline that
     * contains only the "binder" handler, waits until the Binder has queued
     * the bind future, waits for that future, and returns the channel.
     */
    public Channel bind(final SocketAddress localAddress) {
        final BlockingQueue<ChannelFuture> futureQueue =new LinkedBlockingQueue<ChannelFuture>();
        ChannelHandler binder = new Binder(localAddress, futureQueue);
        ChannelPipeline bossPipeline = Channels.pipeline();
        bossPipeline.addLast("binder", binder);
        /*
         * ===OPEN===
         * Author's note: NioServerSocketChannelFactory.newChannel returns a
         * NioServerSocketChannel. The NioServerSocketChannel constructor calls
         * ServerSocketChannel.open() and fires the channelOpen event.
         * That event is handled by the "binder" above, which obtains a future
         * (non-blocking; see Binder) and puts it into futureQueue so that the
         * do-while loop below can take it.
         */
        Channel channel = getFactory().newChannel(bossPipeline);
        // Wait until the future is available.
        ChannelFuture future = null;
        boolean interrupted = false;
        do {
            try {
                future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Remember the interrupt and keep waiting; it is restored below.
                interrupted = true;
            }
        } while (future == null);
        // One way of handling interruption: restore the interrupt status after
        // the wait is over (see "Java Concurrency in Practice").
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        // Wait for the future.
        future.awaitUninterruptibly();
        return channel;
    }

    // Mainly handles the channelOpen event.
    private final class Binder extends SimpleChannelUpstreamHandler {
        private final SocketAddress localAddress;
        private final BlockingQueue<ChannelFuture> futureQueue;
        private final Map<String, Object> childOptions =new HashMap<String, Object>();

        Binder(SocketAddress localAddress, BlockingQueue<ChannelFuture> futureQueue) {
            this.localAddress = localAddress;
            this.futureQueue = futureQueue;
        }

        /*
         * Forwards the open event upstream, then triggers the bind and hands
         * the resulting future to the thread waiting in ServerBootstrap.bind().
         */
        public void channelOpen(ChannelHandlerContext ctx,ChannelStateEvent evt) {
            try {
                // Handles the various options, e.g. keep-alive, no-delay, etc.
                // (code omitted in this excerpt).
            } finally {
                ctx.sendUpstream(evt);
            }
            /*
             * This is the key point (author's note).
             * The bind call here only triggers sendDownstream(ChannelState.BOUND),
             * and at this moment the pipeline contains no ChannelDownstreamHandler
             * (its only handler is "binder"):
             *
             *   public void sendDownstream(ChannelEvent e) {
             *       DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
             *       if (tail == null) {
             *           try {
             *               getSink().eventSunk(this, e);
             *               return;
             *           }
             *       }
             *       sendDownstream(tail, e);
             *   }
             *
             * Therefore ChannelState.BOUND goes to the pipeline's sink, and the
             * sink performs the final java.net.ServerSocket.bind operation.
             * See NioServerSocketPipelineSink.bind.
             */
            boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));
            assert finished;
        }
    }
}
/*
 * Abridged excerpt of Netty's NioServerSocketPipelineSink: the sink-side bind
 * and the Boss runnable that accepts client connections. Exception handling
 * and several field declarations are omitted from this excerpt.
 */
class NioServerSocketPipelineSink extends AbstractNioChannelSink {

    /*
     * Binds the underlying server socket to localAddress and then starts a
     * Boss for this channel on the factory's bossExecutor.
     */
    private void bind(
            NioServerSocketChannel channel, ChannelFuture future,
            SocketAddress localAddress) {
        try {
            // The real ===BIND=== is performed here.
            channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
            bound = true;
            Executor bossExecutor = ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
            // java.net.ServerSocket.bind is done, so accepting can begin;
            // see the run method of the Boss class.
            // ===BOSS start===: submitted to the thread pool (bossExecutor).
            DeadLockProofWorker.start(bossExecutor, new ThreadRenamingRunnable(new Boss(channel), "New I/O server boss #" + id + " (" + channel + ')'));
            bossStarted = true;
        }
    }

    private final class Boss implements Runnable {
        private final Selector selector;
        private final NioServerSocketChannel channel;

        /*
         * ===REGISTER[server]===
         * Note that every newly created Boss opens a new selector and
         * registers the server channel on it for OP_ACCEPT.
         */
        Boss(NioServerSocketChannel channel) throws IOException {
            this.channel = channel;
            selector = Selector.open();
            channel.socket.register(selector, SelectionKey.OP_ACCEPT);
            registered = true;
            channel.selector = selector;
        }

        /*
         * ===ACCEPT&DISPATCH===
         * The boss keeps accepting client connections and hands each
         * successfully connected SocketChannel to a worker.
         */
        public void run() {
            for (;;) {
                SocketChannel acceptedSocket = channel.socket.accept();
                if (acceptedSocket == null) {
                    break;
                }
                // Hand acceptedSocket over to a worker.
                registerAcceptedChannel(acceptedSocket, currentThread);
            }
        }

        /*
         * Author's note: the worker here (implements Runnable) corresponds to
         * the "Handler" of the Reactor pattern. The handler needs two pieces
         * of information: a selector and the acceptedSocket. The latter is
         * passed in; the selector is created inside worker.register.
         */
        private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
            ChannelPipeline pipeline = channel.getConfig().getPipelineFactory().getPipeline();
            // Author's note: taken from the worker pool as
            //   workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]
            // so workers are re-used.
            NioWorker worker = nextWorker();
            /*
             * Author's note: new NioAcceptedSocketChannel(...) includes a key
             * operation: it associates the pipeline with the channel,
             * one-to-one. See the AbstractChannel class:
             *
             *   protected AbstractChannel(Channel parent, ChannelFactory factory,
             *           ChannelPipeline pipeline, ChannelSink sink) {
             *       this.parent = parent;
             *       this.factory = factory;
             *       this.pipeline = pipeline;
             *       id = allocateId(this);
             *       pipeline.attach(this, sink);
             *   }
             */
            worker.register(new NioAcceptedSocketChannel(
                    channel.getFactory(), pipeline, channel,
                    NioServerSocketPipelineSink.this, acceptedSocket,
                    worker, currentThread), null);
        }
    }
}
/*
 * Abridged worker-side excerpt: queues a register task for the given channel,
 * makes sure the worker has been started, and wakes the selector up.
 */
void register(AbstractNioChannel<?> channel, ChannelFuture future) {
    // Only creates the Runnable; it is not run here. Author's note: it is
    // executed in the worker's run method, during processRegisterTaskQueue.
    Runnable registerTask = createRegisterTask(channel, future);
    // The worker thread is started inside start().
    Selector selector = start();
    boolean offered = registerTaskQueue.offer(registerTask);
    assert offered;
    // Wake the selector only if the wakenUp flag flips from false to true.
    if (wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}

/*
 * Under startStopLock: if the worker is not started yet, opens the selector
 * and submits this worker to the executor. Returns the selector.
 */
private Selector start() {
    synchronized (startStopLock) {
        if (!started) {
            selector = Selector.open();
            // ===WORKER start===
            DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id));
        }
    }
    return selector;
}

/*
 * Task that registers an accepted channel on the worker's selector and then
 * fires channelConnected (constructor and error handling omitted here).
 */
private final class RegisterTask implements Runnable {
    private final NioSocketChannel channel;
    private final ChannelFuture future;
    private final boolean server;

    public void run() {
        try {
            synchronized (channel.interestOpsLock) {
                // ===REGISTER[client]===
                // Author's note: the initial state (getRawInterestOps) is OP_READ.
                // The channel itself is passed as the key's attachment.
                channel.channel.register(selector, channel.getRawInterestOps(), channel);
            }
            fireChannelConnected(channel, remoteAddress);
        }
    }
}
/*
 * Abridged worker loop: select, drain the register/event/write task queues,
 * then process the selected keys. Exit conditions and error handling are
 * omitted from this excerpt.
 */
public void run() {
    for (;;) {
        // ===SELECT===
        SelectorUtil.select(selector);
        processRegisterTaskQueue();
        processEventQueue();
        processWriteTaskQueue();
        // This is where the selected keys are iterated and the associated
        // handlers get invoked.
        processSelectedKeys(selector.selectedKeys());
    }
}

/*
 * Iterates over the selected keys, removing each from the set, and performs
 * the read and/or write indicated by the key's ready operations.
 */
private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
    for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
        SelectionKey k = i.next();
        i.remove();
        int readyOps = k.readyOps();
        // The bitwise AND below is equivalent to k.isReadable(); note that the
        // condition additionally attempts a read when readyOps == 0.
        if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
            // Perform the read; skip the write check for this key if read returns false.
            if (!read(k)) {
                continue;
            }
        }
        // Perform the write.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            writeFromSelectorLoop(k);
        }
    }
}

/*
 * Two main steps:
 * 1. Read data from the channel.
 * 2. After reading, call fireMessageReceived. Author's note: the channel
 *    (k.attachment) gives access to its associated pipeline, which is how the
 *    handlers in the pipeline get triggered.
 */
protected boolean read(SelectionKey k) {
    final SocketChannel ch = (SocketChannel) k.channel();
    final NioSocketChannel channel = (NioSocketChannel) k.attachment();
    final ReceiveBufferSizePredictor predictor =channel.getConfig().getReceiveBufferSizePredictor();
    final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
    int ret = 0;
    int readBytes = 0;
    boolean failure = true;
    ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
    try {
        // Keep reading while data arrives, stopping once the buffer is full.
        while ((ret = ch.read(bb)) > 0) {
            readBytes += ret;
            if (!bb.hasRemaining()) {
                break;
            }
        }
        failure = false;
    }
    if (readBytes > 0) {
        bb.flip();
        final ChannelBufferFactory bufferFactory =channel.getConfig().getBufferFactory();
        final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
        // Copy the bytes read into a ChannelBuffer before releasing bb to the pool.
        buffer.setBytes(0, bb);
        buffer.writerIndex(readBytes);
        recvBufferPool.release(bb);
        // Update the predictor.
        predictor.previousReceiveBufferSize(readBytes);
        // Fire the event.
        fireMessageReceived(channel, buffer);
    }
    return true;
}