JAVA-NIO程序设计完整实例
NIO-Socket通讯为我们解决了Server端多线程设计方面的性能、吞吐量等多方面的问题,它提供了以非阻塞模式 + 线程池的方式来解决Server端的高并发问题.NIO并不能显著地提升Client-Server的通讯性能(其中包括全局性耗时总和、Server物理机资源开销和实际计算量),但是它可以确保Server端在支撑相应并发量的情况下,对物理资源的使用处于可控状态.对于开发者而言,NIO合理地使用了平台(OS/VM/Http协议)的特性,并提供了高效、便捷的编程级别的API.
?
为了展示NIO交互的基本特性,我们模拟了一个简单的场景:Client端向Server端建立连接,并持续交付大量数据,Server负责Client的数据传输和处理.此程序实例并没有太多地关注异常处理和业务性处理,也没有使用线程池来管理Server端的socket句柄,不过你可以简单地修改代码来实现它.
TestMain.java:
?
package com.test.web;

/**
 * Demo entry point: starts a {@link ServerControllor} and a
 * {@link ClientControllor} in the same JVM and queues three text packets on
 * the client.
 */
public class TestMain {

    /**
     * Starts the server, pauses, starts the client and enqueues three messages.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the pause between server and client start-up is interrupted
     */
    public static void main(String[] args) throws Exception {
        final int listenPort = 30008;

        ServerControllor server = new ServerControllor(listenPort);
        server.start();

        // NOTE(review): presumably this pause lets the server thread bind
        // before the client tries to connect — confirm.
        Thread.sleep(2000);

        ClientControllor client = new ClientControllor("127.0.0.1", listenPort);
        client.start();

        String[] messages = {
            "Hello,I am first!",
            "Hello,I am second!",
            "Hello,I am thread!"
        };
        for (String message : messages) {
            // The boolean result of put() is deliberately not inspected here.
            client.put(Packet.wrap(message));
        }
    }
}
?
?
ClientControllor.java
?
?
package com.test.web;import java.net.InetSocketAddress;import java.net.SocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.zip.Adler32;import java.util.zip.Checksum;public class ClientControllor {private BlockingQueue<Packet> inner = new LinkedBlockingQueue<Packet>(100);//no any moreprivate Object lock = new Object();private InetSocketAddress remote;private Thread thread = new ClientThread(remote);public ClientControllor(String host,int port){remote = new InetSocketAddress(host, port);}public void start(){if(thread.isAlive() || remote == null){return;}synchronized (lock) {thread.start();}}public boolean put(Packet packet){return inner.offer(packet);}public void clear(){inner.clear();}class ClientThread extends Thread {SocketAddress remote;SocketChannel channel;ClientThread(SocketAddress remote){this.remote = remote;}@Overridepublic void run(){try{try{channel = SocketChannel.open();channel.configureBlocking(true);boolean isSuccess = channel.connect(new InetSocketAddress(30008));if(!isSuccess){while(!channel.finishConnect()){System.out.println("Client is connecting...");}}System.out.println("Client is connected.");//Selector selector = Selector.open();//channel.register(selector, SelectionKey.OP_WRITE);//while(selector.isOpen()){//selector.select();//Iterator<SelectionKey> it = selector.selectedKeys().iterator();//while(it.hasNext()){//SelectionKey key = it.next();//it.remove();//if(!key.isValid()){//continue;//}//if(key.isWritable()){//write();//}//}//}while(channel.isOpen()){write();}}catch(Exception e){e.printStackTrace();}finally{if(channel != null){try{channel.close();}catch(Exception ex){ex.printStackTrace();}}}}catch(Exception e){e.printStackTrace();inner.clear();}}private void write() throws Exception{Packet packet = inner.take();synchronized (lock) {ByteBuffer body = packet.getBuffer();//ByteBuffer head = 
ByteBuffer.allocate(4);head.putInt(body.limit());head.flip();while(head.hasRemaining()){channel.write(head);}Checksum checksum = new Adler32();while(body.hasRemaining()){checksum.update(body.get());}body.rewind();while(body.hasRemaining()){channel.write(body);}long cks = checksum.getValue();ByteBuffer tail = ByteBuffer.allocate(8);tail.putLong(cks);tail.flip();while(tail.hasRemaining()){channel.write(tail);}}}}}
?
?
Handler.java(接口,面向设计):
?
package com.test.web;import java.nio.channels.SocketChannel;public interface Handler {public void handle(SocketChannel channel);}
?
?
Packet.java
?
package com.test.web;import java.io.Serializable;import java.nio.ByteBuffer;import java.nio.charset.Charset;public class Packet implements Serializable {/** * */private static final long serialVersionUID = 7719389291885063462L;private ByteBuffer buffer;private static Charset charset = Charset.defaultCharset();private Packet(ByteBuffer buffer){this.buffer = buffer;}public String getDataAsString(){return charset.decode(buffer).toString();}public byte[] getData(){return buffer.array();}public ByteBuffer getBuffer(){return this.buffer;}public static Packet wrap(ByteBuffer buffer){return new Packet(buffer);}public static Packet wrap(String data){ByteBuffer source = charset.encode(data);return new Packet(source);}}
?
?
ServerControllor.java
?
package com.test.web;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;public class ServerControllor {private int port;private Thread thread = new ServerThread();;private Object lock = new Object();public ServerControllor(){this(0);}public ServerControllor(int port){this.port = port;}public void start(){if(thread.isAlive()){return;}synchronized (lock) {thread.start();System.out.println("Server starting....");}}class ServerThread extends Thread {private static final int TIMEOUT = 3000;private ServerHandler handler = new ServerHandler();@Overridepublic void run(){try{ServerSocketChannel channel = null;try{channel = ServerSocketChannel.open();channel.configureBlocking(false);channel.socket().setReuseAddress(true);channel.socket().bind(new InetSocketAddress(port));Selector selector = Selector.open();channel.register(selector, SelectionKey.OP_ACCEPT);while(selector.isOpen()){System.out.println("Server is running,port:" + channel.socket().getLocalPort());if(selector.select(TIMEOUT) == 0){continue;}Iterator<SelectionKey> it = selector.selectedKeys().iterator();while(it.hasNext()){SelectionKey key = it.next();it.remove();if(!key.isValid()){continue;}if(key.isAcceptable()){accept(key);}else if(key.isReadable()){read(key);}}}}catch(Exception e){e.printStackTrace();}finally{if(channel != null){try{channel.close();}catch(Exception ex){ex.printStackTrace();}}}}catch(Exception e){e.printStackTrace();}}private void accept(SelectionKey key) throws Exception{SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();socketChannel.configureBlocking(true);//socketChannel.register(key.selector(), SelectionKey.OP_READ);handler.handle(socketChannel);}private void read(SelectionKey key) throws Exception{SocketChannel channel = (SocketChannel)key.channel();//handler.handle(channel);}}}
?
?
ServerHandler.java
?
?
package com.test.web;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;import java.util.HashMap;import java.util.Map;import java.util.concurrent.Semaphore;import java.util.zip.Adler32;import java.util.zip.Checksum;class ServerHandler implements Handler {private static Semaphore semaphore = new Semaphore(Runtime.getRuntime().availableProcessors() + 1);private static Map<SocketChannel,Thread> holder = new HashMap<SocketChannel,Thread>(32);@Overridepublic void handle(SocketChannel channel) {synchronized (holder) {if(holder.containsKey(channel)){return;}Thread t = new ReadThread(channel);holder.put(channel, t);t.start();}}static class ReadThread extends Thread{SocketChannel channel;ReadThread(SocketChannel channel){this.channel = channel;}@Overridepublic void run(){try{semaphore.acquire();boolean eof = false;while(channel.isOpen()){//ByteBuffer byteBuffer = new ByteBuffer(1024);ByteBuffer head = ByteBuffer.allocate(4);//int for data-sizewhile(true){int cb = channel.read(head);if(cb == -1){throw new RuntimeException("EOF error,data lost!");}if(isFull(head)){break;}}head.flip();int dataSize = head.getInt();if(dataSize <= 0){throw new RuntimeException("Data format error,something lost???");}ByteBuffer body = ByteBuffer.allocate(dataSize);while(true){int cb = channel.read(body);if(cb == -1){throw new RuntimeException("EOF error,data lost!");}else if(cb == 0 && this.isFull(body)){break;}}ByteBuffer tail = ByteBuffer.allocate(8);//int for data-sizewhile(true){int cb = channel.read(tail);if(cb == -1){eof = true;}if(isFull(tail)){break;}}tail.flip();long sck = tail.getLong();Checksum checksum = new Adler32();checksum.update(body.array(), 0, dataSize);long cck = checksum.getValue();if(sck != cck){throw new RuntimeException("Sorry,some data lost or be modified,please check!");}body.flip();Packet packet = Packet.wrap(body);System.out.println(packet.getDataAsString());if(eof){break;}}}catch(Exception e){e.printStackTrace();}finally{if(channel != 
null){try{channel.close();}catch(Exception ex){ex.printStackTrace();}}holder.remove(channel);semaphore.release();}}private boolean isFull(ByteBuffer byteBuffer){return byteBuffer.position() == byteBuffer.capacity() ? true : false;}}}
?
?
--End--