Netty TIME 协议例子
// TIME 服务器端协议实现

package com.bigdata.jboss.basic;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.buffer.ChannelBuffers;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelFuture;import org.jboss.netty.channel.ChannelFutureListener;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.ChannelStateEvent;import org.jboss.netty.channel.ExceptionEvent;import org.jboss.netty.channel.SimpleChannelHandler;public class TimeServerHandler extends SimpleChannelHandler{@Overridepublic void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)throws Exception {Channel channel = e.getChannel();ChannelBuffer buffer = ChannelBuffers.buffer(4);buffer.writeInt((int) (System.currentTimeMillis()/1000));ChannelFuture future = channel.write(buffer);future.addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture future) throws Exception {future.getChannel().close();}});}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)throws Exception {e.getCause().printStackTrace();e.getChannel().close();}}
1. channelConnected 方法在连接建立后被调用,向客户端返回以秒为单位的当前时间(32 位整数)。
2. ChannelBuffers 是一个非常有用的帮助类,其 buffer 方法会返回一个容量为 4 字节的 ChannelBuffer 实例。
3. Channel 的 write 方法会返回 ChannelFuture 实例;写操作是异步的,需要等该 ChannelFuture 通知写操作完成后,再关闭连接。
4. 通过给 ChannelFuture 添加 ChannelFutureListener 实例,可以获得写操作完成的通知,从而把握关闭连接的时机。也可以简化注册监听:future.addListener(ChannelFutureListener.CLOSE);
package com.bigdata.jboss.basic;import java.net.InetSocketAddress;import java.util.concurrent.Executors;import org.jboss.netty.bootstrap.ServerBootstrap;import org.jboss.netty.channel.ChannelFactory;import org.jboss.netty.channel.ChannelPipeline;import org.jboss.netty.channel.ChannelPipelineFactory;import org.jboss.netty.channel.Channels;import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;public class TimeServer {public static void main(String[] args) {ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());ServerBootstrap bootstrap = new ServerBootstrap(factory);bootstrap.setPipelineFactory(new ChannelPipelineFactory() {public ChannelPipeline getPipeline() throws Exception {return Channels.pipeline(new TimeServerHandler());}});bootstrap.setOption("child.tcpNoDelay", true);bootstrap.setOption("child.keepAlive", true);bootstrap.bind(new InetSocketAddress(8080));System.out.println("Time Server started");}}? ?//TIME 客户端协议实现
package com.bigdata.jboss.basic;import java.util.Date;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.ExceptionEvent;import org.jboss.netty.channel.MessageEvent;import org.jboss.netty.channel.SimpleChannelHandler;public class TimeClientHandler extends SimpleChannelHandler {@Overridepublic void messageReceived(ChannelHandlerContext ctx, MessageEvent e)throws Exception {ChannelBuffer buffer = (ChannelBuffer) e.getMessage();long date = buffer.readInt()*1000L;e.getChannel().close();System.out.println(new Date(date));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)throws Exception {e.getCause().printStackTrace();e.getChannel().close();}}?
// TIME 客户端

package com.bigdata.jboss.basic;import java.net.InetSocketAddress;import java.util.concurrent.Executors;import org.jboss.netty.bootstrap.ClientBootstrap;import org.jboss.netty.channel.ChannelFactory;import org.jboss.netty.channel.ChannelFuture;import org.jboss.netty.channel.ChannelPipeline;import org.jboss.netty.channel.ChannelPipelineFactory;import org.jboss.netty.channel.Channels;import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;public class TimeClient {public static void main(String[] args) {String host = args[0];int port = Integer.parseInt(args[1]);ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());ClientBootstrap bootstrap = new ClientBootstrap(factory);bootstrap.setPipelineFactory(new ChannelPipelineFactory() {public ChannelPipeline getPipeline() throws Exception {return Channels.pipeline(new TimeClientHandler());}});bootstrap.setOption("tcpNoDelay", true);bootstrap.setOption("keepAlive", true);ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));future.awaitUninterruptibly();if(false == future.isSuccess()){future.getCause().printStackTrace();}future.getChannel().getCloseFuture().awaitUninterruptibly();factory.releaseExternalResources();}}
1. NioClientSocketChannelFactory 是客户端的 ChannelFactory 类。
2. ClientBootstrap 是客户端的 Bootstrap。
3. 客户端的 TCP/IP 参数没有 "child." 前缀。
4. 使用 connect 连接服务器。
5. ClientBootstrap 的 connect 方法返回一个 ChannelFuture,用于指明连接成功或失败。
6. 等待连接尝试完成(成功或失败)。
7. 如果失败,会输出异常信息。
8. 等待 closeFuture,直到连接使用完成并关闭。
9. ChannelFactory 释放所有资源,如线程池等。

// 客户端增强
原因:TCP/IP 是基于流的传输,操作系统的接收缓冲区(Receive Buffer Queue)缓存的是字节(bytes)而不是数据包;当流量(Traffic)比较大的时候,一次读取到的帧(Frame)可能不完整,因此需要判断接收到的数据是否完整。

// TIME Decoder

