Netty源码学习-ObjectEncoder和ObjectDecoder
Netty中传递对象的思路很直观:
Netty中数据的传递是基于ChannelBuffer(也就是byte[]);
那把对象序列化为字节流,就可以在Netty中传递对象了
相应的从ChannelBuffer恢复对象,就是反序列化的过程
Netty已经封装好ObjectEncoder和ObjectDecoder
先看ObjectEncoder
ObjectEncoder是往外发送对象,因此ObjectEncoder肯定是一个ChannelDownstreamHandler
ObjectEncoder extends OneToOneEncoder,而OneToOneEncoder implements ChannelDownstreamHandler
OneToOneEncoder主要是做两件事情
一是encode
二是把消息发送到紧接着的下一个ChannelDownstreamHandler
其中第一个操作交由子类实现,是一个抽象方法
关键源码如下:
public void handleDownstream( ChannelHandlerContext ctx, ChannelEvent evt) throws Exception { if (!(evt instanceof MessageEvent)) { ctx.sendDownstream(evt); return; } MessageEvent e = (MessageEvent) evt; Object originalMessage = e.getMessage(); Object encodedMessage = encode(ctx, e.getChannel(), originalMessage); if (originalMessage == encodedMessage) { ctx.sendDownstream(evt); } else if (encodedMessage != null) { write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress()); }}
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { ChannelBufferOutputStream bout = new ChannelBufferOutputStream(dynamicBuffer( estimatedLength, ctx.getChannel().getConfig().getBufferFactory())); bout.write(LENGTH_PLACEHOLDER); ObjectOutputStream oout = new CompactObjectOutputStream(bout); oout.writeObject(msg); oout.flush(); oout.close(); ChannelBuffer encoded = bout.buffer(); encoded.setInt(0, encoded.writerIndex() - 4); return encoded; }
public class ObjectDecoder extends LengthFieldBasedFrameDecoder { /*lengthFieldLength = 4,initialBytesToStrip = 4(去掉前面4个字节)*/public ObjectDecoder(int maxObjectSize, ClassResolver classResolver) {super(maxObjectSize, 0, 4, 0, 4);this.classResolver = classResolver;}@Overrideprotected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {//根据header(length) + content的格式,取得contentChannelBuffer frame = (ChannelBuffer) super.decode(ctx, channel, buffer);if (frame == null) {return null;}//反序列化return new CompactObjectInputStream(new ChannelBufferInputStream(frame), classResolver).readObject();}/*重写了父类的这个方法,避免了memory copy。本方法被decode方法调用在LengthFieldBasedFrameDecoder写道:If you are sure that the frame and its content are not accessed afterthe current {@link #decode(ChannelHandlerContext, Channel, ChannelBuffer)}call returns, you can even avoid memory copy by returning the sliced*/@Overrideprotected ChannelBuffer extractFrame(ChannelBuffer buffer, int index, int length) {return buffer.slice(index, length);}}
static String decodeString(ByteBuffer src, Charset charset) static ByteBuffer encodeString(CharBuffer src, Charset charset)
@Override protected Object decode( ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { if (!(msg instanceof ChannelBuffer)) { return msg; } return ((ChannelBuffer) msg).toString(charset); }
return ChannelBuffers.decodeString( toByteBuffer(index, length), charset);
private static ChannelBuffer copiedBuffer(ByteOrder endianness, CharBuffer buffer, Charset charset) { CharBuffer src = buffer; ByteBuffer dst = ChannelBuffers.encodeString(src, charset); ChannelBuffer result = wrappedBuffer(endianness, dst.array()); result.writerIndex(dst.remaining()); return result; }