首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

netty源代码解析(一)——服务端流程

2012-10-31 
netty源代码解析(1)——服务端流程今天看了下netty代码,对代码做了个流程分析,netty的代码写的真是漂亮。nett

netty源代码解析(1)——服务端流程
今天看了下netty代码,对代码做了个流程分析,netty的代码写的真是漂亮。
netty服务端启动代码如下

ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()));bootstrap.setPipelineFactory(new ChannelPipelineFactory() {@Overridepublic ChannelPipeline getPipeline() {ChannelPipeline pipleline = pipeline();//默认最大传输帧大小为16Mpipleline.addLast("encode", new ObjectEncoder(1048576 * 16));pipleline.addLast("decode", new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null)));pipleline.addLast("handler", handler);return pipleline;}});//设置缓冲区为64Mbootstrap.setOption("receiveBufferSize", 1048576 * 64);bootstrap.setOption("child.tcpNoDelay", true); //关闭Nagle算法//tcp定期发送心跳包 比如IM里边定期探测对方是否下线//只有tcp长连接下才有意义//bootstrap.setOption("child.keepAlive", true); bootstrap.bind(new InetSocketAddress(port));

服务端事件处理顺序如下:
UpStream.ChannelState.OPEN—–>DownStream.ChannelState.BOUND(需要绑定)
——–>UpStream.ChannelState.BOUND(已经绑定)——>DownStream.CONNECTED(需要连接)——->UpStream.CONNECTED(连接成功)

在bind的时候做了如下处理
public Channel bind(final SocketAddress localAddress) {        if (localAddress == null) {            throw new NullPointerException("localAddress");        }        final BlockingQueue<ChannelFuture> futureQueue =            new LinkedBlockingQueue<ChannelFuture>();        ChannelHandler binder = new Binder(localAddress, futureQueue);        ChannelHandler parentHandler = getParentHandler();

这里创建了一个Binder,它继承了SimpleChannelUpstreamHandler。先说说UpStreamHandler和DownStreamHandler,一般来说,UpStream类型的事件主要是由网络底层反馈给Netty的,比如messageReceived,channelConnected等事件,而DownStream类型的事件是由框架自己发起的,比如bind,write,connect,close等事件。
接着
ChannelPipeline bossPipeline = pipeline();        bossPipeline.addLast("binder", binder);        if (parentHandler != null) {            bossPipeline.addLast("userHandler", parentHandler);        }        Channel channel = getFactory().newChannel(bossPipeline);

这里创建出一个channel,每一个channel都是由一个tcp四元组组成。channel由ChannelFactory创建而成。在创建完NioServerSocketChannel后,会调用
fireChannelOpen(this);这是发出一个ChannelState.OPEN事件,前面注册的BinderHandler会处理这个事件。我们来看看Binder的代码
 @Override        public void channelOpen(                ChannelHandlerContext ctx,                ChannelStateEvent evt) {            try {                evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());                // Split options into two categories: parent and child.                Map<String, Object> allOptions = getOptions();                Map<String, Object> parentOptions = new HashMap<String, Object>();                for (Entry<String, Object> e: allOptions.entrySet()) {                    if (e.getKey().startsWith("child.")) {                        childOptions.put(                                e.getKey().substring(6),                                e.getValue());                    } else if (!e.getKey().equals("pipelineFactory")) {                        parentOptions.put(e.getKey(), e.getValue());                    }                }                // Apply parent options.                evt.getChannel().getConfig().setOptions(parentOptions);            } finally {                ctx.sendUpstream(evt);            }            boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); //这里发出bind事件,return Channels.bind(this, localAddress)            assert finished;        }

bind就触发了一个DownStream的ChannelState.BOUND事件。表明需要将该Channel绑定至指定的地址。
 public void sendDownstream(ChannelEvent e) {        DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);        if (tail == null) {            try {                getSink().eventSunk(this, e);                return;            } catch (Throwable t) {                notifyHandlerException(e, t);                return;            }        }        sendDownstream(tail, e);    }

接着就要看NioServerSocketPipelineSink了,这个主要关注于具体传输数据的处理,同时也包括其他方面的内容,比如异常处理等等。执行eventSunk方法。
 public void eventSunk(            ChannelPipeline pipeline, ChannelEvent e) throws Exception {        Channel channel = e.getChannel();        if (channel instanceof NioServerSocketChannel) {            handleServerSocket(e);        } else if (channel instanceof NioSocketChannel) {            handleAcceptedSocket(e);        }    }

nio方式ChannelSink一般会有1个boss实例(implements Runnable),以及若干个worker实例(不设置默认为cpu cores*2),它将channel分为 ServerSocketChannel和SocketChannel分开处理。这主要原因是boss线程accept()一个新的连接生成一个 SocketChannel交给worker进行数据接收。

看下ServerSocketChannel的处理
private void handleServerSocket(ChannelEvent e) {        if (!(e instanceof ChannelStateEvent)) {            return;        }        ChannelStateEvent event = (ChannelStateEvent) e;        NioServerSocketChannel channel =            (NioServerSocketChannel) event.getChannel();        ChannelFuture future = event.getFuture();        ChannelState state = event.getState();        Object value = event.getValue();        switch (state) {        case OPEN:            if (Boolean.FALSE.equals(value)) {                close(channel, future);            }            break;        case BOUND:            if (value != null) {                bind(channel, future, (SocketAddress) value);            } else {                close(channel, future);            }            break;        }    }

主要是处理bind事件,
private void bind(            NioServerSocketChannel channel, ChannelFuture future,            SocketAddress localAddress) {        boolean bound = false;        boolean bossStarted = false;        try {            channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());            bound = true;            future.setSuccess();            fireChannelBound(channel, channel.getLocalAddress());            //取出一个boss线程,然后交给Boss类去处理。            Executor bossExecutor =                ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;            DeadLockProofWorker.start(                    bossExecutor,                    new ThreadRenamingRunnable(                            new Boss(channel),                            "New I/O server boss #" + id + " (" + channel + ')'));            bossStarted = true;        } catch (Throwable t) {            future.setFailure(t);            fireExceptionCaught(channel, t);        } finally {            if (!bossStarted && bound) {                close(channel, future);            }        }    }

看下Boss类,它实现了Runnable接口
private final Selector selector;        private final NioServerSocketChannel channel;        Boss(NioServerSocketChannel channel) throws IOException {            this.channel = channel;            selector = Selector.open();            boolean registered = false;            try {                channel.socket.register(selector, SelectionKey.OP_ACCEPT);                registered = true;            } finally {                if (!registered) {                    closeSelector();                }            }            channel.selector = selector;

代码是不是有点熟悉,没错,是nio里的代码,需要注意的是,ServerSocketChannel只注册OP_ACCEPT事件。

再看下Boss类的run方法
public void run() {            final Thread currentThread = Thread.currentThread();            channel.shutdownLock.lock();            try {                for (;;) {                    try {                        if (selector.select(1000) > 0) {                            selector.selectedKeys().clear();                        }                        SocketChannel acceptedSocket = channel.socket.accept();                        if (acceptedSocket != null) {                            registerAcceptedChannel(acceptedSocket, currentThread);                        }                    } catch (SocketTimeoutException e) {                        // Thrown every second to get ClosedChannelException                        // raised.                    } catch (CancelledKeyException e) {                        // Raised by accept() when the server socket was closed.                    } catch (ClosedSelectorException e) {                        // Raised by accept() when the server socket was closed.                    } catch (ClosedChannelException e) {                        // Closed as requested.                        break;                    } catch (Throwable e) {                        logger.warn(                                "Failed to accept a connection.", e);                        try {                            Thread.sleep(1000);                        } catch (InterruptedException e1) {                            // Ignore                        }                    }                }            } finally {                channel.shutdownLock.unlock();                closeSelector();            }        }

这里会调用registerAcceptedChannel(acceptedSocket, currentThread);方法
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {            try {                ChannelPipeline pipeline =                    channel.getConfig().getPipelineFactory().getPipeline();                NioWorker worker = nextWorker(); //获取一个NioWorker                //将Channel注册到NioWorker上去                worker.register(new NioAcceptedSocketChannel(                        channel.getFactory(), pipeline, channel,                        NioServerSocketPipelineSink.this, acceptedSocket,                        worker, currentThread), null);            } catch (Exception e) {                logger.warn(                        "Failed to initialize an accepted socket.", e);                try {                    acceptedSocket.close();                } catch (IOException e2) {                    logger.warn(                            "Failed to close a partially accepted socket.",                            e2);                }            }        }

当有新的连接建立,会交给NioWorker的线程池去处理,boss只负责accept到新的连接,新的SocketChannel会被注册到一个work线程中去。

热点排行