首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 网站开发 > Web前端 >

AMQP 跟 RabbitMQ 入门

2012-10-19 
AMQP 和 RabbitMQ 入门高级消息队列协议(AMQP) 是一个异步消息传递所使用的应用层协议规范。作为线路层协议

AMQP 和 RabbitMQ 入门

高级消息队列协议(AMQP) 是一个异步消息传递所使用的应用层协议规范。作为线路层协议,而不是API(例如JMS),AMQP 客户端能够无视消息的来源任意发送和接受信息。现在,已经有相当一部分不同平台的服务器3和 客户端可以投入使用。

AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具。因此,面向消息的中间件 (MOM)系统,例如发布/订阅队列,没有作为基本元素实现。反而通过发送简化的AMQ实体,用户被赋予了构建例如这些实体的能力。这些实体也是规范的一 部分,形成了在线路层协议顶端的一个层级:AMQP模型。这个模型统一了消息模式,诸如之前提到的发布/订阅,队列,事务以及流数据,并且添加了额外的特 性,例如更易于扩展,基于内容的路由。

本文中区别发布/订阅是为了将生产者和消费者拆分开来:生产者无需知道消费者按照什么标准接受消息。队列是一个先入先出的数据结构。路由封装了消息 队列中的消息的相关信息,这些信息决定了消息在异步消息系统中的最终展现形式。

在这里,我尝试解释一下这个模型的一些概念,Aman Gupta使用Ruby5实 现了AMQP模型6。 它使用的是一种事件驱动架构(基于EventMachine7), 在阅读和使用的时候都会让人觉得有些不太熟悉。但是API的设计表明了在AMQ模型实体之间通信的方式是非常简单的,因此,即便开发者对Ruby并不熟 悉,他同样也可以得到收获。

应该注意到,至少有三个或者更多的Ruby客户端8, 9, 10可 供选择。其中的一个客户端Carrot很明显使用了非事件驱动的同步Ruby架构,因此,这个客户端在使用事件驱动的Ruby API的时候,风格非常简洁。

本文中的AMQP服务器是使用Erlang11编 写的RabbitMQ。它实现了AMQP规范0-8版的内容,并且将在近期实现0-9-1版的内容12。

在开始之前再交代一些东西:异步消息是一个非常普通并且广泛使用的技术,从例如Skype或者XMPP/Jabber这样各种各样的即时消息协议到 古老的email。但是,这些服务都有如下特征:

- 它们会在传输消息的时候或多或少加入一些随意的内容(例如一封email可能会包含一个文本和关于办公室笑话的PPT)和一些比较正式的路由信息(例如 email地址)。

- 它们都是异步的,也就是说它们将生产者和消费者区分开来,因此可能将消息加入队列(例如某人发给你一条消息,但是你不在线或者你的邮箱会收到一封 email)。

- 生产者和消费者是具有不同知识的不同角色。我不需要知道你的IMAP用户名和密码就能够给你发送email。事实上,我甚至不需要知道你的email地址 是否是一个马甲或者“真实”地址。这个特性意味着生产者不能控制什么内容被阅读或者订阅了 - 就像我的email客户端会舍弃掉大多数主动发送给我的医药广告。

AMQP是一个抽象的协议(也就是说它不负责处理具体的数据),这个事实并不会将事情变得更复杂。反而,Internet使得消息无处不在。人们通 常使用它们和异步消息简单灵活地解决很多问题。

37
38 send_to_exchange 'Hello', 'foo'
39 send_to_exchange 'You', 'gee'
40 send_to_exchange 'Cruel', 'bar'
41 send_to_exchange 'World', 'wee'
42
43 event_loop.join

虽然不能被命名,但是队列也有以下属性,这些属性和交换器所具有的属性类似。

- 持久性:如果启用,队列将会在协商器重启前都有效。

- 自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除掉自身。

- 惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明。

- 排他性:如果启用,队列只能被声明它的消费者使用。

这些性质可以用来创建例如排他和自删除的transient或者私有队列。这种队列将会在所有链接到它的客户端断开连接之后被自动删除掉 - 它们只是短暂地连接到协商器,但是可以用于实现例如RPC或者在AMQ上的对等通信。

AMQP上的RPC是这样的:RPC客户端声明一个回复队列,唯一命名(例如用UUID19), 并且是自删除和排他的。然后它发送请求给一些交换器,在消息的reply-to字段中包含了之前声明的回复队列的名字。RPC服务器将会回答这些请求,使 用消息的reply-to作为routing key(之前提到过默认绑定器会绑定所有的队列到默认交换器)发送到默认交换器。注意仅仅是惯例而已。根据和RPC服务器的约定,它可以解释消息的任何属 性(甚至数据体)来决定回复给谁。

