首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 服务器 > Apache >

Apache ActiveMQ Queue Topic 详解 课程 加入代码解释说明

2012-12-24 
Apache ActiveMQ Queue Topic详解 教程 加入代码解释说明一、特性及优势1、实现 JMS1.1 规范,支持 J2EE1.4以

Apache ActiveMQ Queue Topic 详解 教程 加入代码解释说明

一、特性及优势


1、实现 JMS1.1 规范,支持 J2EE1.4以上
2、可运行于任何 jvm和大部分 web 容器(ActiveMQ works great in any JVM)
3、支持多种语言客户端(java, C, C++, AJAX, ACTIONSCRIPT 等等)
4、支持多种协议(stomp,openwire,REST)
5、良好的 spring 支持(ActiveMQ has great Spring Support)
6、速度很快,JBossMQ的十倍(ActiveMQ is very fast; often 10x faster than
JBossMQ.)
7、与 OpenJMS、JbossMQ等开源jms provider 相比,ActiveMQ有 Apache 的支
持,持续发展的优势明显。

?

二、下载部署

1、下载
http://activemq.apache.org/activemq-510-release.html,下载 5.1.0 Windows
Distribution版本
2、安装
直接解压至任意目录(如:d:\ apache-activemq-5.1.0)
3、启动 ActiveMQ服务器
方法 1:
直接运行 bin\activemq.bat
方法 2(在 JVM 中嵌套启动):
cd example
ant embedBroker
4、ActiveMQ消息管理后台系统:
http://localhost:8161/admin

?

?

三、运行附带的示例程序

1、Queue 消息示例:(点对点)
*? 启动 Queue 消息消费者
cd example ant consumer
*? 启动 Queue 消息生产者
cd example
ant producer
简要说明:生产者(producer)发消息,消费者(consumer)接消息,发送/接
收 2000 个消息后自动关闭
2、Topic 消息示例:(群组订阅)
*? 启动 Topic 消息消费者
cd example
ant topic-listener
*? 启动 Topic 消息生产者
cd example
ant topic-publisher
简要说明:重复 10 轮,publisher每轮发送2000 个消息,并等待获取 listener
的处理结果报告,然后进入下一轮发送,最后统计全局发送时间。


四、Queue与 Topic 的比较


1、JMS Queue 执行 load balancer语义:
一条消息仅能被一个 consumer(消费者) 收到。如果在 message 发送的时候没有可用的
consumer,那么它将被保存一直到能处理该 message 的 consumer 可用。如果一
个 consumer 收到一条 message 后却不响应它,那么这条消息将被转到另一个
consumer 那儿。一个 Queue 可以有很多 consumer,并且在多个可用的 consumer
中负载均衡。?

?

?

注:


点对点消息传递域的特点如下:
?? 每个消息只能有一个消费者。?
?? 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发
送消息的时候是否处于运行状态,它都可以提取消息。


2、Topic 实现 publish和 subscribe 语义:
一条消息被 publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber
将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的
subscriber能够获得消息的一个拷贝。

?

注:

?

发布/订阅消息传递域的特点如下:
?? 每个消息可以有多个消费者。?
?? 生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费
自它订阅之后发布的消息。JMS 规范允许客户创建持久订阅,这在一定程
度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激
活状态时发送的消息。


3、分别对应两种消息模式:
Point-to-Point (点对点),Publisher/Subscriber Model (发布/订阅者) 其中在 Publicher/Subscriber 模式下又有Nondurable subscription(非持久订阅)
和 durable subscription (持久化订阅)2种消息处理方式(支持离线消息)。

?

注:

在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递
域中,目的地被成为主题(topic)。


五、Point-to-Point (点对点)消息模式开发流程


1、生产者(producer)开发流程(ProducerTool.java):

1.1? 创建 Connection:
根据 url,user 和 password 创建一个 jms Connection。

?

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);            connection = connectionFactory.createConnection();            connection.start();
?

?

