JGroups (4)
4 Protocol Stack
4.1 Transport protocols
??? Transport protocols是指协议栈中最底层的协议,它们负责发送和接收消息。JGgroups提供了以下几种transport protocols。
4.1.1 UDP
??? JGroups中的UDP协议使用IP multicast向集群发送消息,使用UDP datagram向单个的成员发送unicast消息。启动后会打开两个socket,分别是multicast socket和unicast socket。Channel的地址是unicast socket的地址和端口号。UDP通常用于集群中的成员分布于LAN内的情况。
??? 如果使用UDP和PING做为协议栈的底层协议,那么JGroups会使用IP multicast发现集群中的成员,以及向集群发送发送消息。然而,如果IP multicast在子网间被禁用,那么可以设置UDP的ip_mcast属性为false,以便指定UDP使用多个unicast messages向集群发送消息,而不是使用multicast message。此外,还需要设置PING的gossip_系列属性,以便指定PING使用GossipRouter来发现集群中的其它成员。需要注意的是,对GossipRouter的依赖可能会导致single point of failure,而且系统的可伸缩性也比较差。
??? 在启动任何成员之前,首先要启动GossipRouter(否则成员需要处理MergeView消息用于合并subgroup的状态),例如:
java org.jgroups.stack.GossipRouter -port 5555 -bindaddress localhost
??? UDP和PING的配置如下:
<UDP ip_mcast="false" /><PING gossip_host="localhost" gossip_port="5555" gossip_refresh="15000" timeout="2000" num_initial_members="3"/>
4.1.2 TCP
??? 当集群中的成员分布于WAN时(路由器会丢弃IP multicast报文),TCP可能是唯一可用的传输协议。当使用TCP作为传输协议是,可用的发现协议有:
??? 以下是个使用TCP和TCPPING的例子:
<TCP start_port="7800" /> +<TCPPING initial_hosts="HostA[7800],HostB[7800]" port_range="5" timeout="3000" num_initial_members="3" />
??? 使用TCPPING的优点是不需要额外GossipRouters,而是从集群的成员中选择那些已知的成员,例如以上例子中的HostA[7800]和 HostB[7800],并从这些成员处得到其它成员的信息。TCP协议的start_port="7800"属性指定了选择7800作为端口号,如果该端口号被占用,那么尝试下一个(7801)端口号,直到找到可用的端口号。TCPPING协议会尝试连接HostA和HostB,连接的端口号的范围是从 7800到7800 + port_range -1(在以上例子中是7804)。
??? 以下是个使用TCP和TCPGOSSIP的例子:
<TCP /><TCPGOSSIP initial_hosts="localhost[5555],localhost[5556]" gossip_refresh_rate="10000" num_initial_members="3" />
??? 以上例子中,initial_hosts 属性用于指定GossipRouter的地址和端口号。GossipRouter需要先于集群中的成员启动。
4.2 Reliable Message
4.2.1 pbcast.NAKACK
??? NAKACK协议保证了向集群的所有成员发送的消息的传输可靠性,以及消息的FIFO顺序。消息传输的可靠性是指发送的消息不会丢失。此外发送者将发送的消息编号,如果接收者没有收到特定编号的消息,那么发送者会收到重新发送的请求。FIFO顺序是指接收者会以消息发送的顺序接收消息。以下是部分 NAKACK协议的属性:
4.2.2 UNICAST
??? UNICAST协议保证了单独的发送者和接收者之间传递的消息的传输可靠性,以及消息的FIFO顺序。在可靠的传输协议(例如TCP)之上, UNICAST协议并不是必须的。然而,UNICAST可以防止相同发送者上的并发的消息传递。除非希望如此,否则应该在协议栈中包含UNICAST协议。
??? 以下是部分UNICAST协议的属性:
4.3 Failure Detection
??? Failure detection 的目的是检测集群内的成员是否崩溃。当某个成员被怀疑崩溃时,那么会向集群中的每个成员发送SUSPECT 消息,以进行通知。需要注意的时是,Failure detection 并不负责从集群中清除崩溃的成员(实际上是由GMS协议负责),它只是负责发现可能已经崩溃的成员,并通知集群中的其它成员。
4.3.1 FD
??? FD协议基于心跳消息。如果在timeout指定的毫秒内没有接收到某个成员的应答,并且在尝试了max_tries 指定的次数后,那么这个成员会被标记为可疑,并将被GMS协议从集群中清除。
每个成员向其右侧的邻居(当前view的成员列表中,该成员的下一个成员。列表中最后的成员的右侧邻居是列表的第一个成员)发送带有 FdHeader.HEARTBEAT头的消息。当邻居收到这个消息后,它会应答带有FdHeader.HEARTBEAT_ACK头的消息。每当收到应答时,FD协议的last_ack属性会被更新成当前的时间,num_tries也会设置为0。如果当前时间和last_ack之差大于timeout指定的毫秒数,那么FD协议会最多尝试max_tries 指定的次数,如果仍然没有收到应答,那么这个邻居会被标记为可疑。
4.3.2 FD_SOCK
??? FD_SOCK协议基于一个有TCP sockets组成的环,即集群中的每个成员都通过TCP socket连接到右侧的邻居(当前view的成员列表中,该成员的下一个成员。列表中最后的成员的右侧邻居是列表的第一个成员)。当某个成员检测到它的邻居非正常地关闭了TCP socket之后,那么它会把这个邻居标记为可疑。
4.4 Miscellaneous
4.4.1 STABLE
??? 为了响应可能的重传输请求,集群中的成员需要保存一定数量的消息直到它确定这些消息已经被集群中所有的成员成功地接收。对于某个消息M来说, message stability 意味着M已经被集群中所有的成员接收。STABLE协议周期性地(或者收到消息的字节数达到的配置的上限)向集群中的所有成员发送stable messages,这些消息中包含了特定成员收到的最大序号。当集群中的每个成员都收到了其它所有成员的stable messages后,可以计算出目前每个成员已经收到的消息的最小序号,接下来这个序号被发送到集群中每个成员,最后每个成员会从自己的 retransmission tables中删除小于这个最小序号的最小消息。需要注意的是,如果没有在协议栈中配置STABLE,那么可能会导致内存耗尽。以下是个配置STABLE 协议的例子:
<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000" max_bytes="1000000"/>
??? 以上例子中stability_delay属性指定,在发送消息前等待1~1000毫秒,以避免所有的成员同时发送消息。 desired_avg_gossip属性指定发送stable messages的周期,单位是毫秒,如果是0,那么禁用周期检查。max_bytes指定了在发送stable message消息前,接收到的消息的最大字节数。
4.4.2 pbcast.FLUSH
??? 4.2 Reliable Message中介绍了保证消息可靠传输的协议,但是在某些情况下,这种保证是不够的,考虑以下情况:
集群中某个成员A向集群发送消息M1,此时A的当前View是V1={A,B,C},也就是说A认为M1将发送到A(如果Channel.LOCAL选项是true)、B和C。正在此时,D也加入到集群中,那么D可能会,也可能不会收到M1。
通过在协议栈中配置FLUSH协议可以保证:
??? 通常,在以下两种情况下需要使用FLUSH协议:
State transfer 当某个成员请求状态传递时,它通知其它成员停止发送消息并等待响应。接下来coordinator会将状态发送给这个成员。当该成员接收到状态后,它通知其它成员可以继续发送消息。View changes 在安装新的view时,所有发送到V1的消息都会被传递到V1。??? FLUSH协议通常在STATE_TRANSFER、STATE_TRANSFER 或者 GMS 协议之上。此外需要注意的时,FLUSH协议必须是协议栈的最上层协议。除了JGroups自动处理FLUSH之外,JGroups也允许开发人员显式调用 Channel.startFlush()方法发起flush。在Channel.startFlush()方法返回后,在调用 Channel.stopFlush()方法之前,可以保证集群中的所有成员不能发送消息,而且Channel.startFlush()方法调用前发送的消息都会被所有成员接收。在调用了Channel.stopFlush()方法之后,集群中的所有成员可以继续发送消息。
??? 如果将Channel.BLOCK属性设置为true(缺省是false),那么可以在flush阶段得到通知。如果采用poll方式,那么在某个成员调用Channel.startFlush()方法后,其它成员会收到EVENT.BLOCK消息,这些成员应该发送EVENT.BLOCK_OK消息进行响应。如果采用push方式,那么channel上注册的MembershipListener的block()方法会被调用。
4.4.3 MERGE2
??? 假设由于某种原因(例如switch故障),某个集群{A,B,C,D,E},分裂为两个子集群{A,B,C} 和{D,E},A、B和C可以互相ping通,D和E可以互相ping通,但是A、B和C却ping不通D和E。在这种情况下,由于两个子集群独立工作,会导致这两个子集群的状态并不相同。当故障解除后,MERGE2协议会通知集群中的成员,这两个子集群将合并成一个。
??? 至于如何处理状态的合并,需要应用程序自己决定,这是因为JGroups并不了解集群的状态。需要注意的是,用于合并的状态的代码应该在单独的线程中执行。一种简单的处理方式是对于原来是主子集群中的成员不做任何处理,对于其它的成员则丢弃当前状态,从合并后集群的coordinator处重新获得状态。