首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

深入显出Netty之四 Client请求处理

2012-12-16 
深入浅出Netty之四Client请求处理前2篇分析了echo server端的运行机制,本篇同样以echo client为例,分析net

深入浅出Netty之四 Client请求处理

前2篇分析了echo server端的运行机制,本篇同样以echo client为例,分析netty的nio客户端的运行机制。

总体来说client端和server端的处理是类似的,NioWorker是重用的,也就意味着client和server的读写机制是一样的,都是通过worker线程来管理的。所不同的是Boss线程,server端的boss线程一个bind端口起一个,主要负责接收新请求,而client端的boss线程是一个可配置的数组,一个connect端口分配一个,主要负责connect过程,如果connect成功则将channle注册到worker线程中处理。在Client同样有PipelineSink,叫做NioClientSocketPipelineSink,也是负责底层IO和pipeline之间的交互。

EchoClient代码:

?

        // 初始化Bootstrap和NioClientSocketChannelFactory,这一步将启动nioWorker线程,并初始化NioClientSocketPipelineSink,并将Boss线程创建          ClientBootstrap bootstrap = new ClientBootstrap(                  new NioClientSocketChannelFactory(                          Executors.newCachedThreadPool(),                          Executors.newCachedThreadPool()));            // 用户自定义的pipeline工厂          bootstrap.setPipelineFactory(new ChannelPipelineFactory() {              public ChannelPipeline getPipeline() throws Exception {                  return Channels.pipeline(                          new EchoClientHandler(firstMessageSize));              }          });            // 异步创建连接          ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));            //等待连接关闭          future.getChannel().getCloseFuture().awaitUninterruptibly();            // 关闭资源,线程池等          bootstrap.releaseExternalResources();

具体connect过程:

一.创建client的channel

?

public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {......//拿用户自定义的pipeline        ChannelPipeline pipeline;        try {            pipeline = getPipelineFactory().getPipeline();        } catch (Exception e) {            throw new ChannelPipelineException("Failed to initialize a pipeline.", e);        }        // 从ChannelFactory中,创建Channel,对于client来说factory是NioClientSocketChannelFactory,Channel是NioClientSocketChannel        Channel ch = getFactory().newChannel(pipeline);        // 通过channel连接        return ch.connect(remoteAddress);    }

?二.创建channel时,分配worker

?

 public SocketChannel newChannel(ChannelPipeline pipeline) {        return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker());    }

?三.创建内部的SocketChannel,触发ChannelOpen事件,不过echo client的handler没有对channelOpen事件做处理

?

