
Netty Source Study: ServerBootstrap Startup and Event Handling

2013-12-20 

Netty uses the multithreaded version of the Reactor pattern. It helps to read the following article on the Reactor pattern first:
http://bylijinnan.iteye.com/blog/1992325

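Netty's startup and event handling flow largely follows that article.
Every operation the article mentions has corresponding code in Netty.
The Acceptor in the Reactor pattern corresponds to Netty's boss (the Boss thread started during ServerBootstrap.bind; the class itself lives in NioServerSocketPipelineSink);
the Handler in the Reactor pattern corresponds to Netty's ChannelHandlers, which run inside the workers.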

Because the flow touches many classes and methods, here is the skeleton of Netty's startup path:

ServerBootstrap.bind(localAddress)
|--> newServerSocketChannel & fireChannelOpen (yields the ServerSocketChannel [server])
     --> Binder.channelOpen
|--> Channels.bind (that is: sendDownstream of ChannelState.BOUND)
     --> in DefaultChannelPipeline there is no downstream handler, so jump to NioServerSocketPipelineSink.bind (key step)
|--> 1. do the REAL java.net.ServerSocket.bind (the server binds the port)
     2. start bossThread in bossExecutor
     3. do "accept & dispatch" in an endless loop of bossThread (yields a SocketChannel [client])
|--> registerAcceptedChannel, start worker in workerPool
|--> worker.run
|--> processSelectedKeys(selector.selectedKeys())
|--> read & fireMessageReceived (the handlers start being invoked)
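
The step "no downstream handler, so jump to the sink" can be illustrated without Netty. What follows is a minimal sketch, not Netty's code: MiniPipeline, DownstreamHandler, Sink and SinkDemo are hypothetical names invented for this example, and events are plain strings. It only shows that a downstream event passes through whatever handlers exist and then always ends at the sink, so a pipeline with no downstream handlers delivers the event to the sink directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; these are not Netty classes.
interface Sink {
    void eventSunk(String event);
}

interface DownstreamHandler {
    // Returns the (possibly transformed) event to pass further down.
    String handleDownstream(String event);
}

class MiniPipeline {
    private final List<DownstreamHandler> handlers = new ArrayList<>();
    private final Sink sink;

    MiniPipeline(Sink sink) {
        this.sink = sink;
    }

    void addLast(DownstreamHandler handler) {
        handlers.add(handler);
    }

    // Downstream events travel from the tail towards the head, then reach the sink.
    void sendDownstream(String event) {
        for (int i = handlers.size() - 1; i >= 0; i--) {
            event = handlers.get(i).handleDownstream(event);
        }
        sink.eventSunk(event);
    }
}

class SinkDemo {
    static List<String> run() {
        List<String> sunk = new ArrayList<>();

        // Like the boss pipeline: no downstream handler, so the sink sees the event as-is.
        MiniPipeline bossLike = new MiniPipeline(sunk::add);
        bossLike.sendDownstream("BOUND");

        // With one downstream handler, the event is transformed before it is sunk.
        MiniPipeline withHandler = new MiniPipeline(sunk::add);
        withHandler.addLast(e -> e + "+logged");
        withHandler.sendDownstream("WRITE");

        return sunk;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In Netty the sink is where the real I/O happens, which is why the BOUND event ends up calling java.net.ServerSocket.bind.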


Following the skeleton above, let's read the key code.
The key steps are marked in the form "===[key step]===".

On the server side, Netty starts from the ServerBootstrap.bind method:
public class ServerBootstrap extends Bootstrap {

    public Channel bind(final SocketAddress localAddress) {
        final BlockingQueue<ChannelFuture> futureQueue =
            new LinkedBlockingQueue<ChannelFuture>();
        ChannelHandler binder = new Binder(localAddress, futureQueue);
        ChannelPipeline bossPipeline = Channels.pipeline();
        bossPipeline.addLast("binder", binder);

        /*
         ===OPEN===
         NioServerSocketChannelFactory.newChannel returns a NioServerSocketChannel.
         The NioServerSocketChannel constructor calls ServerSocketChannel.open()
         and fires the channelOpen event.
         That event is handled by the "binder" above, which returns a Future (non-blocking); see Binder.
         Finally the Future is put into futureQueue so that the loop below can take it.
         */
        Channel channel = getFactory().newChannel(bossPipeline);

        // Wait until the future is available.
        ChannelFuture future = null;
        boolean interrupted = false;
        do {
            try {
                future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        } while (future == null);

        // One way to handle interruption; see "Java Concurrency in Practice"
        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        // Wait for the future.
        future.awaitUninterruptibly();
        return channel;
    }

    // Mainly handles the channelOpen event
    private final class Binder extends SimpleChannelUpstreamHandler {

        private final SocketAddress localAddress;
        private final BlockingQueue<ChannelFuture> futureQueue;
        private final Map<String, Object> childOptions =
            new HashMap<String, Object>();

        Binder(SocketAddress localAddress, BlockingQueue<ChannelFuture> futureQueue) {
            this.localAddress = localAddress;
            this.futureQueue = futureQueue;
        }

        public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) {
            try {
                // Handles the options (keep alive, nodelay and so on); code omitted
            } finally {
                ctx.sendUpstream(evt);
            }

            /*
             This is the important part.
             The bind call here only triggers sendDownstream(ChannelState.BOUND),
             and at this point the pipeline has no ChannelDownstreamHandler
             (it holds a single handler, "binder"):

             public void sendDownstream(ChannelEvent e) {
                 DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
                 if (tail == null) {
                     try {
                         getSink().eventSunk(this, e);
                         return;
                     } // catch block omitted in this excerpt
                 }
                 sendDownstream(tail, e);
             }

             So ChannelState.BOUND goes to the pipeline's sink, and the sink performs
             the final java.net.ServerSocket.bind. See NioServerSocketPipelineSink.bind.
             */
            boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));
            assert finished;
        }
    }
}
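
The do/while loop in bind is the usual idiom for a wait that must not be cancelled by an interrupt: remember that an interrupt happened, keep waiting, and restore the thread's interrupt flag before returning. The following is a minimal sketch of that idiom on its own, assuming nothing from Netty; UninterruptiblePoll, takeUninterruptibly and demo are hypothetical names, and the one-second poll timeout is an arbitrary choice for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class UninterruptiblePoll {

    // Blocks until an element arrives. Interrupts are recorded, not propagated,
    // and the interrupt flag is restored before returning.
    static <T> T takeUninterruptibly(BlockingQueue<T> queue) {
        T value = null;
        boolean interrupted = false;
        do {
            try {
                value = queue.poll(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                interrupted = true; // keep waiting, remember the interrupt
            }
        } while (value == null);

        if (interrupted) {
            Thread.currentThread().interrupt(); // let callers observe the interrupt
        }
        return value;
    }

    // Interrupts a waiting thread, then delivers a value; reports whether the
    // value arrived and the interrupt flag was still visible afterwards.
    static boolean demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        String[] result = new String[1];
        boolean[] flagAfter = new boolean[1];

        Thread waiter = new Thread(() -> {
            result[0] = takeUninterruptibly(queue);
            flagAfter[0] = Thread.currentThread().isInterrupted();
        });
        waiter.start();
        waiter.interrupt();
        queue.offer("future");
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "future".equals(result[0]) && flagAfter[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Swallowing the InterruptedException without restoring the flag would hide the interrupt from the code that called bind, which is the problem this idiom avoids.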


NioServerSocketPipelineSink:

class NioServerSocketPipelineSink extends AbstractNioChannelSink {

    private void bind(
            NioServerSocketChannel channel, ChannelFuture future,
            SocketAddress localAddress) {
        try {
            // The real ===BIND=== happens here
            channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
            bound = true;

            Executor bossExecutor =
                ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
            // java.net.ServerSocket.bind is done, so accepting can begin; see Boss.run
            // ===BOSS start===: the Boss is submitted to the thread pool (bossExecutor)
            DeadLockProofWorker.start(bossExecutor,
                    new ThreadRenamingRunnable(new Boss(channel),
                            "New I/O server boss #" + id + " (" + channel + ')'));
            bossStarted = true;
        } // catch/finally omitted in this excerpt
    }

    private final class Boss implements Runnable {
        private final Selector selector;
        private final NioServerSocketChannel channel;

        /*
         ===REGISTER[server]===
         Note that every new Boss opens a new selector.
         */
        Boss(NioServerSocketChannel channel) throws IOException {
            this.channel = channel;
            selector = Selector.open();
            channel.socket.register(selector, SelectionKey.OP_ACCEPT);
            registered = true;
            channel.selector = selector;
        }

        /*
         ===ACCEPT&DISPATCH===
         The boss keeps accepting client connections and hands each connected
         SocketChannel to a worker.
         (The surrounding select and try/catch code is omitted in this excerpt,
         as is the declaration of currentThread.)
         */
        public void run() {
            for (;;) {
                SocketChannel acceptedSocket = channel.socket.accept();
                if (acceptedSocket == null) {
                    break;
                }
                // Hand acceptedSocket to a worker
                registerAcceptedChannel(acceptedSocket, currentThread);
            }
        }

        /*
         The worker here (implements Runnable) plays the role of the "Handler"
         in the Reactor pattern.
         A handler needs two things: a selector and the acceptedSocket.
         The latter is passed in; the selector is opened in worker.start()
         (called from worker.register) the first time that worker is started.
         */
        private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
            ChannelPipeline pipeline = channel.getConfig().getPipelineFactory().getPipeline();
            // Taken from the worker pool:
            //   workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]
            // so workers are re-used
            NioWorker worker = nextWorker();

            /*
             Note that new NioAcceptedSocketChannel(...) includes a key operation:
             it associates the pipeline with the channel, one to one.
             See the AbstractChannel class:

             protected AbstractChannel(Channel parent, ChannelFactory factory,
                     ChannelPipeline pipeline, ChannelSink sink) {
                 this.parent = parent;
                 this.factory = factory;
                 this.pipeline = pipeline;
                 id = allocateId(this);
                 pipeline.attach(this, sink);
             }
             */
            worker.register(new NioAcceptedSocketChannel(
                    channel.getFactory(), pipeline, channel,
                    NioServerSocketPipelineSink.this, acceptedSocket,
                    worker, currentThread), null);
        }
    }
}
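
The worker selection quoted in the comment above, workers[Math.abs(workerIndex.getAndIncrement() % workers.length)], is a round-robin over a fixed array. Below is a small sketch of just that expression, assuming workers are represented by strings; RoundRobinPool and its startIndex constructor are hypothetical and exist only so the wrap-around of the int counter can be exercised. Taking the remainder first and Math.abs second matters: the remainder always lies strictly between -length and length, so Math.abs cannot overflow even when the counter wraps to Integer.MIN_VALUE.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of round-robin worker selection; not Netty's class.
class RoundRobinPool {
    private final String[] workers;
    private final AtomicInteger workerIndex;

    RoundRobinPool(int size) {
        this(size, 0);
    }

    // startIndex exists only to make counter overflow easy to demonstrate.
    RoundRobinPool(int size, int startIndex) {
        workers = new String[size];
        for (int i = 0; i < size; i++) {
            workers[i] = "worker-" + i;
        }
        workerIndex = new AtomicInteger(startIndex);
    }

    // Same expression as in the article: remainder first, then abs.
    String nextWorker() {
        return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
    }

    public static void main(String[] args) {
        RoundRobinPool pool = new RoundRobinPool(3);
        for (int i = 0; i < 4; i++) {
            System.out.println(pool.nextWorker());
        }
    }
}
```

Because several accepted channels map to the same worker, each worker's selector ends up multiplexing many client channels.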


worker.register mainly creates a registerTask (implements Runnable) and puts it into registerTaskQueue.
The corresponding classes are NioWorker and AbstractNioWorker:
void register(AbstractNioChannel<?> channel, ChannelFuture future) {
    // Only creates the Runnable; it is not run here.
    // It runs in the worker's run method, during processRegisterTaskQueue.
    Runnable registerTask = createRegisterTask(channel, future);
    // start() launches the worker thread
    Selector selector = start();
    boolean offered = registerTaskQueue.offer(registerTask);
    assert offered;
    if (wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}

private Selector start() {
    synchronized (startStopLock) {
        if (!started) {
            selector = Selector.open();
            // ===WORKER start===
            DeadLockProofWorker.start(executor,
                    new ThreadRenamingRunnable(this, "New I/O  worker #" + id));
        }
    }
    return selector;
}

private final class RegisterTask implements Runnable {
    private final NioSocketChannel channel;
    private final ChannelFuture future;
    private final boolean server;

    public void run() {
        try {
            synchronized (channel.interestOpsLock) {
                // ===REGISTER[client]===
                // The initial state (getRawInterestOps) is OP_READ
                channel.channel.register(selector, channel.getRawInterestOps(), channel);
            }
            fireChannelConnected(channel, remoteAddress);
        } // catch block omitted in this excerpt
    }
}
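
The register method above queues work for the worker thread instead of touching the selector from the boss thread, then wakes the selector so the task is picked up promptly. The sketch below shows that hand-off in isolation, under simplifying assumptions: MiniWorker, submit, shutdown and demo are hypothetical names, tasks are arbitrary Runnables, and the select timeout of 500 ms is a safety net chosen for the example rather than Netty's value. Netty's real loop has extra handling around the wakenUp flag that is not reproduced here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of "queue a task, then wake the selector"; not Netty's class.
class MiniWorker implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    private volatile boolean running = true;

    MiniWorker() throws IOException {
        selector = Selector.open();
    }

    // May be called from any thread; the task itself runs on the worker thread.
    void submit(Runnable task) {
        taskQueue.offer(task);
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup(); // only the first caller pays for the wakeup
        }
    }

    void shutdown() {
        running = false;
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (running) {
                wakenUp.set(false);
                selector.select(500); // returns early when wakeup() was called
                Runnable task;
                while ((task = taskQueue.poll()) != null) {
                    task.run();
                }
            }
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Submits one task from the calling thread and returns the name of the
    // thread that actually ran it (null if it did not run within 5 seconds).
    static String demo() {
        try {
            MiniWorker worker = new MiniWorker();
            Thread thread = new Thread(worker, "mini-worker");
            thread.start();

            CountDownLatch done = new CountDownLatch(1);
            String[] ranOn = new String[1];
            worker.submit(() -> {
                ranOn[0] = Thread.currentThread().getName();
                done.countDown();
            });

            boolean finished = done.await(5, TimeUnit.SECONDS);
            worker.shutdown();
            thread.join();
            return finished ? ranOn[0] : null;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running registrations on the worker's own thread keeps all selector operations for a worker on a single thread.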


The worker thread's run loop:

public void run() {
    for (;;) {
        // ===SELECT===
        SelectorUtil.select(selector);

        processRegisterTaskQueue();
        processEventQueue();
        processWriteTaskQueue();

        // This is where the selected keys are iterated and the associated handlers invoked
        processSelectedKeys(selector.selectedKeys());
    }
}

private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
    for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
        SelectionKey k = i.next();
        i.remove();
        int readyOps = k.readyOps();
        // The bitwise AND below is equivalent to k.isReadable()
        if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
            // Perform the read
            if (!read(k)) {
                continue;
            }
        }
        // Perform the write
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            writeFromSelectorLoop(k);
        }
    }
}

/*
 Two main operations:
 1. Read data from the channel.
 2. When the read completes, fireMessageReceived. The channel (k.attachment) gives access
    to its associated pipeline, which triggers the handlers in that pipeline.
 */
protected boolean read(SelectionKey k) {
    final SocketChannel ch = (SocketChannel) k.channel();
    final NioSocketChannel channel = (NioSocketChannel) k.attachment();

    final ReceiveBufferSizePredictor predictor =
        channel.getConfig().getReceiveBufferSizePredictor();
    final int predictedRecvBufSize = predictor.nextReceiveBufferSize();

    int ret = 0;
    int readBytes = 0;
    boolean failure = true;

    ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
    try {
        while ((ret = ch.read(bb)) > 0) {
            readBytes += ret;
            if (!bb.hasRemaining()) {
                break;
            }
        }
        failure = false;
    } // catch block omitted in this excerpt

    if (readBytes > 0) {
        bb.flip();

        final ChannelBufferFactory bufferFactory =
            channel.getConfig().getBufferFactory();
        final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
        buffer.setBytes(0, bb);
        buffer.writerIndex(readBytes);

        recvBufferPool.release(bb);

        // Update the predictor.
        predictor.previousReceiveBufferSize(readBytes);

        // Fire the event.
        fireMessageReceived(channel, buffer);
    }
    return true;
}
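
The select / iterate keys / read sequence can be tried with plain java.nio and no sockets. The following is a minimal sketch under stated assumptions: MiniReadLoop, readAvailable and demo are hypothetical names, a java.nio.channels.Pipe stands in for a client connection, the attachment is a plain string instead of a NioSocketChannel, and the 64-byte buffer and 5-second deadline are arbitrary. It mirrors the shape of processSelectedKeys and read, not their full behavior.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical illustration of a selector-driven read; not Netty's code.
class MiniReadLoop {

    // Reads whatever is currently available on the key's channel.
    static String readAvailable(SelectionKey k) throws IOException {
        ReadableByteChannel ch = (ReadableByteChannel) k.channel();
        ByteBuffer bb = ByteBuffer.allocate(64);
        int ret;
        int readBytes = 0;
        while ((ret = ch.read(bb)) > 0) {
            readBytes += ret;
            if (!bb.hasRemaining()) {
                break;
            }
        }
        if (readBytes == 0) {
            return "";
        }
        bb.flip(); // switch the buffer from writing to reading
        return StandardCharsets.UTF_8.decode(bb).toString();
    }

    // Writes "hello" into a pipe and returns "<attachment>:<data>" as seen by the loop.
    static String demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                // The attachment plays the role of the channel object Netty attaches to the key.
                pipe.source().register(selector, SelectionKey.OP_READ, "client-1");
                pipe.sink().write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));

                String message = null;
                long deadline = System.nanoTime() + 5_000_000_000L;
                while (message == null && System.nanoTime() < deadline) {
                    selector.select(1000);
                    Iterator<SelectionKey> i = selector.selectedKeys().iterator();
                    while (i.hasNext()) {
                        SelectionKey k = i.next();
                        i.remove();
                        if ((k.readyOps() & SelectionKey.OP_READ) != 0) {
                            message = k.attachment() + ":" + readAvailable(k);
                        }
                    }
                }
                return message;
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The attachment is what lets the loop get from a bare SelectionKey back to per-connection state, which in Netty is the channel and, through it, the pipeline.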

