Apache MINA 1.x 中的ByteBuffer对象使用的问题
最近在为公司做一个消息中心的项目,项目中使用了Apache的MINA 1.7的版本,为消息中心做通讯接口。
[size=13px; line-height: 20px; ][size=x-small;] Apache MINA(Multipurpose Infrastructure for Network Applications) 是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架。当前发行的 MINA 版本支持基于 Java NIO 技术的 TCP/UDP 应用程序开发、串口通讯程序,MINA 所支持的功能也在进一步的扩展中。[/size][/size]
使用MINA组件确实可以提高通讯的并发性能以及可靠性,在1.7版本中,MINA封装了JDK中的ByteBuffer,虽然调用上方便了不少,但是如果不正当的使用它,容易引起性能上的问题,甚至会出现Out of Memory的问题。这次我就遇到了Out of Memory的问题,后来经过我的仔细分析和调试,终于发现了这条臭虫。
首先来看一下如何搭建MINA的TCP的服务端:
// output log to consoleorg.apache.log4j.BasicConfigurator.configure();acceptor = new SocketAcceptor(); // Create a service configuration SocketAcceptorConfig cfg = new SocketAcceptorConfig(); cfg.setReuseAddress(true); // add a protocol codec, the protocol is wrapper from 0x00 to 0xff, and between 0x00 and 0xff is a message. cfg.getFilterChain().addLast( "protocolFilter", new ProtocolCodecFilter( new WrapperTagProtocolCodecFactory())); // add logger filter cfg.getFilterChain().addLast("logger", new LoggingFilter()); acceptor .bind(new InetSocketAddress(PORT), new IoHandlerAdapter(){@Overridepublic void exceptionCaught(IoSession session, Throwable cause)throws Exception {// do something...}@Overridepublic void messageReceived(IoSession session, Object message)throws Exception {session.write("response something").join();} }, cfg);
public class WrapperTagProtocolCodecFactory extendsDemuxingProtocolCodecFactory {public WrapperTagProtocolCodecFactory(){super.register(WrapperTagDecoder.class);super.register(WrapperTagEncoder.class);}public WrapperTagProtocolCodecFactory(String charset){super.register(new WrapperTagDecoder(charset));super.register(new WrapperTagEncoder(charset));}}
public class WrapperTagDecoder extends MessageDecoderAdapter {private String charset = "gb2312";private Logger logger = Logger.getLogger(WrapperTagDecoder.class);public WrapperTagDecoder(String charset){this.charset = charset;}public WrapperTagDecoder(){}public MessageDecoderResult decodable(IoSession session, ByteBuffer in) {return MessageDecoderResult.OK;}public MessageDecoderResult decode(IoSession session, ByteBuffer in,ProtocolDecoderOutput out) throws Exception {InputStream stream = in.asInputStream();int head = stream.read();if(head == -1){throw new IOException("read -1 byte");}logger.info("Read head 0x00.");int b = -1;ByteArrayOutputStream os = new ByteArrayOutputStream();while((b = stream.read()) != 0xff){os.write(b);}logger.info("Read rear 0xff.");String message = new String(os.toByteArray(), charset);out.write(message);logger.debug("Read message:" + message + ", charset:" + charset);return MessageDecoderResult.OK;}}
public class WrapperTagEncoder implements MessageEncoder {private static final Set<Class<?>> TYPES;private String charset = "gb2312";static { Set<Class<?>> types = new HashSet<Class<?>>(); types.add(String.class); TYPES = Collections.unmodifiableSet(types); }public WrapperTagEncoder(String charset){this.charset = charset;}public WrapperTagEncoder(){}public Set<Class<?>> getMessageTypes() {return TYPES;}public void encode(IoSession session, Object message,ProtocolEncoderOutput out) throws Exception {ByteBuffer buf = ByteBuffer.allocate(256); // Enable auto-expand for easier encoding buf.setAutoExpand(true); buf.put((byte)0x00); buf.put(message.toString().getBytes(charset)); buf.put((byte)0xff); buf.flip(); out.write(buf);}
@Override public int read() { if (ByteBuffer.this.hasRemaining()) { return ByteBuffer.this.get() & 0xff; } else { return -1; } } @Override public int read(byte[] b, int off, int len) { int remaining = ByteBuffer.this.remaining(); if (remaining > 0) { int readBytes = Math.min(remaining, len); ByteBuffer.this.get(b, off, readBytes); return readBytes; } else { return -1; } }
public class WrapperTagDecoder extends MessageDecoderAdapter {private String charset = "gb2312";private Logger logger = Logger.getLogger(WrapperTagDecoder.class);private boolean readRear = false;private boolean readHead = false;private int receivedBufferSize = 1024;private ByteArrayOutputStream os = new ByteArrayOutputStream();public WrapperTagDecoder(String charset){this.charset = charset;}public int getReceivedBufferSize() {return receivedBufferSize;}public void setReceivedBufferSize(int receivedBufferSize) {this.receivedBufferSize = receivedBufferSize;}public WrapperTagDecoder(){}public MessageDecoderResult decodable(IoSession session, ByteBuffer in) {return MessageDecoderResult.OK;}public MessageDecoderResult decode(IoSession session, ByteBuffer in,ProtocolDecoderOutput out) throws Exception {InputStream stream = in.asInputStream();if(!readHead){int head = stream.read();if(head == -1){throw new IOException("read -1 byte");}readHead = true;logger.info("Read head 0x00.");}byte[] buffer = new byte[receivedBufferSize];int length = stream.read(buffer);if(length == -1)return MessageDecoderResult.OK;if(buffer[length - 1] == -1){readRear = true;logger.info("Read rear 0xff.");os.write(buffer, 0, length - 1);}else{os.write(buffer, 0, length);}if(readRear){String message = new String(os.toByteArray(), charset);out.write(message);logger.debug("Read message:" + message + ", charset:" + charset);readRear = false;readHead = false;os.reset();}return MessageDecoderResult.OK;}}