队列也可以是持久的,可共享,非自动删除以及非排他的。使用同一个队列的多个用户接收到的并不是发送到这个队列的消息的一份拷贝,而是这些用户共享 这队列中的一份数据,然后在使用完之后删除掉。

2.4 消息投递的保障机制

消费者会显式或者隐式地通知消息的使用完毕。当隐式地通知的时候,消息被认为在投递之后便被消耗掉。否则客户端需要显式地发送一个验证信息。只有这 个验证信息收到之后,消息才会被认为已经收到并且从队列中删除。如果没有收到,那么协商器会在通道20关 闭之前尝试着重新投递消息。

1 require 'rubygems'
2 require 'mq'
3
4 event_loop = Thread.new do
5 EM.run do
6 EM.add_timer(1) do
7 EM.stop
8 end
9 end
10 end
11
12 def subscribe_to_queue
13
14 exchange = MQ.fanout('my-fanout-exchange-with-acks')
15 queue = MQ.queue('my-fanout-queue-with-acks')
16
17 queue.bind(exchange).subscribe(:ack => true) do |header, body|
18 yield header, body
19 header.ack unless body == 'Cruel'
20 end
21
22 end
23
24 def send_to_exchange(message)
25
26 exchange = MQ.fanout('my-fanout-exchange-with-acks')
27 exchange.publish message
28
29 end
30
31 subscribe_to_queue do |header, body|
32 p "I received a message: #{body}"
33 end
34
35 send_to_exchange 'Hello'
36 send_to_exchange 'Cruel'
37 send_to_exchange 'World'
38
39 event_loop.join
40
41 __END__
42
43 First run:
44
45 "I received a message: Hello"
46 "I received a message: Cruel"
47 "I received a message: World"
48
49 Second run:
50
51 "I received a message: Cruel"
52 "I received a message: Hello"
53 "I received a message: Cruel"
54 "I received a message: World"
55
56 ... and so forth

消息生产者可以选择是否在消息被发送到交换器并且还未投递到队列(没有绑定器存在)和/或没有消费者能够立即处理的时候得到通知。通过设置消息的 mandatory和/或immediate属性为真,这些投递保障机制的能力得到了强化。

现在在本文例子中使用的Ruby AMQP API还不完全支持这些标志位。但是,在GitHub上已经有两个patch21, 22展 示了完全支持之后的情况。

此外,一个生产者可以设置消息的persistent属性为真。这样一来,协商器将会尝试将这些消息存储在一个稳定的位置,直到协商器崩溃。当然, 这些消息肯定不会被投递到非持久的队列中。

2.5 拥塞控制

在给出的例子中,对消息的使用永远看做是一个订阅。那么考虑到了拥塞控制吗?规范制定了QoS23特 性,限制了通过一个通道发送到一个消费者的消息总量。很不幸的是,这个特性在当前RabbitMQ的版本中还不支持(计划在1.6),但是在原则上是应该 被AMQP API支持的。

作为一个替代方案,客户端可以选择从队列中取出消息而不是通过订阅。当使用这种方法的时候,拥塞控制可以手动地实现。

1 require 'rubygems'
2 require 'mq'
3
4 event_loop = Thread.new do
5 EM.run do
6 EM.add_timer(5) do
7 EM.stop
8 end
9 end
10 end
11
12 def subscribe_to_queue
13
14 exchange = MQ.fanout('my-fanout-exchange')
15 queue = MQ.queue('my-fanout-queue')
16
17 queue.bind(exchange).pop do |header, body|
18 yield header, body
19 end
20
21 EM.add_periodic_timer(0.25) do
22 queue.pop
23 end
24
25 end
26
27 def send_to_exchange(message)
28
29 exchange = MQ.fanout('my-fanout-exchange')
30 exchange.publish message
31
32 end
33
34 received = 0
35
36 subscribe_to_queue do |header, body|
37 p "I received a message: #{body}"
38 end
39
40 send_to_exchange 'Hello'
41 send_to_exchange 'World'
42
43 event_loop.join
一个模型样例

想像一下你想创建一个普通的聊天应用,那么应该有以下几个基本特性:

- 聊天 - 两个用户应该可以相互发送消息。

- 一个好友系统 - 用户能够控制谁给他发送消息。

我们假设在协商器上有两种消费者:好友服务器和聊天客户端。

3.1 成为好友

为了成为Bob的好友,Alice首先得发送一个消息给fanout交换器iends,我们假设这个交换器是访问受限24的: 普通用户不能够将队列绑定到它。在这个消息中,Alice表示想和Bob成为朋友。

在协商器上有大量的聊天服务器,从绑定到friends交换器的一个单一持久队列中持续地取出消息。这个队列的名字是例如 friends.298F2DBC6865-4225-8A73-8FF6175D396D这样的,这难以猜测的名字能够阻止聊天客户端直接取出信息 - 记住:不知道队列的名字,就不能设置订阅。

