12.1 排他性消息消费者
12.1 Exclusive consumers
12.1 排他性消息消费者
?
When messages are dispatched from an ActiveMQ broker, they’ll always be in first in,
first out order. But if you have more than one message consumer for a queue in your
application(s), you can’t guarantee that the order in which the messages were dispatched
will be the same order in which your application will consume them. This is
because you never have control over the scheduling of threads used to deliver the
messages on the client side—even if all your message consumers are using the same
connection. Ideally you’d only have one message consumer to ensure ordering of
messages. But you may also need to support failover, to have another instance of your
queue message consumer take over the processing of the queue messages if the first
consumer fails. ActiveMQ can support having multiple message consumers on a
queue, but having only one of them receive the messages from that queue. We’ll discuss
this concept in the following subsection.
?
ActiveMQ代理总是以先进先出的方式转发消息.但是,如果你程序的一个队列中有多个消息消费者,
你就无法保证程序按照代理发送消息的顺序来处理消息.这是因为在客户端,即使你的消息消费者都使
用同一个连接,你也无法控制用于调度消息发送的线程.理想情况是你只使用一个消费者以保证处理
消息的顺序.但是,你可能还需要支持失效转移,支持在第一个消息消费者失效后,使用队列的另外一个
消息消费者实例来接管第一个小消费者.ActiveMQ支持一个消息队列拥有多个消息消费者,但是仅有
一个消费者会从代理接收消息.我们将在下面的小节中阐述这个概念.
?
12.1.1 Selecting an exclusive message consumer
12.1.1 选择一个排他的的消息消费者
?
For applications where message order is important, or you need to ensure that there
will be only one message consumer for a queue, ActiveMQ offers a client-side option
to have only one active message consumer process messages. The ActiveMQ message
broker will select one consumer on the queue to process messages. The benefit of
allowing the broker to make the choice is that if the consumer stops or fails, then
another message consumer can be selected to be active, as depicted in figure 12.1.
?
对于那些认为消息顺序很重要的应用程序来说,你需要保证一个队列只有一个消息消费者.
ActiveMQ提供一个客户端选项以便只让一个活动的消息消费者处理队列消息.ActiveMQ
消息代理会为队列选择消息消费者来处理消息.让代理来选择消息消费者的好处是,如果当前
的消息消费者停止了或者失效了,代理可以选择并激活其他的消费者,就像图12.1所示的那样.
?
If you mix ?standard consumers and ?exclusive consumers on ?the same queue, ?the
ActiveMQ message broker will still only deliver messages to one of the exclusive
consumers. If all the ?exclusive consumers become ?inactive, and there’s ? still
a standard message consumer, then consumption of ?queue messages will return ?to
the normal ?mode ?of ? delivery—the messages ? will ?be ?delivered ?in ?a round
-robin ?fashion between all the remaining active standard message consumers.
?
如果消息队列中既有标准的消费者也有排他消息消费者,那么ActiveMQ消息代理将仍然将
消息发送给其中的一个排他消息消费者.如果所有排他性的消费者都处于非激活状态,但同时
还有标准的消息消费者存在,这是队列的消息处理模式将切换为常规方式--所有消息会以
循环方式发送到所有保持活动的标注消息消费者.
?
You can create an exclusive consumer using a destination option on the client, like
in the following code extract:
你可以在客户端使用消息目的地选项来创建排他性的消息消费者,如下面的示例代码所示:
?
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
?
The ability to select a message consumer to be exclusive can be used for more than
guaranteeing that messages are consumed by only one active message consumer. You
can use the exclusive consumer pattern to create a distributed lock, as we’ll demonstrate
in the next section.
?
选择排他性的消息消费,不仅可以保证消息只会被一个活动的消费者处理,而且可以用来创建分布式锁,
这一点我们将在下一节介绍.
?
12.1.2 Using exclusive consumers to provide a distributed lock
12.1.2 使用排他性消息消费者提供分布式锁
?
Often you use messaging to broadcast data from an external resource, be that changes
to records in a database, or comma-separated values appended to a file, or a raw realtime
data feed. You might wish to build in redundancy, so if an instance of the application
reading and broadcasting the changing data fails, another can take over. Often
you can rely on locking a resource (row lock or file lock) to ensure that only one process
will be accessing the data and broadcasting over a topic at a time. But when you
don’t want the overhead of using a database, or want to run processes across more than
one machine (and can’t use distributed locks), then you can use the exclusive consumer
functionality to create a distributed lock. In figure 12.2 we show an application
where we want failover for a client reading data from a real-time feed. We only want
one client to connect to the feed and distribute the events, but if it fails, we need
another client available to take over.
?
通常,你需要从外部资源广播消息是为了更新数据库中的记录,或者在一个文件后面追加逗号分割的数据,或者
发布实时的原始数据.你可能会希望创建一个系统备份,以便应用程序在读取或者广播变化数据失败后,系统备份
可以接管并继续工作.通常情况下,你可以依靠给资源加锁(行锁或文件锁)以保证同一时间只有一个进程可以
操作数据或者广播消息到一个主题.但是,当你不打算接受使用数据库带来的额外开销,或者你想要在多个机器上
运行同一个程序(但不能使用分布锁),此时,你可以使用排他性消息消费者功能来创建一个分布式锁.
图12.2中我们展示了一个应用程序,该程序作为客户端可以实时的读取数据,我们打算让这个程序实现失效转移.
我们只打算让一个客户端连接到数据源并分发事件,但是当这个客户端失效后,我们需要其他的客户端可以接管.
?
In order to use exclusive consumers to create a distributed lock, we need our message
producer to subscribe exclusively to a well-known queue. If the message producer
receive a message from the queue, it becomes activated, and can then subscribe to the
real-time feed and transform the real-time data into JMS messages. Here’s a code snippet
for the message producer to initiate a distributed lock:
?
为了使用排他性的消息消费者来创建分布式锁,我们的消息生产者只注册一个消息队列.如果消息消费者从队列中
接收了消息,该消费者即被激活,接下来即可订阅实时的数据源并将实时的数据转换成JMS消息.
下面的是消息生产者初始化分布式锁的代码片段:
?
public void start() throws JMSException?
{
this.connection = this.factory.createConnection();
this.connection.start();
this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = this.session.createQueue(this.queueName + "?consumer.exclusive=true");
Message message = this.session.createMessage();
MessageProducer producer = this.session.createProducer(destination);
producer.send(message);
MessageConsumer consumer = this.session.createConsumer(destination);
consumer.setMessageListener(this);
}
?
In this example, we always send a message to the well-known queue, to start ?off
consumption— this step could always be done externally by a management process.
Note ?that ?we ?use ?Session.CLIENT_ACKNOWLEDGE ?mode ?to ?consume ?the message.
?
在这个例子中,我们总是发送消息到已知的消息队列以便启动消息消费--这个步骤可以在管理
进程内部完成.请注意,我们使用了Session.CLIENT_ACKNOWLEDGE模式来消费消息.
?
Although we ?want to ?be notified ?that we’re ?an exclusive consumer—and hence
have the lock— we don’t want to remove the message from the well-known ?queue.
In this way, if we fail, another exclusive producer will be activated.
?
尽管我们打算将自己标识成一个排他性的消费者--因此占有锁--我们不想从公共的消息队列中
移除消息.这种情况下,如果,当前消息生产者失败了,其他的排他性生产者会被激活.
?
For this example, we’d implement the MessageListener to look like the following
code snippet. If we’re not already activated, we call a fictional method—start-
Producing(). If this were a real application, this method would start subscribing to a
real-time feed and convert real-time data into JMS messages:
?
在这个例子中.我们将MessageListener实现成下面代码片段所示的样子.如果当前生产者未激活,我们
可以调用示例方法startProducing().如果是在一个真实的程序中,这个方法应该要订阅一个
实时的数据源并将实时数据转换成JMS消息.
?
public void onMessage(Message message)
?{
if (message != null && this.active==false)?
{
this.active=true;
startProducing();
}
}
?
(译注:这个程序的应用场景: 利用排他性消息消费者,实现分布式锁.?
当前的这个类首先是一个消息消费者(并且是排他性的消费者),从一个数据源接收消息.
因为是排他性的,所以可以在多个机器上运行这个类,但同一时刻只有一个是处于激活状态的.
处于激活状态时,当这个消费者接收消息时,调用onMessage方法,开始startProducing();
如果当前的消费着失败了,则其他的消费者接替它继续工作,再次调用onMessage()方法,再次调用
startProducing();.这样就相当于使用排他性消息消费者的特性,构造了一个可以分布式运行的producer.)
?
We’ve shown that using an exclusive consumer allows us to ensure that only one message
consumer will be active at a time. In the next section, we’ll look at message
groups, where the ActiveMQ broker can selectively choose a message consumer for all
messages that have the same JMSXGroupID message header property set.
?
我们已经展示了如何使用排他行的消息消费以确保同一时刻只有一个消息消费者是激活的.
下一小节中我们将看到消息群组,ActiveMQ代理会为所有具有相同的名称为JMSXGroupID
消息头属性的消息选择一个消息消费者.