首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 服务器 > Apache >

Apache mina设立默认的write和read buffer,以及奥秘

2012-07-25 
Apache mina设置默认的write和read buffer,以及奥秘最近在做一个项目,用到mina,但是对于mina发送文件,或者

Apache mina设置默认的write和read buffer,以及奥秘
最近在做一个项目,用到mina,但是对于mina发送文件,或者报文分包发送有很多不明白的。查看了很多资料,其中找到一位仁兄的发送文件的代码,是一个客户端上传文件到服务器的例子。

作为本文的引子:

客户端程序

package test;import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;import org.apache.mina.core.RuntimeIoException;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;import org.apache.mina.core.future.ConnectFuture;import org.apache.mina.core.session.IoSession;import org.apache.mina.transport.socket.nio.NioSocketConnector;public class ClientMain {/** * @author daijun ,Nov 26, 2009 * @param args * @throws InterruptedException * @throws IOException */public static void main(String[] args) throws InterruptedException, IOException {try {NioSocketConnector connector = new NioSocketConnector();DefaultIoFilterChainBuilder chain = connector.getFilterChain();connector.setHandler(new FileSenderHandler());ConnectFuture connectFuture = connector.connect(new InetSocketAddress("127.0.0.1", 3333));IoSession session = null;for (;;) {try {ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 3333));future.awaitUninterruptibly();session = future.getSession();break;} catch (RuntimeIoException e) {System.err.println("Failed to connect.");e.printStackTrace();Thread.sleep(5000);}}File f = new File("e:/20100425171.jpg");// System.out.println(f.length());FileInputStream fin = new FileInputStream(f);FileChannel fc = fin.getChannel();ByteBuffer bb = ByteBuffer.allocate(2048 * 1000);boolean flag = true;while (true) {// 不间断发送会导致buffer异常if (!flag) {Thread.sleep(1000);}bb.clear();int i = fc.read(bb);System.out.println(i);if (i == -1) {System.out.println("exit");break;}// 包装成自己的iobufferIoBuffer ib = IoBuffer.wrap(bb);bb.flip();session.write(ib);flag = false;}session.close(true);} catch (Throwable e) {e.printStackTrace();}}}


客户端的handler,其实没啥用,就是那位仁兄的代码

package test;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IoSession;public class FileSenderHandler extends IoHandlerAdapter {@Overridepublic void messageReceived(IoSession session, Object message) throws Exception {}@Overridepublic void exceptionCaught(IoSession session, Throwable cause) throws Exception {session.close(true);}@Overridepublic void messageSent(IoSession session, Object message) throws Exception {System.out.println("hidsda");super.messageSent(session, message);}}



Server端代码:
package test;import java.io.IOException;import java.net.InetSocketAddress;import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;import org.apache.mina.transport.socket.SocketAcceptor;import org.apache.mina.transport.socket.nio.NioSocketAcceptor;public class ServerMain {/** * @author daijun ,Nov 26, 2009 * @param args * @throws IOException */public static void main(String[] args) throws IOException {SocketAcceptor acceptor = new NioSocketAcceptor();DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();acceptor.setHandler(new FileReceiveHandler());acceptor.bind(new InetSocketAddress(3333));}}



Server端Handler:
package test;import java.io.FileOutputStream;import java.nio.channels.FileChannel;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IdleStatus;import org.apache.mina.core.session.IoSession;public class FileReceiveHandler extends IoHandlerAdapter {private static FileChannel fc;@Overridepublic void messageReceived(IoSession session, Object message) throws Exception {IoBuffer ib = (IoBuffer) message;System.out.println(ib.array().length);if (fc == null) {fc = new FileOutputStream("z:\\copyed.rar").getChannel();}fc.write(ib.buf());}@Overridepublic void exceptionCaught(IoSession session, Throwable cause) throws Exception {System.out.println("over");fc.close();session.close(true);}@Overridepublic void sessionIdle(IoSession session, IdleStatus status) throws Exception {System.out.println("session idle");}}



对于mina的普通机制,大家可以网上找资料,这里不重点这里提到。



通过上述的Client代码发送一个大图片文件(我选择了1.6MB左右的文件)
File f = new File("e:/20100425171.jpg");// System.out.println(f.length());FileInputStream fin = new FileInputStream(f);FileChannel fc = fin.getChannel();ByteBuffer bb = ByteBuffer.allocate(2048 * 1000);boolean flag = true;while (true) {// 不间断发送会导致buffer异常if (!flag) {Thread.sleep(1000);}bb.clear();int i = fc.read(bb);System.out.println(i);if (i == -1) {System.out.println("exit");break;}// 包装成自己的iobufferIoBuffer ib = IoBuffer.wrap(bb);bb.flip();session.write(ib);flag = false;}



而服务端的的handler部分
@Overridepublic void messageReceived(IoSession session, Object message) throws Exception {IoBuffer ib = (IoBuffer) message;System.out.println(ib.array().length);if (fc == null) {fc = new FileOutputStream("z:\\copyed.rar").getChannel();}fc.write(ib.buf());}


被调用多次,输出结果如下
2048
4096
8192
16384
32768
65536
65536
65536
65536
65536
65536
65536
65536
65536
65536
…………(省略)


才发现Mina对大文件发送有大小限制,如果什么不设置的情况下,如果一个文件的大小超过
65536个字节,将被切片发送。

可以参考mina的类
org.apache.mina.core.session.IoSessionConfig
void setReadBufferSize(int size):
这个方法设置读取缓冲的字节数,但一般不需要调用这个方法,因为IoProcessor 会自动调
整缓冲的大小。你可以调用setMinReadBufferSize()、setMaxReadBufferSize()方法,这
样无论IoProcessor 无论如何自动调整,都会在你指定的区间。

此外IoSessionConfig还有一个很重要的方法setMaxReadBufferSize,设置最大读缓存内容
官方解析如下:

Sets the maximum size of the read buffer that I/O processor allocates per each read.  I/O processor will not increase the read buffer size to the greater value than this property value.

翻译:一旦设置 I/O processor读缓冲区(buffer)的最大容量, I/O processor的一次读取缓冲的容量无论怎样增加也不会超过此值(maxReadBufferSize)。


下面做一个实验,对ServerMain的main方法改变如下   :
public static void main(String[] args) throws IOException {SocketAcceptor acceptor = new NioSocketAcceptor();DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();acceptor.setHandler(new FileReceiveHandler());acceptor.bind(new InetSocketAddress(3333));//acceptor.getSessionConfig().setReceiveBufferSize(1024);acceptor.getSessionConfig().setReadBufferSize(1024);acceptor.getSessionConfig().setMaxReadBufferSize(8888);System.out.println(acceptor.getSessionConfig().getReadBufferSize());}

增加两行
acceptor.getSessionConfig().setReadBufferSize(1024);acceptor.getSessionConfig().setMaxReadBufferSize(8888);



同样文件,同样操作,可以看到服务器输出如下:
1024
1024
2048
4096
8192
8888
8888
8888
8888
8888
8888
8888
8888
8888
……………………(省略)

上面数据可以看出,读取Buffer的时候,先按照最ReadBufferSize来读取,然后每下一次读取的量都是前一次的两倍,直到与MaxReadBufferSize相同位置。
(mina默认的情况下,最大可以读取缓存大小为65536,最小为64,正常为2048)
请看org.apache.mina.core.session.AbstractIoSessionConfig

    private int minReadBufferSize = 64;    private int readBufferSize = 2048;    private int maxReadBufferSize = 65536;    private int idleTimeForRead;    private int idleTimeForWrite;    private int idleTimeForBoth;    private int writeTimeout = 60;    private boolean useReadOperation;    private int throughputCalculationInterval = 3;


对于读缓冲区增加

摘自类org.apache.mina.core.polling.AbstractPollingIoProcessor<T>

的read方法,这里是mina读取字节流的底层,(再往深一层就是rt.jar包提供sun.nio.ch.SocketChannelImpl类中的read(ByteBuffer paramByteBuffer)方法
这里就是mina通信最底层的原型,有兴趣的读者可以自己研读其代码,这里不冗述.)
下面是我对该代码的一些理解,用注释标明
    private void read(T session) {        IoSessionConfig config = session.getConfig();        IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());        final boolean hasFragmentation = session.getTransportMetadata()                .hasFragmentation();        try {            int readBytes = 0;            int ret;            try {                if (hasFragmentation) {//读取SocketChannelImp中的数据,然后装进buf中,SocketChannelImp中的数据多少由发送端决定,可以累积.                    while ((ret = read(session, buf)) > 0) {                        readBytes += ret;                        if (!buf.hasRemaining()) {                            break;                        }                    }                } else {                    ret = read(session, buf);                    if (ret > 0) {                        readBytes = ret;                    }                }            } finally {                buf.flip();            }            if (readBytes > 0) {                IoFilterChain filterChain = session.getFilterChain();//把读取的数据扔到过滤链中(另外一条线程,异步执行).最后反映到IoHandler中                filterChain.fireMessageReceived(buf);                buf = null;                if (hasFragmentation) {                    if (readBytes << 1 < config.getReadBufferSize()) {                        session.decreaseReadBufferSize();                    } else if (readBytes == config.getReadBufferSize()) {//第一次以ReadBufferSize去读取,每一次增加读取量为前一次的两倍,直到等于MaxReadBufferSize为止                        session.increaseReadBufferSize();                    }                }            }                        if (ret < 0) {                scheduleRemove(session);            }        } catch (Throwable e) {            if (e instanceof IOException) {                if (!(e instanceof PortUnreachableException)                        || !AbstractDatagramSessionConfig.class                                .isAssignableFrom(config.getClass())                        || ((AbstractDatagramSessionConfig) config)                                .isCloseOnPortUnreachable())                    scheduleRemove(session);            }                        IoFilterChain filterChain = session.getFilterChain();            filterChain.fireExceptionCaught(e);        }    }


以上注释,说明为什么会每次读取量是前一次的两倍的原因。




下一篇文章还是以本代码解析一下发送端的write的buffer大小的奥秘

热点排行