package com.bigdata.jboss.basic;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.frame.FrameDecoder;public class TimeDecoder extends FrameDecoder{@Overrideprotected Object decode(ChannelHandlerContext ctx, Channel channel,ChannelBuffer buffer) throws Exception {Object result = null;if(buffer.readableBytes() < 4){result = null;}else{result = buffer.readBytes(4);}return result;}}
package com.bigdata.jboss.basic;import java.net.InetSocketAddress;import java.util.concurrent.Executors;import org.jboss.netty.bootstrap.ClientBootstrap;import org.jboss.netty.channel.ChannelFactory;import org.jboss.netty.channel.ChannelFuture;import org.jboss.netty.channel.ChannelPipeline;import org.jboss.netty.channel.ChannelPipelineFactory;import org.jboss.netty.channel.Channels;import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;public class TimeClientImprovment {public static void main(String[] args) {String host = args[0];int port = Integer.parseInt(args[1]);ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());ClientBootstrap bootstrap = new ClientBootstrap(factory);bootstrap.setPipelineFactory(new ChannelPipelineFactory() {public ChannelPipeline getPipeline() throws Exception {// impromentreturn Channels.pipeline(new TimeDecoder(), new TimeClientHandler());}});bootstrap.setOption("tcpNoDelay", true);bootstrap.setOption("keepAlive", true);ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));future.awaitUninterruptibly();if(false == future.isSuccess()){future.getCause().printStackTrace();}future.getChannel().getCloseFuture().awaitUninterruptibly();factory.releaseExternalResources();}}
// 传递 POJO 信息

package com.bigdata.jboss.basic;import java.util.Date;public class UnixTime {private final int value;public UnixTime(int value) {super();this.value = value;}public int getValue() {return value;}@Overridepublic String toString() {return new Date(value*1000L).toString();}}
package com.bigdata.jboss.basic;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.buffer.ChannelBuffers;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelFuture;import org.jboss.netty.channel.ChannelFutureListener;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.ChannelStateEvent;import org.jboss.netty.channel.ExceptionEvent;import org.jboss.netty.channel.SimpleChannelHandler;public class TimeServerHandler extends SimpleChannelHandler{@Overridepublic void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)throws Exception {Channel channel = e.getChannel();ChannelBuffer buffer = ChannelBuffers.buffer(4);buffer.writeInt((int) (System.currentTimeMillis()/1000));ChannelFuture future = channel.write(buffer);future.addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture future) throws Exception {future.getChannel().close();}});}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)throws Exception {e.getCause().printStackTrace();e.getChannel().close();}}
package com.bigdata.jboss.basic;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.buffer.ChannelBuffers;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.Channels;import org.jboss.netty.channel.MessageEvent;import org.jboss.netty.channel.SimpleChannelHandler;public class UnixTimeEncoder extends SimpleChannelHandler{@Overridepublic void writeRequested(ChannelHandlerContext ctx, MessageEvent e)throws Exception {ChannelBuffer buffer = ChannelBuffers.buffer(4);UnixTime unixTime = (UnixTime) e.getMessage();buffer.writeInt(unixTime.getValue());Channels.write(ctx, e.getFuture(),buffer);}}
package com.bigdata.jboss.basic;import java.net.InetSocketAddress;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import org.jboss.netty.bootstrap.ServerBootstrap;import org.jboss.netty.channel.ChannelFactory;import org.jboss.netty.channel.ChannelPipeline;import org.jboss.netty.channel.ChannelPipelineFactory;import org.jboss.netty.channel.Channels;import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;public class UnixTimeServer {/** * @param args */public static void main(String[] args) {ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());ServerBootstrap bootstrap = new ServerBootstrap(factory);bootstrap.setPipelineFactory(new ChannelPipelineFactory() {public ChannelPipeline getPipeline() throws Exception {return Channels.pipeline(new UnixTimeServerHandler(),new UnixTimeEncoder());}});bootstrap.setOption("child.tcpNoDelay",true);bootstrap.setOption("child.keepAlive",true);bootstrap.bind(new InetSocketAddress(8080));System.out.println("UnixTime Server started");}}
package com.bigdata.jboss.basic;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.ExceptionEvent;import org.jboss.netty.channel.MessageEvent;import org.jboss.netty.channel.SimpleChannelHandler;public class UnixTimeClientHandler extends SimpleChannelHandler{@Overridepublic void messageReceived(ChannelHandlerContext ctx, MessageEvent e)throws Exception { UnixTime unixTime = (UnixTime) e.getMessage(); e.getChannel().close(); System.out.println(unixTime);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)throws Exception {e.getCause().printStackTrace();e.getChannel().close();}}
package com.bigdata.jboss.basic;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.frame.FrameDecoder;public class UnixTimeClientDecoder extends FrameDecoder{@Overrideprotected Object decode(ChannelHandlerContext ctx, Channel channel,ChannelBuffer buffer) throws Exception {UnixTime result = null;if(buffer.readableBytes() >= 4){result = new UnixTime(buffer.readInt());}return result;}}
package com.bigdata.jboss.basic;import java.net.InetAddress;import java.net.InetSocketAddress;import java.util.concurrent.Executors;import org.jboss.netty.bootstrap.ClientBootstrap;import org.jboss.netty.channel.ChannelFactory;import org.jboss.netty.channel.ChannelFuture;import org.jboss.netty.channel.ChannelPipeline;import org.jboss.netty.channel.ChannelPipelineFactory;import org.jboss.netty.channel.Channels;import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;public class UnixTimeClient {public static void main(String[] args) {String host = args[0];int port = Integer.parseInt(args[1]);ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());ClientBootstrap bootstrap = new ClientBootstrap(factory);bootstrap.setPipelineFactory(new ChannelPipelineFactory() {public ChannelPipeline getPipeline() throws Exception {return Channels.pipeline(new UnixTimeClientDecoder(),new UnixTimeClientHandler());}});bootstrap.setOption("tcpNoDelay", true);bootstrap.setOption("keepAlive",true);ChannelFuture future =bootstrap.connect(new InetSocketAddress(host, port));future.awaitUninterruptibly();if(false == future.isSuccess()){future.getCause().printStackTrace();}future.getChannel().getCloseFuture().awaitUninterruptibly();factory.releaseExternalResources();}}
?
?