初JMS(java消息服务发布/订阅demo_01)
Hi
今天那个实例来介绍下JMS发布/订阅 .现在网络聊天的需求可以完美的映射到发布.订阅消息的传达模型之上的。
实例:、如果要运行Chat。需要使用支持JNDI和JMS的JMS提供者。我使用的开源JMS提供者:ActiveMq (版本apache-activemq-5.5.0)
apache-activemq-5.5.0下载地址
http://activemq.apache.org/activemq-550-release.html
下载后运行D:\Jar\apache-activemq-5.5.0\bin\activemq.bat
访问http://localhost:8161/admin/
页面显示如下表示服务开启
Welcome to the ActiveMQ Console of localhost (ID:dnepc504-1186-1303391964578-0:1) You can find more information about ActiveMQ on the Apache ActiveMQ Site
package iteye.JMS_Exception.chat;import java.io.BufferedReader;import java.io.InputStreamReader;import java.util.Properties;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import javax.jms.TopicConnection;import javax.jms.TopicConnectionFactory;import javax.jms.TopicPublisher;import javax.jms.TopicSession;import javax.jms.TopicSubscriber;import javax.naming.InitialContext;/** * Chat 类自身实现了MessageListener接口和onMessage方法 * Email: JMS_Exception@hotmail.com * @author PC504 * */public class Chat implements javax.jms.MessageListener{ private TopicSession pubSession; private TopicPublisher publisher; private TopicConnection connection; private String username; /* 初始化Chat */ public Chat(String topicFactory, String topicName, String username) throws Exception { // 设置JNDI连接参数 Properties env = new Properties(); env.put(javax.naming.Context.SECURITY_PRINCIPAL, "system"); env.put(javax.naming.Context.SECURITY_CREDENTIALS, "manager"); env.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory"); env.put(javax.naming.Context.PROVIDER_URL,"tcp://127.0.0.1:61616"); //实例化InitialContext对象以后 //就可以使用它在消息传送服务器的命名服务中查找TopicConnectionFactory InitialContext ctx = new InitialContext(env); //创建到JMS提供者的发布/订阅的 TopicConnection TopicConnectionFactory conFactory = (TopicConnectionFactory)ctx.lookup(topicFactory); //客户端使用一个 TopicConnectionFactory 对象 //来创建到JMS提供者的发布/订阅的 TopicConnection TopicConnection connection = conFactory.createTopicConnection(); // TopicSession 用于创建Message.TopicPublisher .TopicSubscriber对象的工厂.我们创建两个JMS会话对象 //false意味创建的TopicSession将不是事务性的。 //AUTO_ACKNOWLEDGE 意味消息将在客户端接收之后自动确定 TopicSession pubSession = connection.createTopicSession( false, Session.AUTO_ACKNOWLEDGE); //pubSession.createDurableSubscriber(topic, topicName) TopicSession subSession = connection.createTopicSession( false, Session.AUTO_ACKNOWLEDGE); // 查找JMS主题 Topic chatTopic = (Topic)ctx.lookup(topicName); // 创建JSM 发布/订阅 TopicPublisher publisher = pubSession.createPublisher(chatTopic); TopicSubscriber subscriber = subSession.createSubscriber(chatTopic, null, true); //设置一个JMS消息监听器 subscriber.setMessageListener(this); this.connection = connection; this.pubSession = pubSession; this.publisher = publisher; this.username = username; // 启动JMS连接.允许传达消息 connection.start( ); } /* 接收来自TopicSubscriber的消息 * 当TopicSubscriber从它的主题接收一条消息, * 调用了它的MessageListener对象的onMessage()方法 * */ public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText( ); System.out.println(text); } catch (JMSException jmse){ jmse.printStackTrace( ); } } /*使用发布者创建并发送消息 */ protected void writeMessage(String text) throws JMSException { //TextMessage类型携带了一个String作为有效负载. //生产出"准备就绪可传达的TextMessage对象" TextMessage message = pubSession.createTextMessage( ); //MapMessage mapMessage = pubSession.createMapMessage(); message.setText(username+":"+text); System.out.println(message.getText()); publisher.publish(message); } /* 关闭JMS连接 */ public void close( ) throws JMSException { connection.close( ); } /* 运行Main */ public static void main(String [] args){ try{ if (args.length!=3) System.out.println("Factory, Topic, or username missing"); Chat chat = new Chat(args[0],args[1],args[2]); // 从命令行读取 BufferedReader commandLine = new java.io.BufferedReader(new InputStreamReader(System.in)); // 键入"exit" 停止运行 while(true){ String s = commandLine.readLine( ); if (s.equalsIgnoreCase("exit")){ chat.close( ); System.exit(0); } else chat.writeMessage(s); } } catch (Exception e){ e.printStackTrace( ); } }}