首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

13.3 优化讯息消费者

2014-01-03 
13.3 优化消息消费者13.3 Optimizing message consumers13.3 优化消息消费者?In ?order to ?maximize appl

13.3 优化消息消费者

13.3 Optimizing message consumers

13.3 优化消息消费者

?

In ?order to ?maximize application ?performance, you ?have to ?look at ?all the

participants— and ?as we ?have seen ?so far, ?consumers play ?a big part in the

overall performance of ActiveMQ. Message consumers typically have to work ?twice

as hard as ?message producers, because ?in addition to ?consuming messages, they

have to acknowledge that the message ?has been consumed. We’ll explain some ?of

the biggest performance gains you can get with ActiveMQ by tuning the consumers.

?

为了最大限度的提升应用程序的性能,你必须关注所有影响性能的因素.到目前为止,消息消费者

在整个ActiveMQ系统的性能表现中都扮演着举足轻重的角色.通常,消息消费者必须要尽量以

2倍于消息生产者的速度运行,因为消费者还要通知代理消息已经被处理了.下面我们将介绍

通过优化消息消费者你可以获取的最大的性能提升.

?

Typically the ActiveMQ ?broker will deliver ?messages as quickly ?as possible to

consumer connections. Once ?messages are delivered ?over the transport ?from the

ActiveMQ broker, they’re ?typically queued in ?the session associated ?with the

consumer, where they wait ?to be delivered. In ?the next section we’ll ?explain

why and how the ?rate at which messages ?are pushed to consumers ?is controlled,

and how to tune that rate for better throughput.

?

通常,ActiveMQ代理会通过消费者连接尽可能快的发送消息.通常情况下,一旦消息通过ActiveMQ代理的传输连接

发送完成之后,消息就加入了与消费者关联的session队列中,并等待分发.在下一节中,我们将解释消息发送给消费者

的速度为何可控以及如何控制,同时还将阐述如何调整这个消息发送速率已获取更好的吞吐量.

?

13.3.1 Prefetch limit

13.3.1 预获取限制

?

ActiveMQ uses a push-based model for delivery, delivering messages to ?consumers

when they’re received by the ActiveMQ broker. To ensure that a consumer ?won’t

exhaust its memory, there’s a limit (prefetch limit) to how many messages ?will

be delivered to a consumer before ?the broker waits for an acknowledgement ?that

the messages have ?been consumed by ?the application. Internal ?to the consumer,

messages are taken off the transport when they’re delivered and placed into ?an

internal queue associated with the consumer’s session, as shown in figure 13.5.

?

ActiveMQ使用一种基于推送的模式来将收到的消息分发给代理.为了防止超过消费者的内存

限制,有一个参数(prefetch limit)可以限制代理在消费者确认消息已被应用程序处理之前可以

发送给消费者的消息数量.在消费者内部,从传输连接器上接管的消息会被分发并放置于一个和消费者

session管理的内部队列中,如图13.5所示.

?

A consumer connection will queue ?messages to be delivered internally. ?The size

of these queues plus the number ?of in-flight messages (messages that have ?been

dispatched to the consumer but haven’t yet been acknowledged) is limited by the

prefetch limit for that consumer. In general, the larger the prefetch size, ?the

faster the consumer will work.

?

消费者连接会在内部将分发过来的消息队列化.这个内部的消息队列的尺寸加上尚未发送回执

给代理的消息(这些消息已经被消费者接收了但是还没有通知代理消息已被消费)的数量之和受到

消费者的prefetch limit 参数限制.通常,这个prefetch limit 参数设置的越大,消费者运行的越快.

?

But this isn’t always ideal for queues, where you might want to ensure that messages

are evenly distributed across all consumers on a queue. In this case with a large

prefetch, a slow consumer could have pending messages waiting to be processed that

could’ve been worked on by a faster consumer. In this case, a lower prefetch number

would work better. If the prefetch is zero, the consumer will pull messages from the

broker and no push will be involved.

?

但是对于消息队列来说,设置这个限制并非是最理想方案,因为你可能希望消息会被平均的分发

给一个队列上的所有消费者.这种情况下,当prefetch limit 设置的很大时,处理速度较慢的消费者

