首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

施用activeMQ实现jms

2012-08-08 
使用activeMQ实现jms感谢作者分享 转自 http://blog.sina.com.cn/s/blog_4b5bc0110100kb8d.html一:jms介绍

使用activeMQ实现jms

感谢作者分享 转自 http://blog.sina.com.cn/s/blog_4b5bc0110100kb8d.html

一:jms介绍
???????? jms说白了就是java message service,是J2EE规范的一部分,跟jdbc差不多,sun只提供了接口,由各个厂商(provider)来进行具体的实现,然后使用者使用他们的jar包进行开发使用即可。
??????? 另外在jms的API中,jms传递消息有两种方式,一种是点对点的Queue,还有一个是发布订阅的Topic方式。区别在于:
??????? 对于Queue模式,一个发布者发布消息,下面的接收者按队列顺序接收,比如发布了10个消息,两个接收者A,B那就是A,B总共会收到10条消息,不重复。
??????? 对于Topic模式,一个发布者发布消息,有两个接收者A,B来订阅,那么发布了10条消息,A,B各收到10条消息。
?????? 关于api的简单基础可以看下:http://www.javaeye.com/topic/64707,简单的参考!
二:ActiveMQ介绍
???????? activeMQ是apache下的一个开源jms产品,具体参见 apache官方网站;
???????? Apache ActiveMQ is fast, supports many? Cross Language Clients and Protocols , comes with easy to use? Enterprise Integration Patterns?? and many? advanced features?? while fully supporting? JMS 1.1?? and J2EE 1.4. Apache ActiveMQ is released under the? Apache?? 2.0 License
三:开始实现代码
?????? 1:使用activeMQ来完成jms的发送,必须要下载activeMQ,然后再本机安装,并且启动activeMQ的服务才行。在官网下载完成之后,运行bin目录下面的activemq.bat,将activeMQ成功启动。
?????? 启动成功之后可以运行:http://localhost:8161/admin/index.jsp? 查看一下。
?????? 2:发送端,sender
import javax.jms.Connection;??
import javax.jms.ConnectionFactory;??
import javax.jms.DeliveryMode;??
import javax.jms.Destination;??
import javax.jms.MessageProducer;??
import javax.jms.Session;??
import javax.jms.TextMessage;??
?
import org.apache.activemq.ActiveMQConnection;??
import org.apache.activemq.ActiveMQConnectionFactory;??
?
public class Sender {??
??? private static final int SEND_NUMBER = 5;??
?
??? public static void main(String[] args) {??
??????? // ConnectionFactory :连接工厂,JMS 用它创建连接??
??????? ConnectionFactory connectionFactory;??
??????? // Connection :JMS 客户端到JMS Provider 的连接??
??????? Connection connection = null;??
??????? // Session: 一个发送或接收消息的线程??
??????? Session session;??
??????? // Destination :消息的目的地;消息发送给谁.??
??????? Destination destination;??
??????? // MessageProducer:消息发送者??
??????? MessageProducer producer;??
??????? // TextMessage message;??
??????? // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar??
?
??????? connectionFactory = new ActiveMQConnectionFactory(??
??????????????? ActiveMQConnection.DEFAULT_USER,??
??????????????? ActiveMQConnection.DEFAULT_PASSWORD,??
??????????????? "tcp://localhost:61616");??
??????? try {??
??????????? // 构造从工厂得到连接对象??
??????????? connection = connectionFactory.createConnection();??
??????????? // 启动??
??????????? connection.start();??
??????????? // 获取操作连接??
??????????? session = connection.createSession(Boolean.TRUE,??
??????????????????? Session.AUTO_ACKNOWLEDGE);??
??????????? // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置??
??????????? destination = session.createQueue("test-queue");??
??????????? // 得到消息生成者【发送者】??
??????????? producer = session.createProducer(destination);??
??????????? // 设置不持久化,可以更改??
??????????? producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);??
??????????? // 构造消息??
??????????? sendMessage(session, producer);??
??????????? session.commit();??
?
??????? } catch (Exception e) {??
??????????? e.printStackTrace();??
??????? } finally {??
??????????? try {??
??????????????? if (null != connection)??
??????????????????? connection.close();??
??????????? } catch (Throwable ignore) {??
??????????? }??
??????? }??
?
??? }??
??? public static void sendMessage(Session session, MessageProducer producer)??
??????????? throws Exception {??
??????? for (int i = 1; i <=SEND_NUMBER; i++) {??
??????????? TextMessage message = session??
??????????????????? .createTextMessage("ActiveMq 发送的消息" + i);??
??????????? // 发送消息到目的地方??
??????????? System.out.println("发送消息:" + i);??
??????????? producer.send(message);??
??????? }??
??? }??
}??
?
????
?????? 3:接收端,receive??
?
???
?
import javax.jms.Connection;??
import javax.jms.ConnectionFactory;??
import javax.jms.Destination;??
import javax.jms.JMSException;??
import javax.jms.Message;??
import javax.jms.MessageConsumer;??
import javax.jms.MessageListener;??
import javax.jms.Session;??
import javax.jms.TextMessage;??
?
import org.apache.activemq.ActiveMQConnection;??
import org.apache.activemq.ActiveMQConnectionFactory;??
?
public class Receiver {??
??? public static void main(String[] args) {??
?
??????? // ConnectionFactory :连接工厂,JMS 用它创建连接??
??????? ConnectionFactory connectionFactory;??
??????? // Connection :JMS 客户端到JMS Provider 的连接??
??????? Connection connection = null;??
??????? // Session: 一个发送或接收消息的线程??
??????? Session session;??
??????? // Destination :消息的目的地;消息发送给谁.??
??????? Destination destination;??
??????? // 消费者,消息接收者??
??????? MessageConsumer consumer;??
?
??????? connectionFactory = new ActiveMQConnectionFactory(??
??????????????? ActiveMQConnection.DEFAULT_USER,??
??????????????? ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");??
??????? try {??
??????????? // 构造从工厂得到连接对象??
??????????? connection = connectionFactory.createConnection();??
??????????? // 启动??
??????????? connection.start();??
??????????? // 获取操作连接??
??????????? session = connection.createSession(Boolean.FALSE,??
??????????????????? Session.AUTO_ACKNOWLEDGE);??
??????????? //test-queue跟sender的保持一致,一个创建一个来接收??
??????????? destination = session.createQueue("test-queue");??
??????????? consumer = session.createConsumer(destination);??
??????????? consumer.setMessageListener(new MessageListener() {??
??????????????? public void onMessage(Message arg0) {??
??????????????????? System.out.println("==================");??
??????????????????? try {??
??????????????????????? System.out.println("RECEIVE1第一个获得者:"?
??????????????????????????????? + ((TextMessage) arg0).getText());??
??????????????????? } catch (JMSException e) {??
??????????????????????? // TODO Auto-generated catch block??
??????????????????????? e.printStackTrace();??
??????????????????? }??
?
??????????????? }??
??????????? });??
?
??????????? MessageConsumer consumer1 = session.createConsumer(destination);??
??????????? consumer1.setMessageListener(new MessageListener() {??
??????????????? public void onMessage(Message arg0) {??
??????????????????? System.out.println("+++++++++++++++++++");??
??????????????????? try {??
??????????????????????? System.out.println("RECEIVE1第二个获得者:"?
??????????????????????????????? + ((TextMessage) arg0).getText());??
??????????????????? } catch (JMSException e) {??
??????????????????????? // TODO Auto-generated catch block??
??????????????????????? e.printStackTrace();??
??????????????????? }??
?
??????????????? }??
??????????? });??
??????? } catch (Exception e) {??
??????????? e.printStackTrace();??
??????? }??
??????? //在eclipse里运行的时候,这里不要关闭,这样就可以一直等待服务器发送了,不然就直接结束了。??
??????? // } finally {??
??????? // try {??
??????? // if (null != connection)??
??????? // connection.close();??
??????? // } catch (Throwable ignore) {??
??????? // }??
??????? // }??
?
??? }??
}??
?
????
???
?
?????? 4:发送端,sender 上面的是用Queue的方式来创建,下面再用topic的方式实现同样的功能。??
?
???
?
import javax.jms.Connection;??
import javax.jms.JMSException;??
import javax.jms.Message;??
import javax.jms.MessageConsumer;??
import javax.jms.MessageListener;??
import javax.jms.MessageProducer;??
import javax.jms.Session;??
import javax.jms.TextMessage;??
import javax.jms.Topic;??
?
import org.apache.activemq.ActiveMQConnectionFactory;??
import org.apache.activemq.command.ActiveMQTopic;??
?
public class TopicTest {??
??? public static void main(String[] args) throws Exception {??
??????? ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(??
??????????????? "tcp://localhost:61616");??
?
??????? Connection connection = factory.createConnection();??
??????? connection.start();??
?
??????? // 创建一个Topic??
??????? Topic topic = new ActiveMQTopic("testTopic");??
??????? Session session = connection.createSession(false,??
??????????????? Session.AUTO_ACKNOWLEDGE);??
?
??????? // 注册消费者1??
??????? MessageConsumer comsumer1 = session.createConsumer(topic);??
??????? comsumer1.setMessageListener(new MessageListener() {??
??????????? public void onMessage(Message m) {??
??????????????? try {??
??????????????????? System.out.println("Consumer1 get "?
??????????????????????????? + ((TextMessage) m).getText());??
??????????????? } catch (JMSException e) {??
??????????????????? e.printStackTrace();??
??????????????? }??
??????????? }??
??????? });??
?
??????? // 注册消费者2??
??????? MessageConsumer comsumer2 = session.createConsumer(topic);??
??????? comsumer2.setMessageListener(new MessageListener() {??
??????????? public void onMessage(Message m) {??
??????????????? try {??
??????????????????? System.out.println("Consumer2 get "?
??????????????????????????? + ((TextMessage) m).getText());??
??????????????? } catch (JMSException e) {??
??????????????????? e.printStackTrace();??
??????????????? }??
??????????? }??
?
??????? });??
?
??????? // 创建一个生产者,然后发送多个消息。??
??????? MessageProducer producer = session.createProducer(topic);??
??????? for (int i = 0; i < 10; i++) {??
??????????? System.out.println("producer begin produce=======");??
??????????? producer.send(session.createTextMessage("Message:" + i));??
??????? }??
??? }??
?
}?

?

热点排行