
13.3 Optimizing message consumers

In order to maximize application performance, you have to look at all the
participants, and as we have seen so far, consumers play a big part in the
overall performance of ActiveMQ. Message consumers typically have to work twice
as hard as message producers, because in addition to consuming messages, they
have to acknowledge that the message has been consumed. We’ll explain some of
the biggest performance gains you can get with ActiveMQ by tuning the consumers.







Typically the ActiveMQ broker will deliver messages as quickly as possible to
consumer connections. Once messages are delivered over the transport from the
ActiveMQ broker, they’re typically queued in the session associated with the
consumer, where they wait to be delivered. In the next section we’ll explain
why and how the rate at which messages are pushed to consumers is controlled,
and how to tune that rate for better throughput.






13.3.1 Prefetch limit



ActiveMQ uses a push-based model for delivery, delivering messages to consumers
when they’re received by the ActiveMQ broker. To ensure that a consumer won’t
exhaust its memory, there’s a limit (prefetch limit) to how many messages will
be delivered to a consumer before the broker waits for an acknowledgement that
the messages have been consumed by the application. Internal to the consumer,
messages are taken off the transport when they’re delivered and placed into an
internal queue associated with the consumer’s session, as shown in figure 13.5.







A consumer connection will queue messages to be delivered internally. The size
of these queues plus the number of in-flight messages (messages that have been
dispatched to the consumer but haven’t yet been acknowledged) is limited by the
prefetch limit for that consumer. In general, the larger the prefetch size, the
faster the consumer will work.






But this isn’t always ideal for queues, where you might want to ensure that messages

are evenly distributed across all consumers on a queue. In this case with a large

prefetch, a slow consumer could have pending messages waiting to be processed that

could’ve been worked on by a faster consumer. In this case, a lower prefetch number

would work better. If the prefetch is zero, the consumer will pull messages from the

broker and no push will be involved.
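The effect of a large prefetch on a queue shared by a fast and a slow consumer can be illustrated with a small stand-alone simulation. This is only a sketch: it does not use the ActiveMQ API, the class `PrefetchSimulation` and its method are hypothetical names, and the dispatch model (the broker fills free prefetch slots in round-robin order, and a slot is freed when a consumer finishes a message) is a simplifying assumption rather than a description of the broker's internals.

```java
final class PrefetchSimulation {

    /**
     * Returns the number of ticks needed until every message has been processed.
     * costPerMessage[c] is the number of ticks consumer c needs per message (at least 1).
     */
    public static int ticksToDrain(int messages, int prefetch, int[] costPerMessage) {
        if (prefetch < 1) {
            throw new IllegalArgumentException("this sketch only models push delivery (prefetch >= 1)");
        }
        for (int cost : costPerMessage) {
            if (cost < 1) {
                throw new IllegalArgumentException("cost per message must be at least 1");
            }
        }
        int n = costPerMessage.length;
        int[] buffered = new int[n];   // dispatched to the consumer but not yet finished
        int[] remaining = new int[n];  // ticks left on the message currently being processed
        int pending = messages;        // messages still held by the broker
        int done = 0;
        int ticks = 0;

        while (done < messages) {
            // Broker: hand out messages round-robin while any consumer has a free prefetch slot.
            boolean dispatched = true;
            while (pending > 0 && dispatched) {
                dispatched = false;
                for (int c = 0; c < n && pending > 0; c++) {
                    if (buffered[c] < prefetch) {
                        buffered[c]++;
                        pending--;
                        dispatched = true;
                    }
                }
            }
            // Consumers: each one works on the head of its own buffer for one tick.
            for (int c = 0; c < n; c++) {
                if (remaining[c] == 0 && buffered[c] > 0) {
                    remaining[c] = costPerMessage[c];
                }
                if (remaining[c] > 0 && --remaining[c] == 0) {
                    buffered[c]--;
                    done++;
                }
            }
            ticks++;
        }
        return ticks;
    }

    public static void main(String[] args) {
        int[] costs = {1, 10}; // assumed example: one fast consumer, one ten times slower
        System.out.println("prefetch 1000: " + ticksToDrain(100, 1000, costs) + " ticks");
        System.out.println("prefetch 1:    " + ticksToDrain(100, 1, costs) + " ticks");
    }
}
```

Under these assumptions, the large prefetch splits the queue evenly up front, so the run lasts as long as the slow consumer needs for its half, while the small prefetch lets the fast consumer pick up most of the work.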






There are different default prefetch sizes for different consumers:



- Queue consumer default prefetch size = 1000
- Queue browser consumer default prefetch size = 500
- Persistent topic consumer default prefetch size = 100
- Nonpersistent topic consumer default prefetch size = 32766



The prefetch size is the number of outstanding messages that your consumer will have

waiting to be delivered, not the memory limit. You can set the prefetch size for your

connection by configuring the ActiveMQConnectionFactory as shown next.




Listing 13.11 Configuring the prefetch policy on the ActiveMQConnectionFactory



    import java.util.Properties;
    import org.apache.activemq.ActiveMQConnectionFactory;

    // Set each prefetch limit on the connection factory (the values shown are the defaults).
    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
    Properties props = new Properties();
    props.setProperty("prefetchPolicy.queuePrefetch", "1000");
    props.setProperty("prefetchPolicy.queueBrowserPrefetch", "500");
    props.setProperty("prefetchPolicy.durableTopicPrefetch", "100");
    props.setProperty("prefetchPolicy.topicPrefetch", "32766");
    cf.setProperties(props);


Or you can pass the prefetch size as a destination property when you create a
destination, as shown in the following listing.




Listing 13.12 Setting the prefetch size when creating a destination



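    // Options after "?" in the destination name apply to consumers created on it.
    // session is an existing javax.jms.Session.
    Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
    MessageConsumer consumer = session.createConsumer(queue);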


Prefetch limits are an easy mechanism to boost performance, but should be used with

caution. For queues, you should consider the impact on your application if you have a

slow consumer, and for topics, factor how much memory your messages will consume

on the client before they’re delivered.
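As a rough illustration of the memory consideration for topics, the worst-case amount of buffered payload on a client is about the prefetch size multiplied by the average message size. The sketch below is plain arithmetic, not an ActiveMQ API: the class name `PrefetchMemoryEstimate` is hypothetical, the 1 KB average message size is an assumed example value, and the estimate ignores headers and per-object overhead.

```java
final class PrefetchMemoryEstimate {

    /** Rough upper bound on payload bytes buffered on the client for one consumer. */
    public static long worstCaseBytes(int prefetchSize, int averageMessageBytes) {
        return (long) prefetchSize * averageMessageBytes;
    }

    public static void main(String[] args) {
        // 32766 is the default nonpersistent topic prefetch listed earlier;
        // 1024 bytes per message is an assumption made for this example.
        long bytes = worstCaseBytes(32766, 1024);
        System.out.println(bytes + " bytes may be buffered before delivery");
    }
}
```

With larger messages or several consumers per connection, the same multiplication suggests lowering the topic prefetch well below its default.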






Controlling the rate at which messages are delivered to a consumer is only part of

the story. Once the message reaches the consumer’s connection, the method of message

delivery to the consumer and the options chosen for acknowledging the delivery

of that message back to the ActiveMQ broker have an impact on performance. We’ll

cover these in the next section.






13.3.2 Delivery and acknowledgment of messages



Something that should be apparent from figure 13.5 is that delivery of messages via a

javax.jms.MessageListener.onMessage() will always be faster with ActiveMQ than

calling javax.jms.MessageConsumer.receive(). If a MessageListener isn’t set for a

MessageConsumer, then its messages will be queued for that consumer, waiting for the

receive() method to be called. Not only will maintaining the internal queue for the

consumer be expensive, but so will the context switch by the application thread calling

the receive() method.







As the ActiveMQ broker keeps a record of how many messages have been consumed

to maintain its internal prefetch limits, a MessageConsumer has to send a message

acknowledgment for every message it has consumed. When you use transactions,

this happens at the Session.commit() method call, but is done individually for each

message if you’re using auto-acknowledgment.






Some optimizations are used for sending message acknowledgments back to the
broker, which can drastically improve the performance when using the
DUPS_OK_ACKNOWLEDGE session acknowledgment mode. In addition, you can set the
optimizeAcknowledge property on the ActiveMQ ConnectionFactory to give a hint to
the consumer to roll up message acknowledgments.





Listing 13.13 Setting the optimizeAcknowledge property


    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
    // Hint to the consumer to roll up message acknowledgments.
    cf.setOptimizeAcknowledge(true);


When using optimizeAcknowledge or the DUPS_OK_ACKNOWLEDGE acknowledgment

mode on a session, the message consumer can send one message acknowledgment

to the ActiveMQ message broker containing a range of all the messages

consumed. This reduces the amount of work the message consumer has to do,

enabling it to consume messages at a much faster rate.
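To get a feel for how much acknowledgment traffic batching can save, the following stand-alone sketch counts the acknowledgments a consumer would send for a given number of messages. It is a simplified model and not ActiveMQ code: the class `AckBatchingSketch` and its methods are hypothetical, and it assumes one acknowledgment is sent each time the number of consumed but unacknowledged messages reaches a fixed percentage of the prefetch size, with one final acknowledgment for any remainder. The real client's batching rules may differ.

```java
final class AckBatchingSketch {

    /** Number of consumed messages that triggers one acknowledgment (never less than 1). */
    public static int batchSizeFor(int prefetch, int percentOfPrefetch) {
        return Math.max(1, prefetch * percentOfPrefetch / 100);
    }

    /** Counts acknowledgments sent when one is sent per batchSize consumed messages. */
    public static int acksSent(int messages, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        int acks = 0;
        int unacked = 0;
        for (int i = 0; i < messages; i++) {
            unacked++;                 // one more message consumed
            if (unacked >= batchSize) {
                acks++;                // a single acknowledgment covers the whole range
                unacked = 0;
            }
        }
        if (unacked > 0) {
            acks++;                    // flush the remaining range
        }
        return acks;
    }

    public static void main(String[] args) {
        int messages = 10_000;  // assumed example workload
        int prefetch = 1000;    // default queue prefetch
        System.out.println("one ack per message: " + acksSent(messages, 1));
        System.out.println("ack at 50% of prefetch: " + acksSent(messages, batchSizeFor(prefetch, 50)));
        System.out.println("ack at 65% of prefetch: " + acksSent(messages, batchSizeFor(prefetch, 65)));
    }
}
```

In this model the number of acknowledgments drops from one per message to a few dozen for the whole run, which is the saving the range acknowledgment is meant to provide.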






Table 13.2 below outlines the different options for acknowledging messages and
how often they send back a message acknowledgment to the ActiveMQ message
broker.





Table 13.2 ActiveMQ acknowledgment modes



| Acknowledgment mode | Sends an acknowledgment | Description |
|---|---|---|
| Session.SESSION_TRANSACTED | Rolls up acknowledgments with Session.commit(). | Reliable way for message consumption and performs well, providing you consume more than one message in a commit. |
| Session.CLIENT_ACKNOWLEDGE | All messages up to when a message is acknowledged are consumed. | Can perform well, providing the application consumes a lot of messages before calling acknowledge. |
| Session.AUTO_ACKNOWLEDGE | Automatically sends a message acknowledgment back to the ActiveMQ broker for every message consumed. | This can be slow but is often the default mechanism for message consumers. |
| Session.DUPS_OK_ACKNOWLEDGE | Allows the consumer to send one acknowledgment back to the ActiveMQ broker for a range of messages consumed. | An acknowledgment will be sent back when the prefetch limit has reached 50% full. The fastest standard way of consuming messages. |
| ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE | Sends one acknowledgment for every message consumed. | Allows great control by enabling messages to be acknowledged individually but can be slow. |
| optimizeAcknowledge | Allows the consumer to send one acknowledgment back to the ActiveMQ broker for a range of messages consumed. | A hint that works in conjunction with Session.AUTO_ACKNOWLEDGE. An acknowledgment will be sent back when 65% of the prefetch buffer has been consumed. This is the fastest way to consume messages. |





The downside to not acknowledging every message individually is that if the message

consumer were to lose its connection with the ActiveMQ broker for any reason, then

your messaging application could receive duplicate messages. But for applications

that require fast throughput (such as real-time data feeds) and are less concerned

about duplicates, using optimizeAcknowledge is the recommended approach.





The ActiveMQ message consumer incorporates duplicate message detection, which

helps minimize the risk of receiving the same message more than once.
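One common way to build this kind of detection is to remember the IDs of recently received messages in a bounded window and to reject any ID that is already in it. The sketch below illustrates that general idea only; it is an assumption for illustration and not ActiveMQ's actual implementation, and the class `DuplicateWindow` and its method are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DuplicateWindow {

    private final Map<String, Boolean> seen;

    DuplicateWindow(final int capacity) {
        // Insertion-ordered map that drops the oldest ID once the window is full.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Records the ID and returns true if it was already inside the window. */
    public boolean isDuplicate(String messageId) {
        return seen.put(messageId, Boolean.TRUE) != null;
    }

    public static void main(String[] args) {
        DuplicateWindow window = new DuplicateWindow(1000);
        System.out.println(window.isDuplicate("ID:1")); // first delivery
        System.out.println(window.isDuplicate("ID:1")); // redelivery of the same ID
    }
}
```

Because the window is bounded, a duplicate that arrives after its ID has been evicted is not detected, which is why such a mechanism reduces the risk of duplicates rather than eliminating it.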



13.3.3 Asynchronous dispatch



Every session maintains an internal queue of messages to be dispatched to interested

message consumers (as can be seen from figure 13.5). The usage of an internal queue

together with an associated thread to do the dispatching to message consumers can

add considerable overhead to the consumption of messages.





You can disable a property called alwaysSessionAsync on the ActiveMQ

ConnectionFactory to turn this off. This allows messages to be passed directly from

the transport to the message consumer. This property can be disabled as shown in the

following code.





Listing 13.14 Disabling the alwaysSessionAsync property



    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
    cf.setAlwaysSessionAsync(false);



Disabling asynchronous dispatch allows messages to bypass the internal queueing
and dispatching done by the session, as shown in figure 13.6.






So far we’ve looked at some general techniques you can use to improve performance,

such as using reliable messaging instead of guaranteed and co-locating an

ActiveMQ broker with a service. We’ve covered different tuning parameters for transports,

producers, and consumers.






Because using examples is the best way to demonstrate something, in the next section

we’ll demonstrate how to improve performance with an example application of a

real-time data feed.