NioClientSocketChannel(            ChannelFactory factory, ChannelPipeline pipeline,            ChannelSink sink, NioWorker worker) {//创建socketChannel,并配置成异步模式        super(null, factory, pipeline, sink, newSocket(), worker);        //触发open事件        fireChannelOpen(this);    }

?四.创建socketChannel过程

?

private static SocketChannel newSocket() {        SocketChannel socket;//创建SocketChannel        try {            socket = SocketChannel.open();        } catch (IOException e) {            throw new ChannelException("Failed to open a socket.", e);        }        boolean success = false;//使用异步模式        try {            socket.configureBlocking(false);            success = true;        } catch (IOException e) {            throw new ChannelException("Failed to enter non-blocking mode.", e);        } finally {           ......        }        return socket;    }

?五.通过Channel进行connect

?

 public static ChannelFuture connect(Channel channel, SocketAddress remoteAddress) {        if (remoteAddress == null) {            throw new NullPointerException("remoteAddress");        }//是否连接成功的future        ChannelFuture future = future(channel, true);//触发connected的downstream事件,对于echo client来说,由于没有downstream handler,所以直接被PipelineSink处理了        channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(                channel, future, ChannelState.CONNECTED, remoteAddress));        return future;    }

?六.NioClientSocketPipelineSink拿到CONNECTED的downstream事件,处理connect

?

case CONNECTED:                if (value != null) {                    connect(channel, future, (SocketAddress) value);                } else {                    channel.worker.close(channel, future);                }                break;
private void connect(            final NioClientSocketChannel channel, final ChannelFuture cf,            SocketAddress remoteAddress) {        try {    //尝试连接,如果成功,则直接将channel注册到worker线程中            if (channel.channel.connect(remoteAddress)) {                channel.worker.register(channel, cf);            }     //尝试连接失败,则将channel注册到某个boss线程中处理,该boss线程会接管该channel的connect过程    else {                channel.getCloseFuture().addListener(new ChannelFutureListener() {                    public void operationComplete(ChannelFuture f)                            throws Exception {                        if (!cf.isDone()) {                            cf.setFailure(new ClosedChannelException());                        }                    }                });                cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);                channel.connectFuture = cf;                nextBoss().register(channel);            }        } catch (Throwable t) {            cf.setFailure(t);            fireExceptionCaught(channel, t);            channel.worker.close(channel, succeededFuture(channel));        }    }

七.具体register过程

void register(NioClientSocketChannel channel) {    //client的注册任务,让boss线程监听connect事件            Runnable registerTask = new RegisterTask(this, channel);            Selector selector;            synchronized (startStopLock) {        //初始化boss线程,并启动之                if (!started) {                    //                     try {//创建一个Selector                        this.selector = selector =  Selector.open();                    } catch (Throwable t) {                        throw new ChannelException(                                "Failed to create a selector.", t);                    }                    // 启动boss线程                    boolean success = false;                    try {                        DeadLockProofWorker.start(bossExecutor,                                new ThreadRenamingRunnable(this,                                        "New I/O client boss #" + id + '-' + subId));                        success = true;                    } finally {......                started = true;//异步提交注册任务                boolean offered = registerTaskQueue.offer(registerTask);                assert offered;            }    //根据超时时间,启动一个超时提醒任务            int timeout = channel.getConfig().getConnectTimeoutMillis();            if (timeout > 0) {                if (!channel.isConnected()) {                    channel.timoutTimer = timer.newTimeout(wakeupTask,                            timeout, TimeUnit.MILLISECONDS);                }            }        }

八.register之后,主线程就返回了,Boss线程异步执行

?

    public void run() {            boolean shutdown = false;            int selectReturnsImmediately = 0;            Selector selector = this.selector;            // use 80% of the timeout for measure            final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;            boolean wakenupFromLoop = false;            for (;;) {                wakenUp.set(false);                try {                    long beforeSelect = System.nanoTime();    //select,500ms超时                    int selected = SelectorUtil.select(selector);    .......    //处理注册任务,将channel注册到自己的selector范围中                    processRegisterTaskQueue();    //处理IO就绪事件,这里是connect事件                    processSelectedKeys(selector.selectedKeys());                    // 处理超时,如果超时,则设置future失败                    long currentTimeNanos = System.nanoTime();                    processConnectTimeout(selector.keys(), currentTimeNanos);                   ......        }

?

?九.注册任务执行

?

public void run() {            try {//注册一个connect key到boss的selector上                channel.channel.register(                        boss.selector, SelectionKey.OP_CONNECT, channel);            } catch (ClosedChannelException e) {                channel.worker.close(channel, succeededFuture(channel));            }    //设置超时点            int connectTimeout = channel.getConfig().getConnectTimeoutMillis();            if (connectTimeout > 0) {                channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;            }        }

?

?十.boss处理IO,client是connect就绪

?

private void processSelectedKeys(Set<SelectionKey> selectedKeys) {           ......            for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {                SelectionKey k = i.next();                i.remove();                if (!k.isValid()) {                    close(k);                    continue;                }                try {    //如果connect就位,判断connect是否确实完成                    if (k.isConnectable()) {                        connect(k);                    }                } catch (Throwable t) {                   ......                }            }        }
private void connect(SelectionKey k) throws IOException {            NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();    //如果连接成功,则将channel注册上worker线程中处理读写            if (ch.channel.finishConnect()) {                k.cancel();                if (ch.timoutTimer != null) {                    ch.timoutTimer.cancel();                }                ch.worker.register(ch, ch.connectFuture);            }        }

?

?十一.注册过程

?

public void run() {           ......//默认监听read事件                synchronized (channel.interestOpsLock) {                    channel.channel.register(                            selector, channel.getRawInterestOps(), channel);                }//通知等待线程成功                if (future != null) {                    channel.setConnected();                    future.setSuccess();                }                ......//触发ChannelConnected事件                fireChannelConnected(channel, remoteAddress);            ......            }        }

?十二.echo client handler处理channelConnected事件,开始发送数据

?

 public void channelConnected(              ChannelHandlerContext ctx, ChannelStateEvent e) {          // Send the first message.  Server will not send anything here          // because the firstMessage's capacity is 0.          e.getChannel().write(firstMessage);      }

?十三.write之后的过程client端和server是一样的,可以参考前一篇server端的读写

?

?

热点排行