首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

netty源代码解析(二)——客户端流程

2012-10-26 
netty源代码解析(2)——客户端流程前一篇文章分析了netty的服务端流程,接下来分析一下客户端的大致流程,客户

netty源代码解析(2)——客户端流程
前一篇文章分析了netty的服务端流程,接下来分析一下客户端的大致流程,客户端启动代码如下

ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()));bootstrap.setPipelineFactory(new ChannelPipelineFactory() {@Overridepublic ChannelPipeline getPipeline() throws Exception {ChannelPipeline pipleline = pipeline();pipleline.addLast("encode", new ObjectEncoder(1048576 * 16));pipleline.addLast("decode", new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null)));pipleline.addLast("handler", handler);return pipleline;}});bootstrap.setOption("receiveBufferSize", 1048576 * 64);bootstrap.setOption("child.tcpNoDelay", true); //关闭Nagle算法//tcp定期发送心跳包 比如IM里边定期探测对方是否下线//只有tcp长连接下才有意义//bootstrap.setOption("child.keepAlive", true);ChannelFuture future = bootstrap.connect(new InetSocketAddress(address, port));Channel channel = future.awaitUninterruptibly().getChannel();

客户端事件处理顺序如下:
UpStream.ChannelState.OPEN(已经open)–>DownStream.ChannelState.BOUND(需要绑定)——>DownStream.CONNECTED(需要连接)—–>UpStream.ChannelState.BOUND(已经绑定)——->UpStream.CONNECTED(连接成功)

在connect的时候做了如下处理
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {        if (remoteAddress == null) {            throw new NullPointerException("remoteAddress");        }        ChannelPipeline pipeline;        try {            pipeline = getPipelineFactory().getPipeline();        } catch (Exception e) {            throw new ChannelPipelineException("Failed to initialize a pipeline.", e);        }        // Set the options.先创建Channel        Channel ch = getFactory().newChannel(pipeline);        ch.getConfig().setOptions(getOptions());        // Bind.        if (localAddress != null) {            ch.bind(localAddress);        }        // Connect. 再进行连接        return ch.connect(remoteAddress);    }

首先要创建出Channel
NioClientSocketChannel(            ChannelFactory factory, ChannelPipeline pipeline,            ChannelSink sink, NioWorker worker) {        super(null, factory, pipeline, sink, newSocket(), worker);        fireChannelOpen(this);    }

紧接着会fire一个ChannelOpen事件,
if (channel.getParent() != null) {            fireChildChannelStateChanged(channel.getParent(), channel);        }        channel.getPipeline().sendUpstream(                new UpstreamChannelStateEvent(                        channel, ChannelState.OPEN, Boolean.TRUE));

这样会出发Upstream的ChannelState.OPEN事件。

接下来要继续connect了
  if (remoteAddress == null) {            throw new NullPointerException("remoteAddress");        }        ChannelFuture future = future(channel, true);        channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(                channel, future, ChannelState.CONNECTED, remoteAddress));        return future;

这样就会出发Downstream的ChannelState.CONNECTED事件。
接下来就要由NioClientSocketPipelineSink来进行处理了
switch (state) {            case OPEN:                if (Boolean.FALSE.equals(value)) {                    channel.worker.close(channel, future);                }                break;            case BOUND:                if (value != null) {                    bind(channel, future, (SocketAddress) value);                } else {                    channel.worker.close(channel, future);                }                break;            case CONNECTED:                if (value != null) {                    connect(channel, future, (SocketAddress) value);                } else {                    channel.worker.close(channel, future);                }                break;            case INTEREST_OPS:                channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());                break;

下面看下channel注册到worker的代码,连接的时候是在内部的一个Boss类里处理的
所有的连接connect操作都被封装成一个RegisterTask对象,Boss类持有registerTask队列,在loop中不断的去进行select
private static final class RegisterTask implements Runnable {        private final Boss boss;        private final NioClientSocketChannel channel;        RegisterTask(Boss boss, NioClientSocketChannel channel) {            this.boss = boss;            this.channel = channel;        }        public void run() {            try {                channel.socket.register(                        boss.selector, SelectionKey.OP_CONNECT, channel);            } catch (ClosedChannelException e) {                channel.worker.close(channel, succeededFuture(channel));            }            int connectTimeout = channel.getConfig().getConnectTimeoutMillis();            if (connectTimeout > 0) {                channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;            }        }    }

register方法
void register(NioClientSocketChannel channel) {            Runnable registerTask = new RegisterTask(this, channel);            Selector selector;            synchronized (startStopLock) {                if (!started) {                    // Open a selector if this worker didn't start yet.                    try {                        this.selector = selector =  Selector.open();                    } catch (Throwable t) {                        throw new ChannelException(                                "Failed to create a selector.", t);                    }                    // Start the worker thread with the new Selector.                    boolean success = false;                    try {                        DeadLockProofWorker.start(                                bossExecutor,                                new ThreadRenamingRunnable(                                        this, "New I/O client boss #" + id + '-' + subId));                        success = true;                    } finally {                        if (!success) {                            // Release the Selector if the execution fails.                            try {                                selector.close();                            } catch (Throwable t) {                                logger.warn("Failed to close a selector.", t);                            }                            this.selector = selector = null;                            // The method will return to the caller at this point.                        }                    }                } else {                    // Use the existing selector if this worker has been started.                    selector = this.selector;                }                assert selector != null && selector.isOpen();                started = true;                boolean offered = registerTaskQueue.offer(registerTask);                assert offered;            }

RegisterTask,放到Boss类持有的registerTaskQueue之后,Boss类会从boss executer线程池中取出一个线程不断地处理队列、选择准备就绪的键等。

然后run方法处理感兴趣的事件
public void run() {            boolean shutdown = false;            Selector selector = this.selector;            long lastConnectTimeoutCheckTimeNanos = System.nanoTime();            for (;;) {                wakenUp.set(false);                try {                    int selectedKeyCount = selector.select(500);                    .......            processRegisterTaskQueue();                    if (selectedKeyCount > 0) {                        processSelectedKeys(selector.selectedKeys());                    }

在loop中,processRegisterTaskQueue会处理需要注册的任务,processSelectedKeys处理连接事件
private void processSelectedKeys(Set<SelectionKey> selectedKeys) {            for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {                SelectionKey k = i.next();                i.remove();                if (!k.isValid()) {                    close(k);                    continue;                }                if (k.isConnectable()) {                    connect(k);                }            }        }

将连接上的Channel注册到worker中,交给worker去注册read和write
private void connect(SelectionKey k) {            NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();            try {                if (ch.socket.finishConnect()) {                    k.cancel();                    ch.worker.register(ch, ch.connectFuture);                }            } catch (Throwable t) {                ch.connectFuture.setFailure(t);                fireExceptionCaught(ch, t);                k.cancel(); // Some JDK implementations run into an infinite loop without this.                ch.worker.close(ch, succeededFuture(ch));            }        }

在这一系列初始化都完成之后,channel就可以拿来write和接收read数据了。

热点排行