可能会累积待处理的消息,而这些消息却不能被更快的消费者处理.这种情况下,设置较低的prefetch?

limit值可能会更适合.如果prefetch limit 值设置为0,这消息消费者会主动从代理拉取消息并且

不允许代理推送任何消息到消费者.

?

There are different default prefetch sizes for different consumers:

对于不同种类的消费者而言有不同的prefetch limit默认值:

?

? Queue consumer default prefetch size = 1000

? Queue browser consumer default prefetch size = 500

? Persistent topic consumer default prefetch size = 100

? Nonpersistent topic consumer default prefetch size = 32766

队列消费者的prefetch limit默认值为1000

队列浏览消费者的prefetch limit默认值为500

持久化主题消费者的prefetch limit默认值为100

非持久化主题的prefetch limit默认值为32766

?

The prefetch size is the number of outstanding messages that your consumer will have

waiting to be delivered, not the memory limit. You can set the prefetch size for your

connection by configuring the ActiveMQConnectionFactory as shown next.

?

prefetch limit值是消息消费者等待接收的消息的数量而不是内存值大小.可以通过设置

ActiveMQConnectionFactory的相关属性值值来设置prefetch limit,如下代码所示:

?

Listing 13.11 Configuring the prefetch policy on the ActiveMQConnectionFactory

代码清单13.11 配置ActiveMQConnectionFactory的预拉取策略

?

? ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();

? Properties props = new Properties();

? props.setProperty("prefetchPolicy.queuePrefetch", "1000");

? props.setProperty("prefetchPolicy.queueBrowserPrefetch", "500");

? props.setProperty("prefetchPolicy.durableTopicPrefetch", "100");

? props.setProperty("prefetchPolicy.topicPrefetch", "32766");

? cf.setProperties(props);

?

Or you can pass the prefetch size as a destination property when you create a

destination:

或者,在创建一个消息目的地时,传递预拉取尺寸参数作为消息目的地的属性,如下代码所示:

?

Listing 13.12 Setting the prefetch size when creating a destination

代码情况13.12 创建消息目的地时设置预拉取尺寸

?

? Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");

? MessageConsumer consumer = session.createConsumer(queue);

?

Prefetch limits are an easy mechanism to boost performance, but should be used with

caution. For queues, you should consider the impact on your application if you have a

slow consumer, and for topics, factor how much memory your messages will consume

on the client before they’re delivered.

?

使用Prefetch limit是一种简单的提升性能机制,但是需要谨慎使用.对于队列来说,你应该考虑

你的程序中是否有比较慢的消费者;而对于主题来说,你需要考虑在消息被分发之前,你的队列

可以使用的客户端上的最大内存是多少.

?

Controlling the rate at which messages are delivered to a consumer is only part of

the story. Once the message reaches the consumer’s connection, the method of message

delivery to the consumer and the options chosen for acknowledging the delivery

of that message back to the ActiveMQ broker have an impact on performance. We’ll

cover these in the next section.

?

控制消息分发给消费者的速率仅仅是消费者调优的一部分.一旦消息到达消费者的连接器之后,

消息分发到消费者时使用的方法以及消费者用来将消息已被处理的确认发送给代理时使用的选项

就成为影响性能的重要组成部分.我们将在下一节讨论相关内容:

?

13.3.2 Delivery and acknowledgment of messages

13.3.2 消息的分发和确认

?

Something that should be apparent from figure 13.5 is that delivery of messages via a

javax.jms.MessageListener.onMessage() will always be faster with ActiveMQ than

calling javax.jms.MessageConsumer.receive(). If a MessageListener isn’t set for a

MessageConsumer, then its messages will be queued for that consumer, waiting for the

receive() method to be called. Not only will maintaining the internal queue for the

consumer be expensive, but so will the context switch by the application thread calling

the receive() method.

?

由图13.5可以看出,使用javax.jms.MessageListener.onMessage()来分发消息明显比使用

javax.jms.MessageConsumer.receive()要快.如果MessageConsumer没有设置MessageListener

则该消费者的消息会分发到队列中然后等待调用receive()方法.不仅维护消费者内部队列的代价是

昂贵的,而且应用程序线程不断的调用receive()来切换应用程序上下文的代价也是高昂的.

?

