首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 服务器 > Apache >

Apache MINA学习(3)

2012-09-03 
Apache MINA学习(三)使用 ProtocolCodecFilterProtocolCodecFilter 用来在字节流和消息对象之间互相转换。

Apache MINA学习(三)
使用 ProtocolCodecFilter

ProtocolCodecFilter 用来在字节流和消息对象之间互相转换。当该过滤器接收到字节流的时候,需要首先判断消息的边界,然后把表示一条消息的字节提取出来,通过一定的逻辑转换成消息对象,再把消息对象往后传递,交给 I/O 处理器来执行业务逻辑。这个过程称为“解码”。与“解码”对应的是“编码”过程。在“编码”的时候,过滤器接收到的是消息对象,通过与“解码”相反的逻辑,把消息对象转换成字节,并反向传递,交给 I/O 服务来执行 I/O 操作。

在“编码”和“解码”中的一个重要问题是如何在字节流中判断消息的边界。通常来说,有三种办法解决这个问题:

使用固定长度的消息。这种方式实现起来比较简单,只需要每次读取特定数量的字节即可。
使用固定长度的消息头来指明消息主体的长度。比如每个消息开始的 4 个字节的值表示了后面紧跟的消息主体的长度。只需要首先读取该长度,再读取指定数量的字节即可。
使用分隔符。消息之间通过特定模式的分隔符来分隔。每次只要遇到该模式的字节,就表示到了一个消息的末尾。
具体到示例应用来说,客户端和服务器之间的通信协议比较复杂,有不同种类的消息。每种消息的格式都不相同,同类消息的内容也不尽相同。因此,使用固定长度的消息头来指明消息主体的长度就成了最好的选择。

示例应用中的每种消息主体由两部分组成,第一部分是固定长度的消息类别名称,第二部分是每种消息的主体内容。图 4 中给出了示例应用中一条完整的消息的结构。


图 4. 示例应用中消息的结构

AbstractTetrisCommand用来描述联机游戏示例应用中的消息。它是一个抽象类,是所有具体消息的基类。其具体实现如 清单 5 所示。


清单 5. 联机游戏示例应用中的消息 AbstractTetrisCommand
public abstract class AbstractTetrisCommand implements TetrisCommand { 
    public abstract String getName();

    public abstract byte[] bodyToBytes() throws Exception;

    public abstract void bodyFromBytes(byte[] bytes) throws Exception;

    public byte[] toBytes() throws Exception {
        byte[] body = bodyToBytes();
        int commandNameLength = Constants.COMMAND_NAME_LENGTH;
        int len = commandNameLength + body.length;
        byte[] bytes = new byte[len];
        String name = StringUtils.rightPad(getName(), commandNameLength,
            Constants.COMMAND_NAME_PAD_CHAR);
        name = name.substring(0, commandNameLength);
        System.arraycopy(name.getBytes(), 0, bytes, 0, commandNameLength);
        System.arraycopy(body, 0, bytes, commandNameLength, body.length);
        return bytes;
    }
}

如 清单 5 所示,AbstractTetrisCommand 中定义了 3 个抽象方法:getName、bodyToBytes 和 bodyFromBytes,分别用来获取消息的名称、把消息的主体转换成字节数组和从字节数组中构建消息。bodyToBytes对应于前面提到的“编码”过程,而 bodyFromBytes对应于“解码”过程。每种具体的消息都应该实现这 3 个方法。AbstractTetrisCommand 中的方法 toBytes 封装了把消息的主体转换成字节数组的逻辑,在字节数组中,首先是长度固定为 Constants.COMMAND_NAME_LENGTH的消息类别名称,紧接着是每种消息特定的主体内容,由 bodyToBytes 方法来生成。

在介绍完示例应用中的消息格式之后,下面将讨论具体的“编码”和“解码”过程。“编码”过程由编码器来完成,编码器需要实现 org.apache.mina.filter.codec.ProtocolEncoder 接口,一般来说继承自 org.apache.mina.filter.codec.ProtocolEncoderAdapter 并覆写所需的方法即可。清单 6 中给出了示例应用中消息编码器 CommandEncoder 的实现。


清单 6. 联机游戏示例应用中消息编码器 CommandEncoder
public class CommandEncoder extends ProtocolEncoderAdapter {

    public void encode(IoSession session, Object message,
        ProtocolEncoderOutput out) throws Exception {
        AbstractTetrisCommand command = (AbstractTetrisCommand) message;
        byte[] bytes = command.toBytes();
        IoBuffer buf = IoBuffer.allocate(bytes.length, false);
 
        buf.setAutoExpand(true);
        buf.putInt(bytes.length);
        buf.put(bytes);
 
        buf.flip();
        out.write(buf);
    }
}

在 清单 6 中,encode 方法封装了编码的逻辑。由于 AbstractTetrisCommand的 toBytes已经完成了到字节数组的转换,encode 方法直接使用即可。首先写入消息主体字节数组的长度,再是字节数组本身,就完成了编码的过程。

与编码过程相比,解码过程要相对复杂一些。具体的实现如 清单 7 所示。


清单 7. 联机游戏示例应用中消息解码器 CommandDecoder
public class CommandDecoder extends CumulativeProtocolDecoder {

