12.2 消息群组
12.2 Message groups
12.2 消息群组
?
We can refine the exclusive consumer concept further with message groups. Instead
of all messages going to a single message consumer, messages can be grouped
together for a single consumer, and a message producer can designate a group for a
message by setting the message header JMSXGroupID. The ActiveMQ broker will
ensure that all messages with the same JMSXGroupID are sent to the same consumer, as
shown in figure 12.3.
?
消息群组是排他性消息消费者概念的进一步完善.不同于将所有消息都发送到单一的消费者,消息可以被
分组,同一组消息会发送到同一个消费者,并且消息生产者通过设置消息头中的JMSXGroupID值给消息
分组.ActiveMQ代理确保JMSXGroupID值相同的消息会被发送到相同的消费者,如图12.3所示.
?
If the consumer designated by the ActiveMQ broker to receive messages for a given
JMSXGroupID should close or become unavailable (a network outage, for example),
then the ActiveMQ broker will select a different message consumer to dispatch the
grouped messages to.
?
如果代理之前根据JMSXGroupID而选中的消费者变得不可用了(比如网络故障),代理会选择另外
一个代理来接收之前的分组消息.
?
Using message groups is straightforward. The definition of a group is left up to a
user and is done on the message producer—it just has to be unique for a particular
queue. All the routing is done in the ActiveMQ message broker.
?
使用消息群组非常简单.消息群组的定义由用户决定并通过消息生产者完成--对于一个队列来说群组必须
是唯一的.所有的路由都由ActiveMQ代理完成.
?
To create a group, you need to set a JMSXGroupID string property on the messages
being sent by the message producer, as shown:
?
为了创建消息群组,你选在消息生产者发送消息之前设置字符型的属性JMSXGroupID设置属性值,
如下代码所示:
?
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("group.queue");
MessageProducer producer = session.createProducer(queue);
Message message = session.createTextMessage("<foo>test</foo>");
message.setStringProperty("JMSXGroupID", "TEST_GROUP_A");
producer.send(message);
?
The previous example shows a message producer being created, and then setting a
TextMessage to belong to the message group TEST_GROUP_A.
?
前面的例子展示了消息生产者创建后,将一个文本消息设置成为TEST_GROUP_A消息组中一员.
?
Message groups use normal message consumers, so no additional work is required
to consume messages from a group. All the work is done by the message producer in
defining the group messages belong to, and by the ActiveMQ broker in selecting a
message consumer to send all the grouped messages to.
?
消息群组使用常规的消息消费者,因此接收群组中的消息时无需完成任何额外的工作.消息生产
者定义消息属于那个群组,以及ActiveMQ代理选择消息消费者以便接收分组的消息后,
所有使用消息分组工作就完成了.
?
The ActiveMQ broker will add a sequence number to each message in a group,
using the standard JMSXGroupSeq message header property. The sequence number
will start from 1 for a new message group.
?
ActiveMQ代理会使用标准的消息头属性JMSXGroupSeq为群组中的每个消息设置一个序列号.
对于新的群组来说该序列号从1开始.
?
But from the perspective of the message consumer, you can’t assume that the first
message you receive for a new group will have the JMSXGroupSeq set to 1. If an existing
message group consumer closes or dies, any messages being routed to its group will be
assigned a new consumer. To help identify that a message consumer is receiving messages
to a new group, or a group that it hasn’t seen before, a Boolean property called
JMSXGroupFirstForConsumer is set for the first message sent to the new message consumer.
You can check whether a message is being sent to your consumer for the first
time by seeing if this property has been set, as shown:
?
但是,从消息消费者的角度看,你不能假设消费者接收到的新群组中的第一个消息的序列号JMSXGroupSeq值是1.
如果一个消息群组的消费者失效了,任何属于该群组的消息会发送到一个新的消费者.为了确定消息消费者开始从
一个新的群组接收消息,或从一个之前没有接触过的群组接收消息,第一个发送的新消息消费者的消息会设置一个
布尔型的属性JMSXGroupFirstForConsumer.你可以通过检查这个属性来确定当前消费者接收到的消息是否是
群组中的第一个消息,示例代码如下所示:
?
Connection connection = new ActiveMQConnectionFactory().createConnection();
? ? connection.start();
? ? Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
? ? Queue queue = session.createQueue("group.queue");
? ? MessageConsumer consumer = session.createConsumer(queue);
? ? Message message = consumer.receive();
? ? String groupId = message.getStringProperty("JMSXGroupId");
? ? if (message.getBooleanProperty("JMSXGroupFirstForConsumer"))
? ? {
? ? ? // do processing for new group
? ? }
?
It’s often the ?case that you ?start a number ?of message consumers ?to process
messages at the same time. The ActiveMQ message broker will allocate all message
groups evenly across all consumers, but if there are already messages waiting to
be ?dispatched, the ?message groups ?will typically ?be allocated ?to the ?first
consumer.?
?
通常情况下,你会同时启动很多消息消费者处理消息.ActiveMQ会将所有消息群组平均分配给所有
的消息消费者,但是如果消息群组中有正等待被分发的消息,那么这个群组通常被分配给第一个消息
消费者.
?
To ensure an even distributed load, it’s possible to give the message broker ?a
hint to wait for more message consumers to start. To do this, you have to set up
a ?destination ?policy in ?the ?ActiveMQ broker’s ?XML ?configuration. Set ?the
consumersBeforeDispatchStarts ?property with ?the number ?of message consumers
you expect your application to use, as the following example demonstrates:
?
为了更加均衡的分布负载,可以给代理一个信号让其等待更多的消息消费者启动.为此,你需要
在ActiveMQ代理的XML配置文件中设置一个消息目的地策略.设置consumersBeforeDispatchStarts
属性为你的应用重新中打算使用的消息消费者的数量如下面示例代码所示:
?
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">"
consumersBeforeDispatchStarts="2"
timeBeforeDispatchStarts="5000"/>
</policyEntries>
</policyMap>
</destinationPolicy>
?
The sample configuration tells the ActiveMQ broker that any queue (the name of the
queue is >, which is a wildcard for any match) should wait for two consumers before
dispatching. Additionally we’ve also set the timeBeforeDispatchStarts property to
5000ms to inform the ActiveMQ broker that if two message consumers aren’t available
within 5 seconds of getting the first message on the queue, it should use the first that
becomes available.
?
上面的示例配置代码通知ActiveMQ代理所有的队列(通过使用通配符>,匹配所有队列)都要等待两个
消息消费者都启动完成了才开始分发消息.此外,我们还设置了timeBeforeDispatchStarts属性为
5000毫秒,以便通知ActiveMQ代理如果两个代理没有在5秒内成功接收到队列中的第一个消息,
队列应该使用第一个可用的消费者.
?
Using message groups does add some minimal overhead to the ActiveMQ broker,
in terms of storing routing information for each message group. It’s possible to explicitly
close a message group by sending a message to the ActiveMQ broker with the
JMSXGroupID set to the group you want to close and the JMSXGroupSeq property set to
-1, like in the following example:
?
使用消息群组确实是会给ActiveMQ代理增加一些额外开销的,因为需要为每一个消息群组存储路由
信息.可以通过设置JMSXGroupID为你打算关闭的群组名称同时设置JMSXGroupSeq值为-1,来明确
的关闭一个消息群组.如下面的示例代码所示:
?
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("group.queue");
MessageProducer producer = session.createProducer(queue);<foo />
Message message = session.createTextMessage("<foo>close</foo>");
message.setStringProperty("JMSXGroupID", "TEST_GROUP_A");
message.setIntProperty("JMSXGroupSeq", -1);
producer.send(message);
?
You can re-create a message group that has been closed by sending a new message to
the group. But the group may be assigned to a different message consumer by the
ActiveMQ broker.
?
你可以通过发送新消息到以关闭的群组来重新创建这个群组.但是ActiveMQ代理可能会给这个
重新创建的消息群组设置一个新的消息消费者.
?
Conceptually, message groups are like using message selectors. The difference is
that message groups automatically handle the selection of message consumers, and
they also handle the failover of message groups when a message consumer fails.
?
从概念上将,消息群组类似与使用消息选择器.不同的是使用消息分组可以自动的为消息消费者选择消息,
同时当群组的消费者失效后还可以进行失效转移.
?
Having looked at exclusive consumers and message groups, in the next sections
we’re going to look at how to transport large messages using advanced client-side
options with ActiveMQ, using either JMS streams or blob messages.
?
了解了排他性消息消费者和消息分组后,在下一节中我们就爱那个看到如何借助客户端高级选项,
使用JMS流和二进制消息来来传输大消息.