As the ActiveMQ broker keeps a record of how many messages have been consumed

to maintain its internal prefetch limits, a MessageConsumer has to send a message

acknowledgment for every message it has consumed. When you use transactions,

this happens at the Session.commit() method call, but is done individually for each

message if you’re using auto-acknowledgment.

?

因为ActiveMQ代理需要保存一个记录以表明当前有多少消息已被消费这消费了来维护消费者

内部的prefetch limit,MessageConsumer必须为每一个消息发送消息确认.如果你使用了事务,

当调用Session.commit() 方法是会发送消息确认,但是假如你使用auto-acknowledgment模式

则每个消息处理完成后都会独自发送消息确认.

?

Some optimizations ?are used ?for sending ?message acknowledgments ?back to ?the

broker, ?which ?can ? drastically ?improve ?the ? performance ?when ?using ? the

DUPS_OK_ACKNOWLEDGE session acknowledgment ?mode. In addition, ?you can set ?the

optimizeAcknowledge property on the ActiveMQ ConnectionFactory to give a hint to

the consumer to roll up message acknowledgments.

?

有一些优化选项专门用于发送消息确认给代理,当使用DUPS_OK_ACKNOWLEDGE session确认模式时,

这些优化选项可以显著的改善性能.另外,你可以设置ActiveMQ ConnectionFactory的

optimizeAcknowledge属性,通过给消费者一个提示信息以便批量发送消息确认信息.

?

Listing 13.13 Setting the optimizeAcknowledge property

?

? ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();

? cf.setOptimizeAcknowledge(true);

?

When using optimizeAcknowledge or the DUPS_OK_ACKNOWLEDGE acknowledgment

mode on a session, the message consumer can send one message acknowledgment

to the ActiveMQ message broker containing a range of all the messages

consumed. This reduces the amount of work the message consumer has to do,

enabling it to consume messages at a much faster rate.

?

当在一个session中使用optimizeAcknowledge或DUPS_OK_ACKNOWLEDGE确认模式时,消费者

只发送一个消息告知ActiveMQ代理一批消息已经完成处理.这样消息消费者要做的工作就减少了,

便于消费者尽可能快的处理消息.

?

Table 13.2 below outlines the ?different options for acknowledging messages ?and

how ?often they ?send back ?a message ?acknowledgment to ?the ActiveMQ ?message

broker.

下面的表13.2中列出了确认消息的不同选项以及使用这些选项后消费者发挥消息确认给ActiveMQ代理

的频率.

?

Table 13.2 ActiveMQ acknowledgment modes

表13.2 ActiveMQ消息确认模式

?

Acknowledgment modeSends an acknowledgmentDescription

确认模式消息确认如何发送描述

Session.SESSION_TRANSACTED Rolls up acknowledgments with Reliable way for message consumption

Session.commit(). ? ? ? ? ? ? ? ? and performs well, providing ? ? ? ?

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?you consume more than one message ??

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? in a commit.?

Session.SESSION_TRANSACTED 使用 Session.commit()方法批量确认这是消息消费的一种可靠方式,并且性能很好,允许

消息 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?一次提交中处理多个消息. ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??

?

Session.CLIENT_ACKNOWLEDGE All messages up to          Can perform well, providing the

when a message isapplication consumes a lot of messages

acknowledged are consumed. before calling acknowledge.

Session.CLIENT_ACKNOWLEDGE 所有消息都依赖于客户端手动调用方法确认 ?这种方式可以提供很好的性能.在调用确认消息方法

之前允许应用程序处理大量消息.

?

Session.AUTO_ACKNOWLEDGE Automatically sends aThis can be slow but is often the

message acknowledgment default mechanism for message

back to the ActiveMQ broker forconsumers.

every message consumed.

Session.AUTO_ACKNOWLEDGE 每个消息处理完成后自动发送这种方式会比较慢,但常常是默认的消息确认机制

消息确认到代理

?

Session.DUPS_OK_ACKNOWLEDGE Allows the consumer toAn acknowledgment will be sent

send one acknowledgmentback when the prefetch limit has

back to the ActiveMQ broker for areached 50% full. The fastest standard

range of messages consumed.way of consuming messages.