1.2? 创建 Session:
在 connection的基础上创建一个 session,同时设置是否支持事务和
ACKNOWLEDGE 标识。

?
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
?


1.3? 创建 Destination对象:
需指定其对应的主题(subject)名称,producer 和 consumer 将根据 subject
来发送/接收对应的消息。

?

            if (topic) {                destination = session.createTopic(subject);            } else {                destination = session.createQueue(subject);            }
?


1.4? 创建 MessageProducer:
根据 Destination创建MessageProducer 对象,同时设置其持久模式。

?

            MessageProducer producer = session.createProducer(destination);         ?


1.5? 发送消息到队列(Queue):
封装 TextMessage 消息, 使用 MessageProducer 的 send 方法将消息发送出去。

?

            TextMessage message = session.createTextMessage(createMessageText(i));            producer.send(message);

?


2、消费者(consumer)开发流程(ConsumerTool.java):
2.1? 实现 MessageListener 接口:
消费者类必须实现MessageListener 接口,然后在onMessage()方法中监听消息的
到达并处理。

?

public class ConsumerTool extends Thread implements MessageListener, ExceptionListener

?实现 onMessage(Message message)方法,实现监听消息的到达


2.2? 创建 Connection:
根据 url,user 和 password 创建一个 jms Connection,如果是durable 模式,
还需要给 connection设置一个 clientId。

?

            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);            Connection connection = connectionFactory.createConnection();            //是否是 durable 模式.(离线消息持久化支持)             if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {                connection.setClientID(clientId);            }            connection.setExceptionListener(this);            connection.start();
?


2.3? 创建 Session 和 Destination:
与 ProducerTool.java 中的流程类似,不再赘述。

?

 session = connection.createSession(transacted, ackMode);

?

2.4 创建 replyProducer【可选】:
可以用来将消息处理结果发送给 producer。

2.5? 创建 MessageConsumer:?
根据 Destination创建MessageConsumer 对象。

?

            MessageConsumer consumer = null;            if (durable && topic) {                consumer = session.createDurableSubscriber((Topic) destination, consumerName);            } else {                consumer = session.createConsumer(destination);            }
?

?

2.6? 消费 message:
?在 onMessage()方法中接收producer 发送过来的消息进行处理,并可以通过
replyProducer 反馈信息给 producer

?

?
            if (message.getJMSReplyTo() != null) {                replyProducer.send(message.getJMSReplyTo()                                               , session.createTextMessage("Reply: "                                                                                          + message.getJMSMessageID()));            }
?


六、Publisher/Subscriber(发布/订阅者)消息模式开发流程


1、订阅者(Subscriber)开发流程(TopicListener.java):

1.1? 实现 MessageListener 接口:
在 onMessage()方法中监听发布者发出的消息队列,并做相应处理。