当一个聊天服务器收到Alice的消息(只有一个会得到这个消息,虽然它们都是从同一个队列中获取),决定这个请求是否有效,然后将其(也许是做过 一些调整或者参数化)发送到默认交换器(可以是直接的或者持久的)。它使用另外一个只有Bob知道的routing key来投递。当Bob上线的时候(或者一个服务器做了这件事),他会声明一个队列,这个队列的名字就是之前的routing key(记住在虚拟主机上的默认绑定器是将所有的队列和默认交换器绑定在一起)。

Bob的聊天客户端现在询问Bob是否想和Alice成为朋友。在她的请求消息中,有一个特殊的属性叫做reply-to - 这个属性包括了一个持久和排他的好友队列的名字,这个队列是Alice声明将用于和Bob的未来聊天用。如果Bob想和Alice成为朋友,他会使用这个 队列的名字作为routing key,发送一个消息到默认交换器。他也会需要声明一个持久和排他的好友队列,将其名字设为reply-to的值。

例如:Alice和Bob的好友队列的名字是B5725C4A-6621463E-AAF1-8222AA3AD601。Bob发送给Alice的 消息的routing-key的值便是这个名字,也是Alice发送给Bob的消息中reply-to的值。

因为好友队列是持久的,因此发送到消息在用户离线的时候也不会丢失。当用户上线之后,所有的在好友队列的消息将会发送到用户,然后才去获取新的消 息。

当Bob不再想和Alice成为好友,他可以简单地删除掉为Alice声明的好友队列。在她使用mandatory标志位发送消息的时 候,Alice也会注意到Bob已经不再想是她的好友。因为交换器会将她的消息认为不可投递而返回。

仍未提及的事情

仍然有很多本文没有介绍的东西,例如事务语义,关于信息的重路由,header交换器的规范以及不同AMQP规范之间的差异 - 尤其是在1.0版本之前的模型改变。为了简介起见,一个聊天的模型同样也被略过了。

这里也没有介绍了整个系统的管理,因为还不清楚AMQP和RabbitMQ将会走向何方。现在有一个课题,关于在保留的amq命名空间中可用的交换 器,它能获取协商器所有的日志信息。但是,能够列出现在已经声明的组件和已连接的用户的工具是用rabbitmqctl命令行接口而不是AMQ实体来实现 的。

1 require 'rubygems'
2 require 'mq'
3
4 PATH_TO_RABBITMQCTL = '/usr/local/sbin/rabbitmqctl'
5
6 event_loop = Thread.new { EM.run }
7
8 def subscribe_to_logger
9
10 random_name = (0...50).map{ ('a'..'z').to_a[rand(26)] }.join
11
12 exchange = MQ.topic('amq.rabbitmq.log')
13 queue = MQ.queue(random_name, :autodelete => true, :exclusive => true)
14 binding = queue.bind(exchange, :key => '#')
15
16 binding.subscribe do |header, body|
17 body.split("\n").each do |message|
18 yield header, message
19 end
20 end
21
22 end
23
24 def exchange_info(vhost = '/')
25 info :exchange, vhost, %w(name type durable auto_delete arguments)
26 end
27
28 def queue_info(vhost = '/')
29 info :queue, vhost, %w(name durable auto_delete arguments node messages_ready messages_unacknowledged
messages_uncommitted messages acks_uncommitted consumers transactions memory)
30 end
31
32 def binding_info(vhost = '/')
33 info :binding, vhost
34 end
35
36 def connection_info
37 info :exchange, nil, %w(node address port peer_address peer_port state channels user vhost timeout
frame_max recv_oct recv_cnt send_oct send_cnt send_pend)
38 end
39
40 def info(about, vhost = nil, items = [])
41
42 column_length = 20
43
44 puts "#{about} info\n"
45
46 cmd = "#{PATH_TO_RABBITMQCTL} list_#{about}s"
47 cmd << " -p #{vhost}" if vhost
48 cmd << " #{items.join(' ')} 2>&1"
49
50 pipe = IO.popen(cmd)
51
52 pipe.readlines.map { |line| line.chomp.split("\t").map { |item| item.ljust(column_length)[0,
column_length] } }.slice(1..-2).each do |exchange|
53 print exchange.join(' ') + "\n"
54 end
55
56 end
57
58 subscribe_to_logger do |message|
59 p "RabbitMQ logger: #{message}"
60 end
61
62 %w(connection exchange queue binding).each do |method|
63 self.send "#{method}_info".to_sym
64 end
65
66 event_loop.join

必须提及的是,已经有一些使用AMQP(或者RabbitMQ)的分布式架构。这些架构(例如Nanite25或 者Lizzy26) 在AMQP的顶部引入了一些抽象层,这样简化了一些操作,例如cluster中在Ruby客户端之间工作的分配。

热点排行