Session.DUPS_OK_ACKNOWLEDGE 允许消息消费者一次发送一个消息确认当消费者收到的消息达到prefetch limit设置值的

告知代理多个消息已被处理一半时即发送消息确认给代理.这消息处理中最快的

标准的消息确认方式

?

?

ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE Sends one acknowledgmentAllows great control by enabling

for every message consumed.messages to be acknowledged individually

but can be slow.

ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE 每处理一个消息就发送一次确认最大限度的允许控制每个消息独立的被确认,但是会很慢.

?

optimizeAcknowledge Allows the consumer toA hint that works in conjunction with

send one acknowledgmentSession.AUTO_ACKNOWLEDGE. An

back to theacknowledgment will be sent back

ActiveMQ broker for awhen 65% of the prefetch buffer

range of messages consumed.has been consumed. This is the

fastest way to consume messages.

optimizeAcknowledge 允许消息消费者一次发送一个消息确认与Session.AUTO_ACKNOWLEDGE一同起作用,

告知代理多个消息已被处理是一个提示,在消费者处理完的消息占到prefetch缓存的

65%时发送消息确认.使用这种模式可以以最快的方式处理消息.

?

?

The downside to not acknowledging every message individually is that if the message

consumer were to lose its connection with the ActiveMQ broker for any reason, then

your messaging application could receive duplicate messages. But for applications

that require fast throughput (such as real-time data feeds) and are less concerned

about duplicates, using optimizeAcknowledge is the recommended approach.

不单独确认每个消息的缺点是,不管消息消费者以任何理由失去了与ActiveMQ代理连接,那么

你的消息应用程序可能会收到重复的消息.但是,对于要求快速处理且不关心消息是否重复的

应用程序(比如实时的数据源)来说,推荐使用optimizeAcknowledge模式.

?

The ActiveMQ message consumer incorporates duplicate message detection, which

helps minimize the risk of receiving the same message more than once.

ActiveMQ的消息消费者包含重复消息侦测机制,可以最大限度的降低收到重复消息的风险.

?

13.3.3 Asynchronous dispatch

13.3.3 异步分发

?

Every session maintains an internal queue of messages to be dispatched to interested

message consumers (as can be seen from figure 13.5). The usage of an internal queue

together with an associated thread to do the dispatching to message consumers can

add considerable overhead to the consumption of messages.

?

灭个session都维护一个内部的即将被分发到各自的消费者的消息的队列(如图13.5所示).内部消息

队列以及与之关联的用于发送消息到消息消费者的线程的使用可能会给消息处理增加额外开销.

?

You can disable a property called alwaysSessionAsync on the ActiveMQ

ConnectionFactory to turn this off. This allows messages to be passed directly from

the transport to the message consumer. This property can be disabled as shown in the

following code.

?

你可以禁用ActiveMQ连接工厂的alwaysSessionAsync属性来停用上述消息队列和消息分发线程.

这种设置运行消息直接从传输连接器发送到消息消费者.下面代码是禁用该属性的示例代码:

?

Listing 13.14 Disabling the alwaysSessionAsync property

代码清单13.14 禁用alwaysSessionAsync属性

?

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();

cf.setAlwaysSessionAsync(false);

?

Disabling asynchronous dispatch allows messages to be pass the internal queueing

and dispatching done by the session, as shown in figure 13.6.

?

停用asynchronous允许消息直接发送到session内部的队列并由session负责进一步分发,

如图13.6所示.

?

?

So far we’ve looked at some general techniques you can use to improve performance,

such as using reliable messaging instead of guaranteed and co-locating an

ActiveMQ broker with a service. We’ve covered different tuning parameters for transports,

producers, and consumers.

?

到目前为止,我们已经了解了一些通用的提升性能技巧,比如使用可靠的消息系统以替代能确保消息收发系统

以及使用和应用程序同址部署的ActiveMQ代理(译注:嵌入式代理).我们也了解了传输连接器,消息生产者和

消费者的一些不同的调优参数.

?

Because using examples is the best way to demonstrate something, in the next section

we’ll demonstrate how to improve performance with an example application of a

real-time data feed.

?

使用示例是解释问题的最好方式,因此下一节中我们将使用一个实时的数据源程序示例来演示如何

改善性能.

?

热点排行