netty源代码解析(1)——服务端流程
今天看了下netty代码,对代码做了个流程分析,netty的代码写的真是漂亮。
netty服务端启动代码如下
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()));bootstrap.setPipelineFactory(new ChannelPipelineFactory() {@Overridepublic ChannelPipeline getPipeline() {ChannelPipeline pipleline = pipeline();//默认最大传输帧大小为16Mpipleline.addLast("encode", new ObjectEncoder(1048576 * 16));pipleline.addLast("decode", new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null)));pipleline.addLast("handler", handler);return pipleline;}});//设置缓冲区为64Mbootstrap.setOption("receiveBufferSize", 1048576 * 64);bootstrap.setOption("child.tcpNoDelay", true); //关闭Nagle算法//tcp定期发送心跳包 比如IM里边定期探测对方是否下线//只有tcp长连接下才有意义//bootstrap.setOption("child.keepAlive", true); bootstrap.bind(new InetSocketAddress(port));
public Channel bind(final SocketAddress localAddress) { if (localAddress == null) { throw new NullPointerException("localAddress"); } final BlockingQueue<ChannelFuture> futureQueue = new LinkedBlockingQueue<ChannelFuture>(); ChannelHandler binder = new Binder(localAddress, futureQueue); ChannelHandler parentHandler = getParentHandler();
ChannelPipeline bossPipeline = pipeline(); bossPipeline.addLast("binder", binder); if (parentHandler != null) { bossPipeline.addLast("userHandler", parentHandler); } Channel channel = getFactory().newChannel(bossPipeline);
@Override public void channelOpen( ChannelHandlerContext ctx, ChannelStateEvent evt) { try { evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory()); // Split options into two categories: parent and child. Map<String, Object> allOptions = getOptions(); Map<String, Object> parentOptions = new HashMap<String, Object>(); for (Entry<String, Object> e: allOptions.entrySet()) { if (e.getKey().startsWith("child.")) { childOptions.put( e.getKey().substring(6), e.getValue()); } else if (!e.getKey().equals("pipelineFactory")) { parentOptions.put(e.getKey(), e.getValue()); } } // Apply parent options. evt.getChannel().getConfig().setOptions(parentOptions); } finally { ctx.sendUpstream(evt); } boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); //这里发出bind事件,return Channels.bind(this, localAddress) assert finished; }
public void sendDownstream(ChannelEvent e) { DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail); if (tail == null) { try { getSink().eventSunk(this, e); return; } catch (Throwable t) { notifyHandlerException(e, t); return; } } sendDownstream(tail, e); }
public void eventSunk( ChannelPipeline pipeline, ChannelEvent e) throws Exception { Channel channel = e.getChannel(); if (channel instanceof NioServerSocketChannel) { handleServerSocket(e); } else if (channel instanceof NioSocketChannel) { handleAcceptedSocket(e); } }
private void handleServerSocket(ChannelEvent e) { if (!(e instanceof ChannelStateEvent)) { return; } ChannelStateEvent event = (ChannelStateEvent) e; NioServerSocketChannel channel = (NioServerSocketChannel) event.getChannel(); ChannelFuture future = event.getFuture(); ChannelState state = event.getState(); Object value = event.getValue(); switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { close(channel, future); } break; case BOUND: if (value != null) { bind(channel, future, (SocketAddress) value); } else { close(channel, future); } break; } }
private void bind( NioServerSocketChannel channel, ChannelFuture future, SocketAddress localAddress) { boolean bound = false; boolean bossStarted = false; try { channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog()); bound = true; future.setSuccess(); fireChannelBound(channel, channel.getLocalAddress()); //取出一个boss线程,然后交给Boss类去处理。 Executor bossExecutor = ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor; DeadLockProofWorker.start( bossExecutor, new ThreadRenamingRunnable( new Boss(channel), "New I/O server boss #" + id + " (" + channel + ')')); bossStarted = true; } catch (Throwable t) { future.setFailure(t); fireExceptionCaught(channel, t); } finally { if (!bossStarted && bound) { close(channel, future); } } }
private final Selector selector; private final NioServerSocketChannel channel; Boss(NioServerSocketChannel channel) throws IOException { this.channel = channel; selector = Selector.open(); boolean registered = false; try { channel.socket.register(selector, SelectionKey.OP_ACCEPT); registered = true; } finally { if (!registered) { closeSelector(); } } channel.selector = selector;
public void run() { final Thread currentThread = Thread.currentThread(); channel.shutdownLock.lock(); try { for (;;) { try { if (selector.select(1000) > 0) { selector.selectedKeys().clear(); } SocketChannel acceptedSocket = channel.socket.accept(); if (acceptedSocket != null) { registerAcceptedChannel(acceptedSocket, currentThread); } } catch (SocketTimeoutException e) { // Thrown every second to get ClosedChannelException // raised. } catch (CancelledKeyException e) { // Raised by accept() when the server socket was closed. } catch (ClosedSelectorException e) { // Raised by accept() when the server socket was closed. } catch (ClosedChannelException e) { // Closed as requested. break; } catch (Throwable e) { logger.warn( "Failed to accept a connection.", e); try { Thread.sleep(1000); } catch (InterruptedException e1) { // Ignore } } } } finally { channel.shutdownLock.unlock(); closeSelector(); } }
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) { try { ChannelPipeline pipeline = channel.getConfig().getPipelineFactory().getPipeline(); NioWorker worker = nextWorker(); //获取一个NioWorker //将Channel注册到NioWorker上去 worker.register(new NioAcceptedSocketChannel( channel.getFactory(), pipeline, channel, NioServerSocketPipelineSink.this, acceptedSocket, worker, currentThread), null); } catch (Exception e) { logger.warn( "Failed to initialize an accepted socket.", e); try { acceptedSocket.close(); } catch (IOException e2) { logger.warn( "Failed to close a partially accepted socket.", e2); } } }