Apache curator-recipes代码实例
?
? ? 对于PathChildrenCache.getCurrentData()将从获取本地的数据列表,而不是触发一次zookeeper.getChildren(),因此为"Cache".
3. Queues:分布式队列
? ? 分布式队列的基本特性,就是"生产者"或"消费者"跨越多个进程,且在此种环境中需要确保队列的push/poll的有序性.
? ? zookeeper本身并没有提供分布式队列的实现,只是recipse根据zookeeper的watcher和具有version标记的node,来间接的实现分布式queue..内部机制如下:
? ? --> 如果是消费者(QueueConsumer),会创建一个类似于PathChildrenCache的实例用于监听queuePath下的子节点变更事件(单独的线程中).同时consumer处于阻塞状态,当有子节点变更事件时会被唤醒(包括创建子节点/删除子节点等);
? ? --> 此时consumer获取子节点列表,并将每个节点信息封装成Runnable任务单元,提交到线程池中.?
? ? --> Runnable中并执行QueueConsumer.consumer(.)方法.
? ? --> 如果是生产者,则发布一个message时recipes将会在queuePath下创建一个PERSISTENT_SEQUENTIAL节点,同时保存message数据. 消费时,也将按照节点的顺序进行.
?
? ? 发布消息并没有太多的问题仅仅是创建一个"有序"节点即可..但是对于消费者,那么需要考虑的因数就很多,比如:1) 多个消费者同时消费时,需要确保消息不能重复且有序 2) 消息消费时,如果网络异常,怎么办?
? ? 对于QistributedQueue中,对上述问题的解决办法也非常粗糙,内部机制如下:
? ? --> 如果使用了消费担保(即指定了lockPath),在调用consumer方法之前,首先创建一个临时节点(lockPath + 子节点),如果创建此临时节点失败也就意味着此消息被其他消费者,则忽略此消息.
? ? --> 然后从子节点中获取数据,如果获取失败,意味着此节点已经被其他消费者删除,则忽略此消息.
? ? --> 然后调用consumer()方法,如果此方法抛出异常,消息将会再次添加到队列中(删除旧的子节点,创建一个新的子节点).如果消费正常,则删除节点.
? ? --> 无论成败,则删除临时节点(lockPath + 子节点)
?
? ? --> 如果没有使用消费担保,则首先获取子节点的数据(getData),然后立即删除此子节点
? ? -->调用consumer()方法.
?
? ? 问题:
? ? 1) 源代码中,在获取子节点的data时(getData)并没有指定version校验,我深度怀疑,当消息被并发的消费时,是否有重复的可能.
? ? 2) 因为zookeeper本身获取一个节点的子节点列表时,将得到所有的子节点,那么就意味任何一个消费者中每次Event触发,都将获取整个childrenList,如果此列表很庞大,性能问题将是非常突出的.
? ? 3) 在消费担保的情况下,每消费一个消息,就会做"创建临时节点""删除临时节点""获取数据"等大量工作,如果有多个消费者同时运行,那么对zk的操作次数将会倍数级增加,性能问题以及数据安全性问题,也是非常值得考虑的.
? ? 4) zookeeper已经限定了每个节点的数据尺寸,以及每个节点下子节点的个数,这对于实现规模性的分布式队列,确实不是良好的选择.
? ? 5) 当队列中,消费者和生产者的速率不均衡时,问题将会更加严重,比如:快速的生产者 + 慢速的消费者;因为此时Event事件将会非常频繁(网络消耗严重),对于消费者而言,线程上线切换锁带来的性能消耗,不可忽视.
?
? ? 最终,我们需要明确使用zookeeper作为分布式队列的场景: 1) 队列深度较小 2) 生产者和消费者的速度都非常的低且消费者消费速度更快,即单位时间内产生的消息很少. ?3) 建议只有一个消费者.
? ? 比如: 一个数据分析系统,这个系统中有多个定时任务,当任务即将触发时,向此分布式队列中提交一个消息,消息中包含任务的ID,那么谁消费了此任务ID,那么谁就负责执行此任务.我们间接的实现了分布式任务单点运行的需求.
? ? 还有一种场景,比如实现"排它重入锁"也可以使用DistributedQueue作为底层支撑.
?
? ? 消费者:DistributedQueueConusumer.java
? ? 生产者: DistributedQueueProducer.java
? ? SharedCount中也需要关注"链接异常"的问题,我们可以通过注册listener的方式,当链接重连成功后,重新获取新的值.
? ? 在"计数器"中,还提供了DistributedAtomicInteger,DistributedAtomicLong两个分布式自增计数器.
?
5. Locks?
? ? 使用zookeeper作为分布式锁,是一个普遍的需求,如下展示如何设计一个分布式重入锁,其中一个path表示一个锁资源.
LeaderLatch latch = new LeaderLatch(client,"/task/leader");//让listener在单独的线程池中运行Executor executor = Executors.newCachedThreadPool();//每个listener都用来执行角色变换的事件处理.LeaderLatchListener latchListener = new LeaderLatchListener() {@Overridepublic void isLeader() {System.out.println("I am leader...");}@Overridepublic void notLeader() {System.out.println("I am not leader...");}};latch.addListener(latchListener,executor);latch.start();latch.await();//等待leader角色.//在await退出之后,你需要通过其他手段继续关注leader状态变更.System.out.println(latch.hasLeadership());Thread.sleep(5000);latch.close();Thread.sleep(2000);client.close();?
??
? ? 到此为止,我们已经把curator-recipse的大部分API都介绍完毕了,希望我们的zookeeper开发之旅更加愉快.
? ??