
Netty Server Startup Source Code Analysis: socket

2013-08-13 
// AbstractBootstrap
public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

validate() checks that bossGroup, BootstrapChannelFactory, and childHandler are non-null. If childGroup is null, bossGroup is reused: it is assigned to childGroup.

// AbstractBootstrap
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regPromise = initAndRegister();
    final Channel channel = regPromise.channel();
    final ChannelPromise promise = channel.newPromise();
    if (regPromise.isDone()) {
        // Registration already finished: bind immediately.
        doBind0(regPromise, channel, localAddress, promise);
    } else {
        // Registration still pending: bind from the completion callback.
        regPromise.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                doBind0(future, channel, localAddress, promise);
            }
        });
    }
    return promise;
}
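The "bind now if registration is done, otherwise bind from a listener" shape of doBind can be tried out without Netty. The following is only a sketch using the JDK's CompletableFuture; the class BindAfterRegister, its doBind method, and the "bound:" result string are hypothetical stand-ins for illustration, not Netty API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for AbstractBootstrap.doBind(); not Netty code.
class BindAfterRegister {
    static CompletableFuture<String> doBind(CompletableFuture<Void> registration, int port) {
        CompletableFuture<String> bindResult = new CompletableFuture<>();
        if (registration.isDone()) {
            // Registration already finished: "bind" on the calling thread.
            finishBind(registration, port, bindResult);
        } else {
            // Registration pending: "bind" from the completion callback.
            registration.whenComplete((v, err) -> finishBind(registration, port, bindResult));
        }
        return bindResult;
    }

    // Plays the role of doBind0: succeed only if registration succeeded.
    private static void finishBind(CompletableFuture<Void> registration, int port,
                                   CompletableFuture<String> bindResult) {
        if (registration.isCompletedExceptionally()) {
            bindResult.completeExceptionally(new IllegalStateException("registration failed"));
        } else {
            bindResult.complete("bound:" + port);
        }
    }
}
```

In this sketch the caller gets the result future back immediately in both branches, which mirrors how doBind returns promise without blocking.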

The key method to analyze here is initAndRegister():

// AbstractBootstrap
final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        return channel.newFailedFuture(t);
    }

    ChannelPromise regPromise = channel.newPromise();
    group().register(channel, regPromise);
    if (regPromise.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regPromise;
}

a) First, analyze the following code:

final Channel channel = channelFactory().newChannel()

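channelFactory() returns the BootstrapChannelFactory created earlier. Its newChannel() method uses reflection to create a NioServerSocketChannel, whose constructor in turn opens the underlying JDK ServerSocketChannel.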

// BootstrapChannelFactory
public T newChannel() {
    try {
        return clazz.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + clazz, t);
    }
}

Note: clazz is set in the server startup code, in the call b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).
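The idea of a factory that remembers a Class object and instantiates it reflectively can be sketched in plain Java. ReflectiveFactory is a hypothetical name, not a Netty class. The sketch uses getDeclaredConstructor().newInstance() because Class.newInstance() has been deprecated since Java 9, and it assumes the target class has an accessible no-argument constructor.

```java
// Hypothetical illustration of a reflection-based factory; not Netty code.
class ReflectiveFactory<T> {
    private final Class<? extends T> clazz;

    ReflectiveFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    // Creates a fresh instance on every call via the no-argument constructor.
    T newInstance() {
        try {
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance of " + clazz, e);
        }
    }
}
```

For example, new ReflectiveFactory<CharSequence>(StringBuilder.class).newInstance() yields a new StringBuilder each time, in the same way the channel factory yields a new channel object per call.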

// NioServerSocketChannel
public NioServerSocketChannel() {
    super(null, newSocket(), SelectionKey.OP_ACCEPT);
    config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
}

private static ServerSocketChannel newSocket() {
    try {
        return ServerSocketChannel.open();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a server socket.", e);
    }
}

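newSocket() opens the underlying JDK channel:

return ServerSocketChannel.open();

The constructor then passes it to the parent constructor:

super(null, newSocket(), SelectionKey.OP_ACCEPT);

This call initializes the parent classes of NioServerSocketChannel. The direct parent is AbstractNioMessageChannel, whose constructor only initializes its own parent, AbstractNioChannel. That constructor is as follows: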

// AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

ch.configureBlocking(false) is where the previously created ServerSocketChannel is put into non-blocking mode.
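The two JDK calls involved so far (opening the channel and switching it to non-blocking mode) can be exercised on their own. This is a minimal sketch using only java.nio; the class name NonBlockingServerChannel is made up for the example, and the error handling is simplified compared with the Netty constructor above.

```java
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;

// Hypothetical helper; shows only the JDK side of what the constructors do.
class NonBlockingServerChannel {
    static ServerSocketChannel open() throws IOException {
        ServerSocketChannel ch = ServerSocketChannel.open();
        try {
            // A channel must be non-blocking before it can be registered with a Selector.
            ch.configureBlocking(false);
        } catch (IOException e) {
            // Do not leak the partially initialized channel.
            ch.close();
            throw e;
        }
        return ch;
    }
}
```

The channel returned here is open but not yet bound to any port, which matches the state of the ServerSocketChannel at this point in the startup sequence.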

Three further points in this constructor deserve attention:

1. super(parent) calls the constructor of AbstractChannel, the parent class of AbstractNioChannel:

// AbstractChannel.java
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

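newUnsafe() is implemented by the subclass AbstractNioMessageChannel, which instantiates its inner class NioMessageUnsafe. (Note: this class is important. It defines the read method, which triggers the accept call; it is analyzed in detail later.)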

// DefaultChannelPipeline
public DefaultChannelPipeline(Channel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    TailHandler tailHandler = new TailHandler();
    tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler);

    HeadHandler headHandler = new HeadHandler(channel.unsafe());
    head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler);

    head.next = tail;
    tail.prev = head;
}

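DefaultChannelPipeline maintains a doubly linked list whose elements are DefaultChannelHandlerContext instances. head is an outbound handler and tail is an inbound handler. After this step, the handler chain in the pipeline is: head -> tail.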
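To make the linked-list structure concrete, here is a small sketch of a pipeline that starts as head -> tail and inserts new entries just before tail. MiniPipeline and its Node class are hypothetical simplifications: nodes carry only a name, and handler logic, event propagation, and thread safety are all left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of a handler chain; not Netty code.
class MiniPipeline {
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    MiniPipeline() {
        // Initial state: head <-> tail, as in the constructor above.
        head.next = tail;
        tail.prev = head;
    }

    // Inserts a node directly before the fixed tail sentinel.
    MiniPipeline addLast(String name) {
        Node node = new Node(name);
        Node last = tail.prev;
        node.prev = last;
        node.next = tail;
        last.next = node;
        tail.prev = node;
        return this;
    }

    // Walks the chain from head to tail and collects the names.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }
}
```

Because head and tail are fixed sentinels, "add last" never has to special-case an empty list.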

b) Next, analyze the following code:

init(channel)

This method is implemented by the subclass ServerBootstrap:

// ServerBootstrap.java
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();
    if (handler() != null) {
        p.addLast(handler());
    }

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

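This method mainly does two things: it sets the options and attrs on the NioServerSocketChannel (and saves the child options and attrs for the SocketChannels accepted later), and it adds a ChannelInitializer to the channel's pipeline.

c) Finally, analyze the following code:

group().register(channel, regPromise);

This actually calls the register method of MultithreadEventLoopGroup: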

// MultithreadEventLoopGroup
public ChannelFuture register(Channel channel, ChannelPromise promise) {
    return next().register(channel, promise);
}

next() picks an EventExecutor from bossGroup (it is actually a SingleThreadEventLoop) and then calls its register method:
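The article does not show how next() chooses among the group's children. One common strategy for this kind of selection is round-robin, sketched below as an assumption rather than as Netty's actual implementation. RoundRobinChooser is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selector; assumed strategy, not taken from the article.
class RoundRobinChooser<T> {
    private final List<T> children;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinChooser(List<T> children) {
        if (children.isEmpty()) {
            throw new IllegalArgumentException("children must not be empty");
        }
        this.children = new ArrayList<>(children);
    }

    // Returns children in order, wrapping around; floorMod keeps the index
    // non-negative even after the counter overflows.
    T next() {
        return children.get(Math.floorMod(index.getAndIncrement(), children.size()));
    }
}
```

With a boss group that holds a single event loop, any such strategy returns the same loop on every call.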

// SingleThreadEventLoop
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    if (promise == null) {
        throw new NullPointerException("promise");
    }
    channel.unsafe().register(this, promise);
    return promise;
}

channel.unsafe().register(this, promise) calls the register method of AbstractUnsafe, an inner class of AbstractChannel:

// AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            closeForcibly();
            promise.setFailure(t);
        }
    }
}

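This is where the event loop's thread is started (that is, the boss thread), and the register0 task is added to the boss thread's task queue. Note: at this point the thread that started the server (the thread running main) has not bound the port yet; doBind0 in AbstractBootstrap is invoked later through a callback, once registration completes.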
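The "run directly if already on the loop thread, otherwise hand the task to the loop" pattern can be sketched with a JDK single-thread executor. MiniEventLoop, runInLoop, and the thread name "boss" are hypothetical names for this illustration; the executor creates its worker thread lazily on the first submitted task, which is loosely analogous to the boss thread starting when the first task is executed.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical single-threaded loop; a simplification, not Netty's event loop.
class MiniEventLoop {
    private volatile Thread loopThread;

    // The worker thread is only created when the first task is submitted.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "boss");
        t.setDaemon(true);
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Same branching as register(): run inline on the loop thread, else enqueue.
    void runInLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

A caller on the main thread always takes the enqueue branch, so the task body runs on the "boss" thread while the caller continues immediately.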

Next, look at what the register0 task actually does:

// AbstractUnsafe
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!ensureOpen(promise)) {
            return;
        }
        Runnable postRegisterTask = doRegister();
        registered = true;
        promise.setSuccess();
        pipeline.fireChannelRegistered();
        if (postRegisterTask != null) {
            postRegisterTask.run();
        }
        if (isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        if (!promise.tryFailure(t)) {
            logger.warn(
                    "Tried to fail the registration promise, but it is complete already. " +
                            "Swallowing the cause of the registration failure:", t);
        }
        closeFuture.setClosed();
    }
}

// AbstractNioChannel
protected Runnable doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return null;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Flush the cancelled key out of the selector, then retry once.
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}

Look at doRegister, and at this line in particular:

selectionKey = javaChannel().register(eventLoop().selector, 0, this);

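It calls the register method of java.nio.channels.spi.AbstractSelectableChannel, registering the ServerSocketChannel with the selector with interest ops 0 and this (the NioServerSocketChannel) as the attachment, and obtains the corresponding SelectionKey.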
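Registering with interest ops 0 and an attachment can be reproduced with the JDK alone. The sketch below uses only java.nio; RegisterWithZeroOps is a hypothetical class name, and a plain Object stands in for the NioServerSocketChannel that Netty attaches.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical helper showing the JDK call behind doRegister().
class RegisterWithZeroOps {
    static SelectionKey register(Selector selector, ServerSocketChannel ch, Object attachment)
            throws IOException {
        // Registration requires non-blocking mode.
        ch.configureBlocking(false);
        // Interest ops 0: the key exists, but the selector reports no events for it yet.
        return ch.register(selector, 0, attachment);
    }
}
```

A key registered this way stays silent until its interest set is changed, for example with key.interestOps(SelectionKey.OP_ACCEPT). Calling key.attachment() returns the object passed at registration, which is how a selector loop can get from a ready key back to its owning channel object.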

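Next, look at promise.setSuccess() in register0. Marking the promise as successful triggers the asynchronous callback: the listener that the main thread added to the ChannelPromise earlier is invoked, namely this code in AbstractBootstrap: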

regPromise.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        doBind0(future, channel, localAddress, promise);
    }
});

What this listener does is the bind itself, which is covered in the next article.

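Summary: initAndRegister() mainly does the following:

1. Creates the server-side listening socket, ServerSocketChannel.

2. Puts the listening socket into non-blocking mode.

3. Records SelectionKey.OP_ACCEPT (value 16) as the event the channel is interested in (readInterestOp).

4. Creates the Pipeline for the ServerSocketChannel. At this point the handler chain in the pipeline is: head (outbound) -> tail (inbound).

5. Sets the options and attrs of the NioServerSocketChannel, and stores the options and attrs to be applied later to each SocketChannel.

6. Adds an inbound handler, ChannelInitializer, to the pipeline of the NioServerSocketChannel. After this step the handler chain is: head (outbound) -> ChannelInitializer (inbound) -> tail (inbound). Note the initChannel implementation of this ChannelInitializer: when the channel is registered (the channelRegistered event fired by register0), it adds ServerBootstrapAcceptor to the pipeline.

7. Starts the boss thread and adds the register0 task to its queue. register0 registers the ServerSocketChannel with the selector (interest ops 0, with the NioServerSocketChannel as the attachment) and obtains the corresponding SelectionKey. It then triggers the port-binding operation, which is analyzed in the next article.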
