netty源代码解析(2)——客户端流程
前一篇文章分析了netty的服务端流程,接下来分析一下客户端的大致流程,客户端启动代码如下
ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()));bootstrap.setPipelineFactory(new ChannelPipelineFactory() {@Overridepublic ChannelPipeline getPipeline() throws Exception {ChannelPipeline pipleline = pipeline();pipleline.addLast("encode", new ObjectEncoder(1048576 * 16));pipleline.addLast("decode", new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null)));pipleline.addLast("handler", handler);return pipleline;}});bootstrap.setOption("receiveBufferSize", 1048576 * 64);bootstrap.setOption("child.tcpNoDelay", true); //关闭Nagle算法//tcp定期发送心跳包 比如IM里边定期探测对方是否下线//只有tcp长连接下才有意义//bootstrap.setOption("child.keepAlive", true);ChannelFuture future = bootstrap.connect(new InetSocketAddress(address, port));Channel channel = future.awaitUninterruptibly().getChannel();
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } ChannelPipeline pipeline; try { pipeline = getPipelineFactory().getPipeline(); } catch (Exception e) { throw new ChannelPipelineException("Failed to initialize a pipeline.", e); } // Set the options.先创建Channel Channel ch = getFactory().newChannel(pipeline); ch.getConfig().setOptions(getOptions()); // Bind. if (localAddress != null) { ch.bind(localAddress); } // Connect. 再进行连接 return ch.connect(remoteAddress); }
NioClientSocketChannel( ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, NioWorker worker) { super(null, factory, pipeline, sink, newSocket(), worker); fireChannelOpen(this); }
if (channel.getParent() != null) { fireChildChannelStateChanged(channel.getParent(), channel); } channel.getPipeline().sendUpstream( new UpstreamChannelStateEvent( channel, ChannelState.OPEN, Boolean.TRUE));
if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } ChannelFuture future = future(channel, true); channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent( channel, future, ChannelState.CONNECTED, remoteAddress)); return future;
switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { channel.worker.close(channel, future); } break; case BOUND: if (value != null) { bind(channel, future, (SocketAddress) value); } else { channel.worker.close(channel, future); } break; case CONNECTED: if (value != null) { connect(channel, future, (SocketAddress) value); } else { channel.worker.close(channel, future); } break; case INTEREST_OPS: channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); break;
private static final class RegisterTask implements Runnable { private final Boss boss; private final NioClientSocketChannel channel; RegisterTask(Boss boss, NioClientSocketChannel channel) { this.boss = boss; this.channel = channel; } public void run() { try { channel.socket.register( boss.selector, SelectionKey.OP_CONNECT, channel); } catch (ClosedChannelException e) { channel.worker.close(channel, succeededFuture(channel)); } int connectTimeout = channel.getConfig().getConnectTimeoutMillis(); if (connectTimeout > 0) { channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L; } } }
void register(NioClientSocketChannel channel) { Runnable registerTask = new RegisterTask(this, channel); Selector selector; synchronized (startStopLock) { if (!started) { // Open a selector if this worker didn't start yet. try { this.selector = selector = Selector.open(); } catch (Throwable t) { throw new ChannelException( "Failed to create a selector.", t); } // Start the worker thread with the new Selector. boolean success = false; try { DeadLockProofWorker.start( bossExecutor, new ThreadRenamingRunnable( this, "New I/O client boss #" + id + '-' + subId)); success = true; } finally { if (!success) { // Release the Selector if the execution fails. try { selector.close(); } catch (Throwable t) { logger.warn("Failed to close a selector.", t); } this.selector = selector = null; // The method will return to the caller at this point. } } } else { // Use the existing selector if this worker has been started. selector = this.selector; } assert selector != null && selector.isOpen(); started = true; boolean offered = registerTaskQueue.offer(registerTask); assert offered; }
public void run() { boolean shutdown = false; Selector selector = this.selector; long lastConnectTimeoutCheckTimeNanos = System.nanoTime(); for (;;) { wakenUp.set(false); try { int selectedKeyCount = selector.select(500); ....... processRegisterTaskQueue(); if (selectedKeyCount > 0) { processSelectedKeys(selector.selectedKeys()); }
private void processSelectedKeys(Set<SelectionKey> selectedKeys) { for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); i.remove(); if (!k.isValid()) { close(k); continue; } if (k.isConnectable()) { connect(k); } } }
private void connect(SelectionKey k) { NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); try { if (ch.socket.finishConnect()) { k.cancel(); ch.worker.register(ch, ch.connectFuture); } } catch (Throwable t) { ch.connectFuture.setFailure(t); fireExceptionCaught(ch, t); k.cancel(); // Some JDK implementations run into an infinite loop without this. ch.worker.close(ch, succeededFuture(ch)); } }