    protected boolean doDecode(IoSession session, IoBuffer in,
            ProtocolDecoderOutput out) throws Exception {
        if (in.prefixedDataAvailable(4, Constants.MAX_COMMAND_LENGTH)) {
            int length = in.getInt();
            byte[] bytes = new byte[length];
            in.get(bytes);
            int commandNameLength = Constants.COMMAND_NAME_LENGTH;
            byte[] cmdNameBytes = new byte[commandNameLength];
            System.arraycopy(bytes, 0, cmdNameBytes, 0, commandNameLength);
            String cmdName = StringUtils.trim(new String(cmdNameBytes));
            AbstractTetrisCommand command = TetrisCommandFactory
                .newCommand(cmdName);
            if (command != null) {
                byte[] cmdBodyBytes = new byte[length - commandNameLength];
                System.arraycopy(bytes, commandNameLength, cmdBodyBytes, 0,
                    length - commandNameLength);
                command.bodyFromBytes(cmdBodyBytes);
                out.write(command);
            }
            return true;
        } else {
            return false;
        }
    }
}

在 清单 7 中可以看到,解码器 CommandDecoder 继承自 CumulativeProtocolDecoder。这是 Apache MINA 提供的一个帮助类,它会自动缓存所有已经接收到的数据,直到编码器认为可以开始进行编码。这样在实现自己的编码器的时候,就只需要考虑如何判断消息的边界即可。如果一条消息的后续数据还没有接收到,CumulativeProtocolDecoder会自动进行缓存。在之前提到过,解码过程的一个重要问题是判断消息的边界。对于固定长度的消息来说,只需要使用 Apache MINA 的 IoBuffer的 remaining方法来判断当前缓存中的字节数目,如果大于消息长度的话,就进行解码;对于使用固定长度消息头来指明消息主体的长度的情况,IoBuffer提供了 prefixedDataAvailable方法来满足这一需求。prefixedDataAvailable会检查当前缓存中是否有固定长度的消息头,并且由此消息头指定长度的消息主体是否已经全部在缓存中。如果这两个条件都满足的话,说明一条完整的消息已经接收到,可以进行解码了。解码的过程本身并不复杂,首先读取消息的类别名称,然后通过 TetrisCommandFactory.newCommand方法来生成一个该类消息的实例,接着通过该实例的 bodyFromBytes方法就可以从字节数组中恢复消息的内容,得到一个完整的消息对象。每次成功解码一个消息对象,需要调用 ProtocolDecoderOutput的 write把此消息对象往后传递。消息对象会通过过滤器链,最终达到 I/O 处理器,在 IoHandler.messageReceived中接收到此消息对象。如果当前缓存的数据不足以用来解码一条消息的话,doDecode只需要返回 false即可。接收到新的数据之后,doDecode会被再次调用。

过滤器链

过滤器只有在添加到过滤器链中的时候才起作用。过滤器链是过滤器的容器。过滤器链与 I/O 会话是一一对应的关系。org.apache.mina.core.filterchain.IoFilterChain是 Apache MINA 中过滤器链的接口,其中提供了一系列方法对其中包含的过滤器进行操作,包括查询、添加、删除和替换等。如 表 5 所示。


表 5. IoFilterChain 接口的方法
方法说明
addFirst(String name, IoFilter filter)将指定名称的过滤器添加到过滤器链的开头。
addLast(String name, IoFilter filter)将指定名称的过滤器添加到过滤器链的末尾。
contains(String name)判断过滤器链中是否包含指定名称的过滤器。
get(String name)从过滤器链中获取指定名称的过滤器。
remove(String name)从过滤器链中删除指定名称的过滤器。
replace(String name, IoFilter newFilter)用过滤器 newFilter替换掉过滤器链中名为 name的过滤器。
getSession()获取与过滤器链一一对应的 I/O 会话。
在介绍完 I/O 过滤器和过滤器链之后,下面介绍 I/O 处理器。





回页首

I/O 处理器

I/O 事件通过过滤器链之后会到达 I/O 处理器。I/O 处理器中与 I/O 事件对应的方法会被调用。Apache MINA 中 org.apache.mina.core.service.IoHandler是 I/O 处理器要实现的接口,一般情况下,只需要继承自 org.apache.mina.core.service.IoHandlerAdapter并覆写所需方法即可。IoHandler接口的方法如 表 6 所示。


表 6. IoHandler 接口的方法
方法说明
sessionCreated(IoSession session)当有新的连接建立的时候,该方法被调用。
sessionOpened(IoSession session)当有新的连接打开的时候,该方法被调用。该方法在 sessionCreated之后被调用。
sessionClosed(IoSession session)当连接被关闭的时候,此方法被调用。
sessionIdle(IoSession session, IdleStatus status)当连接变成闲置状态的时候,此方法被调用。
exceptionCaught(IoSession session, Throwable cause)当 I/O 处理器的实现或是 Apache MINA 中有异常抛出的时候,此方法被调用。
messageReceived(IoSession session, Object message)当接收到新的消息的时候,此方法被调用。
messageSent(IoSession session, Object message)当消息被成功发送出去的时候,此方法被调用。
对于 表 6 中的方法,有几个需要重点的说明一下。首先是 sessionCreated 和 sessionOpened 的区别。sessionCreated方法是由 I/O 处理线程来调用的,而 sessionOpened 是由其它线程来调用的。因此从性能方面考虑,不要在 sessionCreated 方法中执行过多的操作。对于 sessionIdle,默认情况下,闲置时间设置是禁用的,也就是说 sessionIdle 并不会被调用。可以通过 IoSessionConfig.setIdleTime(IdleStatus, int) 来进行设置。

Apache MINA 中的基本概念已经介绍完了,下面介绍状态机的使用。

热点排行