Apache mina设置默认的write和read buffer,以及奥秘
最近在做一个项目,用到mina,但是对于mina发送文件,或者报文分包发送有很多不明白的。查看了很多资料,其中找到一位仁兄的发送文件的代码,是一个客户端上传文件到服务器的例子。
作为本文的引子:
客户端程序
package test;import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;import org.apache.mina.core.RuntimeIoException;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;import org.apache.mina.core.future.ConnectFuture;import org.apache.mina.core.session.IoSession;import org.apache.mina.transport.socket.nio.NioSocketConnector;public class ClientMain {/** * @author daijun ,Nov 26, 2009 * @param args * @throws InterruptedException * @throws IOException */public static void main(String[] args) throws InterruptedException, IOException {try {NioSocketConnector connector = new NioSocketConnector();DefaultIoFilterChainBuilder chain = connector.getFilterChain();connector.setHandler(new FileSenderHandler());ConnectFuture connectFuture = connector.connect(new InetSocketAddress("127.0.0.1", 3333));IoSession session = null;for (;;) {try {ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 3333));future.awaitUninterruptibly();session = future.getSession();break;} catch (RuntimeIoException e) {System.err.println("Failed to connect.");e.printStackTrace();Thread.sleep(5000);}}File f = new File("e:/20100425171.jpg");// System.out.println(f.length());FileInputStream fin = new FileInputStream(f);FileChannel fc = fin.getChannel();ByteBuffer bb = ByteBuffer.allocate(2048 * 1000);boolean flag = true;while (true) {// 不间断发送会导致buffer异常if (!flag) {Thread.sleep(1000);}bb.clear();int i = fc.read(bb);System.out.println(i);if (i == -1) {System.out.println("exit");break;}// 包装成自己的iobufferIoBuffer ib = IoBuffer.wrap(bb);bb.flip();session.write(ib);flag = false;}session.close(true);} catch (Throwable e) {e.printStackTrace();}}}
package test;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IoSession;public class FileSenderHandler extends IoHandlerAdapter {@Overridepublic void messageReceived(IoSession session, Object message) throws Exception {}@Overridepublic void exceptionCaught(IoSession session, Throwable cause) throws Exception {session.close(true);}@Overridepublic void messageSent(IoSession session, Object message) throws Exception {System.out.println("hidsda");super.messageSent(session, message);}}
package test;import java.io.IOException;import java.net.InetSocketAddress;import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;import org.apache.mina.transport.socket.SocketAcceptor;import org.apache.mina.transport.socket.nio.NioSocketAcceptor;public class ServerMain {/** * @author daijun ,Nov 26, 2009 * @param args * @throws IOException */public static void main(String[] args) throws IOException {SocketAcceptor acceptor = new NioSocketAcceptor();DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();acceptor.setHandler(new FileReceiveHandler());acceptor.bind(new InetSocketAddress(3333));}}
package test;import java.io.FileOutputStream;import java.nio.channels.FileChannel;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IdleStatus;import org.apache.mina.core.session.IoSession;public class FileReceiveHandler extends IoHandlerAdapter {private static FileChannel fc;@Overridepublic void messageReceived(IoSession session, Object message) throws Exception {IoBuffer ib = (IoBuffer) message;System.out.println(ib.array().length);if (fc == null) {fc = new FileOutputStream("z:\\copyed.rar").getChannel();}fc.write(ib.buf());}@Overridepublic void exceptionCaught(IoSession session, Throwable cause) throws Exception {System.out.println("over");fc.close();session.close(true);}@Overridepublic void sessionIdle(IoSession session, IdleStatus status) throws Exception {System.out.println("session idle");}}
File f = new File("e:/20100425171.jpg");// System.out.println(f.length());FileInputStream fin = new FileInputStream(f);FileChannel fc = fin.getChannel();ByteBuffer bb = ByteBuffer.allocate(2048 * 1000);boolean flag = true;while (true) {// 不间断发送会导致buffer异常if (!flag) {Thread.sleep(1000);}bb.clear();int i = fc.read(bb);System.out.println(i);if (i == -1) {System.out.println("exit");break;}// 包装成自己的iobufferIoBuffer ib = IoBuffer.wrap(bb);bb.flip();session.write(ib);flag = false;}
@Overridepublic void messageReceived(IoSession session, Object message) throws Exception {IoBuffer ib = (IoBuffer) message;System.out.println(ib.array().length);if (fc == null) {fc = new FileOutputStream("z:\\copyed.rar").getChannel();}fc.write(ib.buf());}
public static void main(String[] args) throws IOException {SocketAcceptor acceptor = new NioSocketAcceptor();DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();acceptor.setHandler(new FileReceiveHandler());acceptor.bind(new InetSocketAddress(3333));//acceptor.getSessionConfig().setReceiveBufferSize(1024);acceptor.getSessionConfig().setReadBufferSize(1024);acceptor.getSessionConfig().setMaxReadBufferSize(8888);System.out.println(acceptor.getSessionConfig().getReadBufferSize());}
acceptor.getSessionConfig().setReadBufferSize(1024);acceptor.getSessionConfig().setMaxReadBufferSize(8888);
private int minReadBufferSize = 64; private int readBufferSize = 2048; private int maxReadBufferSize = 65536; private int idleTimeForRead; private int idleTimeForWrite; private int idleTimeForBoth; private int writeTimeout = 60; private boolean useReadOperation; private int throughputCalculationInterval = 3;
private void read(T session) { IoSessionConfig config = session.getConfig(); IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize()); final boolean hasFragmentation = session.getTransportMetadata() .hasFragmentation(); try { int readBytes = 0; int ret; try { if (hasFragmentation) {//读取SocketChannelImp中的数据,然后装进buf中,SocketChannelImp中的数据多少由发送端决定,可以累积. while ((ret = read(session, buf)) > 0) { readBytes += ret; if (!buf.hasRemaining()) { break; } } } else { ret = read(session, buf); if (ret > 0) { readBytes = ret; } } } finally { buf.flip(); } if (readBytes > 0) { IoFilterChain filterChain = session.getFilterChain();//把读取的数据扔到过滤链中(另外一条线程,异步执行).最后反映到IoHandler中 filterChain.fireMessageReceived(buf); buf = null; if (hasFragmentation) { if (readBytes << 1 < config.getReadBufferSize()) { session.decreaseReadBufferSize(); } else if (readBytes == config.getReadBufferSize()) {//第一次以ReadBufferSize去读取,每一次增加读取量为前一次的两倍,直到等于MaxReadBufferSize为止 session.increaseReadBufferSize(); } } } if (ret < 0) { scheduleRemove(session); } } catch (Throwable e) { if (e instanceof IOException) { if (!(e instanceof PortUnreachableException) || !AbstractDatagramSessionConfig.class .isAssignableFrom(config.getClass()) || ((AbstractDatagramSessionConfig) config) .isCloseOnPortUnreachable()) scheduleRemove(session); } IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); } }