首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

Mina 之 入门篇(一)

2014-06-07 
Mina 之 入门篇(1)最近自己在工作之余做一个金融类的项目(类似股票),采用mina开源框架进行服务端和客户端

Mina 之 入门篇(1)

最近自己在工作之余做一个金融类的项目(类似股票),采用mina开源框架进行服务端和客户端之间交互。开始学学mina。转入正题...

(1)首先需要引入以下几个包:

?

mina-core-2.0.x.jar,slf4j-api-1.6.x.jar,slf4j-log4j12-1.6.x.jar

这里需要注意下:slf4j-api-1.6.x.jar,slf4j-log4j12-1.6.x.jar 必须是同样二级版本号,即1.6版本,如果一个是1.5版本、一个是1.6版本,运行时会报错:java.lang.NoSuchMethodError: org.slf4j.helpers.MessageFormatter.format,原因是包冲突。

(2)创建一个主服务类:TradeSocket

public class TradeSocket extends SocketBase implements ISocket {.....@Override    public boolean initSocket(Config config) {        logger.info("正在初始化 tradeSocket");        TradeSocketConfig tradeConfig = (TradeSocketConfig) config;        this.setSocket_IP(tradeConfig.getIp());        logger.info("正在初始化 tradeSocket的Ip");        this.setSocket_port(tradeConfig.getPost());        logger.info("正在初始化 tradeSocket的端口");        this.setHandlerAdapter(new TradeSocketHandler());        logger.info("正在初始化 tradeSocket的处理器");        this.setAcceptor(new NioSocketAcceptor());        logger.info("正在初始化 tradeSocket的接收器");        logger.info("成功初始化 tradeSocket");        logger.info("tradeSocket服务IP:" + this.getSocket_IP());        logger.info("tradeSocket服务端口号:" + this.getSocket_port());        return true;    }    @Override    public boolean startSocket() {        boolean result = false;        if (this.isStart() == false) {            initSocket(this.tradeSocketConfig);            // 设置过滤器,ObjectSerializationCodecFactory可以对象序列化方式在服务端和客户端之间传递数据            this.getAcceptor().getFilterChain().addLast(DGConstants.SERVER_SOCKET_FILTER_CHIN, new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));             // 设置读取数据的缓冲区大小            this.getAcceptor().getSessionConfig().setReadBufferSize(2048);            // 设置长连接            this.getAcceptor().getSessionConfig().setKeepAlive(true);            // 读写通道10秒内无操作进入空闲状态            this.getAcceptor().getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);            this.getAcceptor().setHandler(this.getHandlerAdapter());            try {                this.getAcceptor().bind(new InetSocketAddress(this.getSocket_IP(), this.getSocket_port()));                this.setStart(true);                result = true;                logger.info("成功启动 tradeSocket");            } catch (IOException e) {                logger.error(e.getMessage(), e);            }        }        return result;    }    @Override    public boolean closeSocket() {        this.getAcceptor().dispose();        this.setStart(false);        logger.error("关闭了交易Socket");        return false;    }

?(3)创建一个handler来管理事件,是业务处理关注点

?

?

public class TradeSocketHandler extends IoHandlerAdapter {public static Logger logger = Logger.getLogger(TradeSocketHandler.class);    @Override    public void sessionCreated(IoSession session) throws Exception {        super.sessionCreated(session);        logger.info("服务端与客户端创建连接...");    }    @Override    public void sessionOpened(IoSession session) throws Exception {        InetSocketAddress remoteAddress = (InetSocketAddress) session.getRemoteAddress();        String clientIp = remoteAddress.getAddress().getHostAddress();        logger.info("服务器打开Session ID=" + session.getId());        logger.info( "服务端与客户端["+session.getRemoteAddress()+"]连接打开...");//        ConnectResponseMessage connectResponseMessage = new ConnectResponseMessage();//        connectResponseMessage.setConnectResponse(ConnectResponseMessage.ConnectSuccess);//        session.write(connectResponseMessage);                    }    @Override    public void sessionClosed(IoSession session) throws Exception {        super.sessionClosed(session);        logger.info("服务端与客户端断开连接");    }    @Override    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {        super.sessionIdle(session, status);        logger.info("服务端进入空闲状态...");    }    @Override    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {        logger.error(cause.getMessage(), cause);        super.exceptionCaught(session, cause);    }    @Override    public void messageReceived(IoSession session, Object message) throws Exception {        if (message instanceof BaseMessage) {            BaseMessage baseMessage = (BaseMessage) message;            logger.info("服务器收到的消息为:" + baseMessage);            if (baseMessage.getMsgType() == MessageType.LoginMessage_Type) {                LoginMessage loginMessage = (LoginMessage) baseMessage;                UserConnect userConnect = new UserConnect();                userConnect.setTradeSession(session);                loginMessage.setUserid(loginMessage.getUserid() + session.hashCode());                ServerCache cache = ServerCache.getInstance();                cache.addUserConnect(loginMessage.getUserid(), userConnect);                userConnect.getTradeSession().setAttribute("userID", loginMessage.getUserid());                userConnect.getTradeSession().setAttribute("sessionID", session.hashCode());                //这里主要是处理各种业务                ServerReceivedMessageProcess.getInstance().processMessage(loginMessage);            }        }    }    @Override    public void messageSent(IoSession session, Object message) throws Exception {        super.messageSent(session, message);    }

?ServerReceivedMessageProcess.getInstance().processMessage(.....);方法是根据从客户端传过来不同message,实例化出不同处理类。

(4)创建客户端主类

?

public class ClientTradeSocket extends ClientSocketBase {    private static Logger logger = Logger.getLogger(ClientTradeSocket.class);    private static ClientTradeSocket instance;    public static ClientTradeSocket getInstance() {        if (instance == null) {            instance = new ClientTradeSocket();        }        return instance;    }    private ClientTradeSocket() {    }    @Override    public boolean closedSocket() {        this.getConnector().dispose();        this.setStart(false);        logger.info("客户关闭了与服务器的连接");        return true;    }    @Override    public void initSocket() {        this.setHandlerAdapter(new ClientTradeHandler());        this.setConnector(new NioSocketConnector());        logger.info("客户初始化了与服务器的连接");    }    @Override    public void startSocket() {        this.initSocket();        try {            if (this.isStart() == false) {                /**                 * 创建接收数据的过滤器                 */                DefaultIoFilterChainBuilder chain = this.getConnector().getFilterChain();                chain.addLast(DGConstants.SERVER_SOCKET_FILTER_CHIN, new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));                this.getConnector().setHandler(this.getHandlerAdapter());                /**                 * 设置连接超时的时间 为:一分钟 这个超时不可以过小                 */                this.getConnector().setConnectTimeoutMillis(ClientSocketBase.timeOutMillis);                logger.info("ip/port:" + this.getServer_IP_Port() + "/" + this.getServer_Socket_Port());                InetSocketAddress socketAddress = new InetSocketAddress(this.getServer_IP_Port(), this.getServer_Socket_Port());                /**                 * 建立连接                 */                this.connectFuture = this.getConnector().connect(socketAddress);                //等待连接,这句话比较重要,如果没有,则后面的this.connectFuture.isConnected()为false的。                //原因在于:connect方法是异步的,awaitUninterruptibly方式阻塞主线程,等待服务端返回,从而实现同步                //如果不用awaitUninterruptibly,可以采用this.connectFuture.addListener方法进行回调                 this.connectFuture.awaitUninterruptibly();                //连接后,获得session                if (this.connectFuture.isConnected()) {                    logger.info("已经建立用户连接");                    this.setStart(true);                    logger.info("客户开启了与服务器的连接");                    ConnectSession session = ConnectSession.getInstance();                    //将session保存到缓存中                    session.setTradeSession(this.connectFuture.getSession());                } else {                    logger.info("不能建立用户连接");                    this.setStart(false);                    ConnectSession session = ConnectSession.getInstance();                    session.setTradeSession(null);                }            }        } catch (Exception e) {            logger.error(e.getMessage(), e);            this.setStart(false);        }    }}

?(5)客户端业务处理类handler

?

?

public class ClientTradeHandler extends IoHandlerAdapter {    private static Logger logger = Logger.getLogger(ClientTradeHandler.class);    private String user;    public void setUser(String user) {        this.user = user;    }    public String getUser() {        return this.user;    }    /**     *     * 当接口中其他方法抛出异常未被捕获时触发此方法     */    @Override    public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception {    }    /**     * 当接收到消息后被触发     *     */    @Override    public void messageReceived(IoSession ioSession, Object msg) throws Exception {        BaseMessage message = (BaseMessage) msg;        logger.info("接受到得消息为:" + message);        if(message!=null) {            if(message instanceof ConnectResponseMessage) {                ConnectResponseMessage connectResponseMessage = (ConnectResponseMessage)message;                if(connectResponseMessage.getConnectResponse() == ConnectResponseMessage.ConnectSuccess) {                     ConnectSession clientConnect = ConnectSession.getInstance();                    clientConnect.setTradeSession(ioSession);                }            } else {                //根据服务端返回的message,实例化不同处理类                ClientReceivedMessageProcess messageProcess = new ClientReceivedMessageProcess();                messageProcess.processMessage(message);            }        }           }    /**     * 当发送消息后被触发     *     */    @Override    public void messageSent(IoSession ioSession, Object msg) throws Exception {    }    /**     * 当会话关闭时被触发     *     */    @Override    public void sessionClosed(IoSession ioSession) throws Exception {        logger.info("与服务器断开连接!!");//        EIMTrayIcon trayIcon = EIMTrayIcon.getInStance();//        trayIcon.showIcon(EIMClientConfig.OffLineTryIcon_Type);    }    /**     * 当会话创建时被触发     *     */    @Override    public void sessionCreated(IoSession ioSession) throws Exception {    }    /**     *     * 当会话空闲时被触发     */    @Override    public void sessionIdle(IoSession ioSession, IdleStatus msg) throws Exception {    }    /**     *     * 当会话开始时被触发     */    @Override    public void sessionOpened(IoSession ioSession) throws Exception {        logger.info("会话已经打开");    }}

?

最后,可以进行运行测试了。。。。

热点排行