Mina 之 入门篇(1)
最近自己在工作之余做一个金融类的项目(类似股票),采用mina开源框架进行服务端和客户端之间交互。开始学学mina。转入正题...
(1)首先需要引入以下几个包:
?
mina-core-2.0.x.jar,slf4j-api-1.6.x.jar,slf4j-log4j12-1.6.x.jar
这里需要注意下:slf4j-api-1.6.x.jar,slf4j-log4j12-1.6.x.jar 必须是同样二级版本号,即1.6版本,如果一个是1.5版本、一个是1.6版本,运行时会报错:java.lang.NoSuchMethodError: org.slf4j.helpers.MessageFormatter.format,原因是包冲突。
(2)创建一个主服务类:TradeSocket
public class TradeSocket extends SocketBase implements ISocket {.....@Override public boolean initSocket(Config config) { logger.info("正在初始化 tradeSocket"); TradeSocketConfig tradeConfig = (TradeSocketConfig) config; this.setSocket_IP(tradeConfig.getIp()); logger.info("正在初始化 tradeSocket的Ip"); this.setSocket_port(tradeConfig.getPost()); logger.info("正在初始化 tradeSocket的端口"); this.setHandlerAdapter(new TradeSocketHandler()); logger.info("正在初始化 tradeSocket的处理器"); this.setAcceptor(new NioSocketAcceptor()); logger.info("正在初始化 tradeSocket的接收器"); logger.info("成功初始化 tradeSocket"); logger.info("tradeSocket服务IP:" + this.getSocket_IP()); logger.info("tradeSocket服务端口号:" + this.getSocket_port()); return true; } @Override public boolean startSocket() { boolean result = false; if (this.isStart() == false) { initSocket(this.tradeSocketConfig); // 设置过滤器,ObjectSerializationCodecFactory可以对象序列化方式在服务端和客户端之间传递数据 this.getAcceptor().getFilterChain().addLast(DGConstants.SERVER_SOCKET_FILTER_CHIN, new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); // 设置读取数据的缓冲区大小 this.getAcceptor().getSessionConfig().setReadBufferSize(2048); // 设置长连接 this.getAcceptor().getSessionConfig().setKeepAlive(true); // 读写通道10秒内无操作进入空闲状态 this.getAcceptor().getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); this.getAcceptor().setHandler(this.getHandlerAdapter()); try { this.getAcceptor().bind(new InetSocketAddress(this.getSocket_IP(), this.getSocket_port())); this.setStart(true); result = true; logger.info("成功启动 tradeSocket"); } catch (IOException e) { logger.error(e.getMessage(), e); } } return result; } @Override public boolean closeSocket() { this.getAcceptor().dispose(); this.setStart(false); logger.error("关闭了交易Socket"); return false; }
?(3)创建一个handler来管理事件,是业务处理关注点
?
?
public class TradeSocketHandler extends IoHandlerAdapter {public static Logger logger = Logger.getLogger(TradeSocketHandler.class); @Override public void sessionCreated(IoSession session) throws Exception { super.sessionCreated(session); logger.info("服务端与客户端创建连接..."); } @Override public void sessionOpened(IoSession session) throws Exception { InetSocketAddress remoteAddress = (InetSocketAddress) session.getRemoteAddress(); String clientIp = remoteAddress.getAddress().getHostAddress(); logger.info("服务器打开Session ID=" + session.getId()); logger.info( "服务端与客户端["+session.getRemoteAddress()+"]连接打开...");// ConnectResponseMessage connectResponseMessage = new ConnectResponseMessage();// connectResponseMessage.setConnectResponse(ConnectResponseMessage.ConnectSuccess);// session.write(connectResponseMessage); } @Override public void sessionClosed(IoSession session) throws Exception { super.sessionClosed(session); logger.info("服务端与客户端断开连接"); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { super.sessionIdle(session, status); logger.info("服务端进入空闲状态..."); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { logger.error(cause.getMessage(), cause); super.exceptionCaught(session, cause); } @Override public void messageReceived(IoSession session, Object message) throws Exception { if (message instanceof BaseMessage) { BaseMessage baseMessage = (BaseMessage) message; logger.info("服务器收到的消息为:" + baseMessage); if (baseMessage.getMsgType() == MessageType.LoginMessage_Type) { LoginMessage loginMessage = (LoginMessage) baseMessage; UserConnect userConnect = new UserConnect(); userConnect.setTradeSession(session); loginMessage.setUserid(loginMessage.getUserid() + session.hashCode()); ServerCache cache = ServerCache.getInstance(); cache.addUserConnect(loginMessage.getUserid(), userConnect); userConnect.getTradeSession().setAttribute("userID", loginMessage.getUserid()); userConnect.getTradeSession().setAttribute("sessionID", session.hashCode()); //这里主要是处理各种业务 ServerReceivedMessageProcess.getInstance().processMessage(loginMessage); } } } @Override public void messageSent(IoSession session, Object message) throws Exception { super.messageSent(session, message); }
?ServerReceivedMessageProcess.getInstance().processMessage(.....);方法是根据从客户端传过来不同message,实例化出不同处理类。
(4)创建客户端主类
?
public class ClientTradeSocket extends ClientSocketBase { private static Logger logger = Logger.getLogger(ClientTradeSocket.class); private static ClientTradeSocket instance; public static ClientTradeSocket getInstance() { if (instance == null) { instance = new ClientTradeSocket(); } return instance; } private ClientTradeSocket() { } @Override public boolean closedSocket() { this.getConnector().dispose(); this.setStart(false); logger.info("客户关闭了与服务器的连接"); return true; } @Override public void initSocket() { this.setHandlerAdapter(new ClientTradeHandler()); this.setConnector(new NioSocketConnector()); logger.info("客户初始化了与服务器的连接"); } @Override public void startSocket() { this.initSocket(); try { if (this.isStart() == false) { /** * 创建接收数据的过滤器 */ DefaultIoFilterChainBuilder chain = this.getConnector().getFilterChain(); chain.addLast(DGConstants.SERVER_SOCKET_FILTER_CHIN, new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); this.getConnector().setHandler(this.getHandlerAdapter()); /** * 设置连接超时的时间 为:一分钟 这个超时不可以过小 */ this.getConnector().setConnectTimeoutMillis(ClientSocketBase.timeOutMillis); logger.info("ip/port:" + this.getServer_IP_Port() + "/" + this.getServer_Socket_Port()); InetSocketAddress socketAddress = new InetSocketAddress(this.getServer_IP_Port(), this.getServer_Socket_Port()); /** * 建立连接 */ this.connectFuture = this.getConnector().connect(socketAddress); //等待连接,这句话比较重要,如果没有,则后面的this.connectFuture.isConnected()为false的。 //原因在于:connect方法是异步的,awaitUninterruptibly方式阻塞主线程,等待服务端返回,从而实现同步 //如果不用awaitUninterruptibly,可以采用this.connectFuture.addListener方法进行回调 this.connectFuture.awaitUninterruptibly(); //连接后,获得session if (this.connectFuture.isConnected()) { logger.info("已经建立用户连接"); this.setStart(true); logger.info("客户开启了与服务器的连接"); ConnectSession session = ConnectSession.getInstance(); //将session保存到缓存中 session.setTradeSession(this.connectFuture.getSession()); } else { logger.info("不能建立用户连接"); this.setStart(false); ConnectSession session = ConnectSession.getInstance(); session.setTradeSession(null); } } } catch (Exception e) { logger.error(e.getMessage(), e); this.setStart(false); } }}
?(5)客户端业务处理类handler
?
?
public class ClientTradeHandler extends IoHandlerAdapter { private static Logger logger = Logger.getLogger(ClientTradeHandler.class); private String user; public void setUser(String user) { this.user = user; } public String getUser() { return this.user; } /** * * 当接口中其他方法抛出异常未被捕获时触发此方法 */ @Override public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception { } /** * 当接收到消息后被触发 * */ @Override public void messageReceived(IoSession ioSession, Object msg) throws Exception { BaseMessage message = (BaseMessage) msg; logger.info("接受到得消息为:" + message); if(message!=null) { if(message instanceof ConnectResponseMessage) { ConnectResponseMessage connectResponseMessage = (ConnectResponseMessage)message; if(connectResponseMessage.getConnectResponse() == ConnectResponseMessage.ConnectSuccess) { ConnectSession clientConnect = ConnectSession.getInstance(); clientConnect.setTradeSession(ioSession); } } else { //根据服务端返回的message,实例化不同处理类 ClientReceivedMessageProcess messageProcess = new ClientReceivedMessageProcess(); messageProcess.processMessage(message); } } } /** * 当发送消息后被触发 * */ @Override public void messageSent(IoSession ioSession, Object msg) throws Exception { } /** * 当会话关闭时被触发 * */ @Override public void sessionClosed(IoSession ioSession) throws Exception { logger.info("与服务器断开连接!!");// EIMTrayIcon trayIcon = EIMTrayIcon.getInStance();// trayIcon.showIcon(EIMClientConfig.OffLineTryIcon_Type); } /** * 当会话创建时被触发 * */ @Override public void sessionCreated(IoSession ioSession) throws Exception { } /** * * 当会话空闲时被触发 */ @Override public void sessionIdle(IoSession ioSession, IdleStatus msg) throws Exception { } /** * * 当会话开始时被触发 */ @Override public void sessionOpened(IoSession ioSession) throws Exception { logger.info("会话已经打开"); }}
?
最后,可以进行运行测试了。。。。