11.2 Advisory messages
} else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
    RemoveInfo removeInfo = (RemoveInfo) data;
    System.out.println("Connection stopped: " + removeInfo.getObjectId());
} else {
    System.err.println("Unknown message " + data);
}
As the example shows, you listen to advisory topics with regular JMS constructs.
Note the use of the AdvisorySupport class, which defines all of the advisory topics.
Things get harder once ActiveMQ-specific command objects are involved: a
ConnectionInfo is sent when a connection starts, but a RemoveInfo is sent when a
connection stops. The RemoveInfo does carry the connectionId (set as the RemoveInfo’s
objectId), so it’s possible to correlate which connection has stopped.
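One way to act on that correlation is to remember each connectionId when its ConnectionInfo arrives and look it up again when the matching RemoveInfo arrives. The sketch below uses only the JDK. ConnectionTracker is a hypothetical helper, not an ActiveMQ class, and it assumes the listener passes the IDs in as strings (for example, by calling toString() on the connectionId and on the objectId).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: pairs connection start and stop advisories by ID. */
final class ConnectionTracker {
    // connectionId -> time (epoch millis) at which the start advisory was seen
    private final Map<String, Long> open = new ConcurrentHashMap<>();

    /** Call when a ConnectionInfo advisory arrives. */
    void connectionStarted(String connectionId, long nowMillis) {
        open.put(connectionId, nowMillis);
    }

    /**
     * Call when a RemoveInfo advisory arrives, passing its objectId.
     * Returns how long the connection was open, or -1 if its start was never seen.
     */
    long connectionStopped(String objectId, long nowMillis) {
        Long started = open.remove(objectId);
        return started == null ? -1 : nowMillis - started;
    }

    /** Number of connections that have started but not yet stopped. */
    int openConnections() {
        return open.size();
    }
}
```

A listener would call connectionStarted from the ConnectionInfo branch and connectionStopped from the RemoveInfo branch. The -1 result covers connections that were already open before the listener subscribed.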
Most advisory messages are specific to destinations, and the AdvisorySupport class
has helper methods for determining which advisory topic to listen to. You can also
use wildcards: for example, if you created an advisory topic for the queue
named >, you’d get information for all queues.
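To make the wildcard case concrete, the following standalone sketch shows how a per-queue consumer advisory topic name is composed from the ActiveMQ.Advisory.Consumer.Queue prefix, and why a subscription on the > form covers every queue. AdvisoryTopicNames and its matcher are illustrative only and deliberately simplified; in real code the AdvisorySupport helpers produce the topic and the broker does the matching.

```java
/** Illustrative only: not part of ActiveMQ. */
final class AdvisoryTopicNames {
    // Prefix of the consumer advisory topics for queues.
    static final String CONSUMER_QUEUE_PREFIX = "ActiveMQ.Advisory.Consumer.Queue.";

    /** Name of the consumer advisory topic for one queue name (or wildcard). */
    static String consumerTopicForQueue(String queueName) {
        return CONSUMER_QUEUE_PREFIX + queueName;
    }

    /**
     * Simplified destination wildcard matcher: "." separates name elements,
     * "*" matches exactly one element, ">" matches everything from that point on.
     */
    static boolean matches(String pattern, String name) {
        String[] p = pattern.split("\\.");
        String[] n = name.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                return true; // the rest of the name is accepted as-is
            }
            if (i >= n.length) {
                return false; // name has fewer elements than the pattern
            }
            if (!p[i].equals("*") && !p[i].equals(n[i])) {
                return false; // literal element differs
            }
        }
        return p.length == n.length; // no leftover elements in the name
    }

    public static void main(String[] args) {
        String all = consumerTopicForQueue(">");
        String one = consumerTopicForQueue("test.Queue");
        System.out.println(all + " matches " + one + ": " + matches(all, one));
    }
}
```

Note that a single * would not be enough for test.Queue, because that queue name spans two dot-separated elements.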
Let’s look at an example of listening for consumers coming and going for a queue
named test.Queue:
Listing 11.1 Subscribing for consumer advisories
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.RemoveInfo;

String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start(); // start delivery; without this the listener receives nothing
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// First create a consumer on the queue, so there is something to report on
Queue queue = session.createQueue("test.Queue");
MessageConsumer testConsumer = session.createConsumer(queue);
// Now listen for consumers starting and stopping on that queue
Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(queue);
MessageConsumer consumer = session.createConsumer(advisoryTopic);
consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message m) {
        // Advisory messages are ActiveMQ messages that carry a command object
        ActiveMQMessage message = (ActiveMQMessage) m;
        try {
            System.out.println("Consumer Count = " + m.getStringProperty("consumerCount"));
            DataStructure data = message.getDataStructure();
            if (data.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
                ConsumerInfo consumerInfo = (ConsumerInfo) data;
                System.out.println("Consumer started: " + consumerInfo);
            } else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
                RemoveInfo removeInfo = (RemoveInfo) data;
                System.out.println("Consumer stopped: " + removeInfo.getObjectId());
            } else {
                System.err.println("Unknown message " + data);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});
Notice that the example creates a test consumer on the queue test.Queue before it
creates the listener for consumer advisories on that queue. This demonstrates that
the ActiveMQ broker also sends advisory messages for consumers that already exist
when you start to listen for them.
There are some advisories on destinations that aren’t enabled by default; these are
advisories on message delivery, slow consumers, fast producers, and so forth. To enable
these advisories, you have to configure them on a destination policy in the ActiveMQ
broker configuration file. For example, to configure an advisory message for slow consumers
on all queues, you need to add the following to your configuration:
<policyEntry queue=">" advisoryForSlowConsumers="true" />
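For context, a policy entry is normally nested inside the destinationPolicy of the broker element. The fragment below is a minimal sketch of where that line would sit in a typical broker configuration file; the brokerName value is a placeholder, and the rest of the broker configuration is omitted.

```xml
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost">
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <!-- ">" is the wildcard that matches every queue -->
        <policyEntry queue=">" advisoryForSlowConsumers="true" />
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  <!-- remaining broker configuration omitted -->
</broker>
```

The other opt-in advisories are switched on the same way, by setting the corresponding policy entry attribute to true.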
You can use advisory messages to supplement your application behavior (for example,
you could slow message production from your producers if you have slow consumers)
or to supplement JMX monitoring of the ActiveMQ broker. Advisory messages are useful
for getting dynamic information on changes to your ActiveMQ system.
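As one illustration of supplementing application behavior, a producer could lengthen the pause between sends each time a slow-consumer advisory arrives and shorten it again as sends go through. The sketch below is JDK-only and rests on assumptions: ProducerThrottle is a hypothetical helper rather than an ActiveMQ class, the 10 ms starting pause and the doubling/halving policy are arbitrary choices, and the advisory listener is assumed to call onSlowConsumerAdvisory() once per advisory (which requires advisoryForSlowConsumers to be enabled on the broker).

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: tracks the pause a producer applies between sends. */
final class ProducerThrottle {
    private final long maxDelayMillis;
    private final AtomicLong delayMillis = new AtomicLong(0);

    ProducerThrottle(long maxDelayMillis) {
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Call from the listener that receives slow-consumer advisories. */
    void onSlowConsumerAdvisory() {
        // Start at 10 ms, then double on every further advisory, up to the cap.
        delayMillis.updateAndGet(d -> Math.min(maxDelayMillis, d == 0 ? 10 : d * 2));
    }

    /** Call after each send so the pause decays once consumers catch up. */
    void onMessageSent() {
        delayMillis.updateAndGet(d -> d / 2);
    }

    /** Current pause in milliseconds. */
    long currentDelayMillis() {
        return delayMillis.get();
    }

    /** Blocks the producing thread for the current pause, if there is one. */
    void pauseBeforeSend() throws InterruptedException {
        long d = delayMillis.get();
        if (d > 0) {
            Thread.sleep(d);
        }
    }
}
```

A producer loop would call pauseBeforeSend(), send its message, and then call onMessageSent(). Whether halving on every send is the right decay rate depends on the workload.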
Many different advisories are generated by the ActiveMQ broker to provide
information about the running system. A dozen of the more useful advisory topics
appear in the numbered list below, and their properties (matched by number)
appear in table 11.1.
1 ActiveMQ.Advisory.Connection
2 ActiveMQ.Advisory.Producer.Queue
3 ActiveMQ.Advisory.Consumer.Queue
4 ActiveMQ.Advisory.Queue
5 ActiveMQ.Advisory.Expired.Queue
6 ActiveMQ.Advisory.SlowConsumer.Queue
7 ActiveMQ.Advisory.FastProducer.Queue
8 ActiveMQ.Advisory.MessageDelivered.Queue
9 ActiveMQ.Advisory.MessageConsumed.Queue
10 ActiveMQ.Advisory.FULL
11 ActiveMQ.Advisory.MasterBroker
12 ActiveMQ.Advisory.MessageDLQd.Queue
Table 11.1 Properties from the list of 12 ActiveMQ advisory topics
| # | Description | Properties | Data structure | Generated by default | Policy entry property |
|---|---|---|---|---|---|
| 1 | Generated when a connection starts/stops | null | null | true | none |
| 2 | Producer start/stop messages on a queue | String='producerCount' (number of producers) | ProducerInfo | true | none |
| 3 | Consumer start/stop messages on a queue | String='consumerCount' (number of consumers) | ConsumerInfo | true | none |
| 4 | Queue created/destroyed | null | null | true | none |
| 5 | Expired messages on a queue | String='orignalMessageId' (expired ID) | Message | true | none |
| 6 | Slow queue consumer | String='consumerId' (consumer ID) | ConsumerInfo | false | advisoryForSlowConsumers |
| 7 | Fast queue producer | String='producerId' (producer ID) | ProducerInfo | false | advisoryForFastProducers |
| 8 | Message delivered to the broker | String='orignalMessageId' (delivered ID) | Message | false | advisoryForDelivery |
| 9 | Message consumed by a client | String='orignalMessageId' (delivered ID) | Message | false | advisoryForConsumed |
| 10 | A usage resource is at its limit | String='usageName' (name of usage resource) | null | false | advisoryWhenFull |
| 11 | A broker is now the master in a master/slave configuration | null | null | true | none |
| 12 | Message sent to a dead letter queue | String='orignalMessageId' (delivered ID) | Message | true | none |
In the next section, we’re going to change tack and look at an advanced feature called
virtual topics, which can be used to supplement the way you consume messages by
combining the features of both topics and queues.