?

    public void onMessage(Message message) {        if (checkText(message, "SHUTDOWN")) {            try {                connection.close();            } catch (Exception e) {                e.printStackTrace(System.out);            }        } else if (checkText(message, "REPORT")) {            // send a report:            try {                long time = System.currentTimeMillis() - start;                String msg = "Received " + count + " in " + time + "ms";                producer.send(session.createTextMessage(msg));            } catch (Exception e) {                e.printStackTrace(System.out);            }            count = 0;        } else {            if (count == 0) {                start = System.currentTimeMillis();            }            if (++count % 1000 == 0) {                System.out.println("Received " + count + " messages.");            }        }    }
?


1.2? 创建 Connection:
根据 url,user 和 password 创建一个 jms Connection。

?

?
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);        connection = factory.createConnection();
?


1.3? 创建 Session:
在 connection的基础上创建一个 session,同时设置是否支持事务和
ACKNOWLEDGE 标识。

?

       session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
?


1.4? 创建 Topic:
?创建 2 个Topic,? topictest.messages用于接收发布者发出的消息,
topictest.control用于向发布者发送消息,实现双方的交互。

?

        topic = session.createTopic("topictest.messages");        control = session.createTopic("topictest.control");
?


1.5? 创建 consumer 和 producer 对象:
?根据topictest.messages创建consumer,根据topictest.control创建producer。

?

        MessageConsumer consumer = session.createConsumer(topic);//创建消费者        consumer.setMessageListener(this);        connection.start();        producer = session.createProducer(control);//创建生产者
?


1.6? 接收处理消息:
?在 onMessage()方法中,对收到的消息进行处理,可直接简单在本地显示消
息,或者根据消息内容不同处理对应的业务逻辑(比如:数据库更新、文件操作
等等),并且可以使用 producer对象将处理结果返回给发布者。

?

    //可以先检查消息类型private static boolean checkText(Message m, String s) {        try {            return m instanceof TextMessage && ((TextMessage)m).getText().equals(s);        } catch (JMSException e) {            e.printStackTrace(System.out);            return false;        }    }
?
//然后  if (checkText(message, "SHUTDOWN")) {          //关机        } else if (checkText(message, "REPORT")) {            // 打印                  } else {            //别的操作                 }
?


2、发布者(Publisher)开发流程(TopicPublisher.java):


2.1? 实现 MessageListener 接口:
在 onMessage()方法中接收订阅者的反馈消息。

?

    public void onMessage(Message message) {        synchronized (mutex) {            System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");            if (remaining == 0) {                mutex.notify();            }        }    }
?


2.2? 创建 Connection:
根据 url? 创建一个 jms Connection。

?

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);        connection = factory.createConnection();
?

?

2.3? 创建 Session:
在 connection的基础上创建一个 session,同时设置是否支持事务和
ACKNOWLEDGE 标识。

?

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
?


2.4? 创建 Topic:
?创建 2 个Topic,topictest.messages用于向订阅者发布消息,topictest.control用
于接收订阅者反馈的消息。这2个topic与订阅者开发流程中的topic是一一对应
的。

?

        topic = session.createTopic("topictest.messages");        control = session.createTopic("topictest.control");

?

2.5? 创建 consumer 和 producer 对象:
?根据topictest.messages创建publisher;
根据topictest.control创建consumer,同时监听订阅者反馈的消息。

?

        publisher = session.createProducer(topic);        publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//非持久化模式        session.createConsumer(control).setMessageListener(this);//加入监听        connection.start();
?


2.6? 给所有订阅者发送消息,并接收反馈消息:
?示例代码中,一共重复 10 轮操作。

?

        for (int i = 0; i < batch; i++) {            if (i > 0) {                Thread.sleep(delay * 1000);            }            times[i] = batch(messages);            System.out.println("Batch " + (i + 1) + " of " + batch + " completed in " + times[i] + " ms.");        }
?


每轮先向所有订阅者发送 2000 个消息;

?

    private long batch(int msgCount) throws Exception {        long start = System.currentTimeMillis();        remaining = subscribers;        publish();        waitForCompletion();        return System.currentTimeMillis() - start;    }

?

?

    private void publish() throws Exception {        // send events        BytesMessage msg = session.createBytesMessage();        msg.writeBytes(payload);        for (int i = 0; i < messages; i++) {            publisher.send(msg);            if ((i + 1) % 1000 == 0) {                System.out.println("Sent " + (i + 1) + " messages");            }        }        // request report        publisher.send(session.createTextMessage("REPORT"));    }

?

然后堵塞线程,开始等待;

?

    private void waitForCompletion() throws Exception {        System.out.println("Waiting for completion...");        synchronized (mutex) {            while (remaining > 0) {                mutex.wait();//赌赛线程            }        }    }
?


最后通过 onMessage()方法,接收到订阅者反馈的“REPORT”类信息后,才
print 反馈信息并解除线程堵塞,进入下一轮。

?

    public void onMessage(Message message) {        synchronized (mutex) {            System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");            if (remaining == 0) {                mutex.notify();//唤醒线程            }        }    }
?


注:可同时运行多个订阅者测试查看此模式效果

?

?

?

热点排行