首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 互联网 >

zeroMQ初体验-3.分而治之方式(push/pull)

2012-08-30 
zeroMQ初体验-3.分而治之模式(push/pull)push/pull模式:模型描述:1.上游(任务发布)2.工人(中间,具体工作)3

zeroMQ初体验-3.分而治之模式(push/pull)
push/pull模式:



模型描述:
1.上游(任务发布)
2.工人(中间,具体工作)
3.下游(信号采集或者工作结果收集)

上游代码:

import zmqimport randomimport timecontext = zmq.Context()# Socket to send messages onsender = context.socket(zmq.PUSH)sender.bind("tcp://*:5557")print "Press Enter when the workers are ready: "_ = raw_input()print "Sending tasks to workers..."# The first message is "0" and signals start of batchsender.send('0')# Initialize random number generatorrandom.seed()# Send 100 taskstotal_msec = 0for task_nbr in range(100):    # Random workload from 1 to 100 msecs    workload = random.randint(1, 100)    total_msec += workload    sender.send(str(workload))print "Total expected cost: %s msec" % total_msec


工作代码:
import sysimport timeimport zmqcontext = zmq.Context()# Socket to receive messages onreceiver = context.socket(zmq.PULL)receiver.connect("tcp://localhost:5557")# Socket to send messages tosender = context.socket(zmq.PUSH)sender.connect("tcp://localhost:5558")# Process tasks foreverwhile True:    s = receiver.recv()    # Simple progress indicator for the viewer    sys.stdout.write('.')    sys.stdout.flush()    # Do the work    time.sleep(int(s)*0.001)    # Send results to sink    sender.send('')


下游代码:
import sysimport timeimport zmqcontext = zmq.Context()# Socket to receive messages onreceiver = context.socket(zmq.PULL)receiver.bind("tcp://*:5558")# Wait for start of batchs = receiver.recv()# Start our clock nowtstart = time.time()# Process 100 confirmationstotal_msec = 0for task_nbr in range(100):    s = receiver.recv()    if task_nbr % 10 == 0:        sys.stdout.write(':')    else:        sys.stdout.write('.')# Calculate and report duration of batchtend = time.time()print "Total elapsed time: %d msec" % ((tend-tstart)*1000)



注意点:
这种模式与pub/sub模式一样都是单向的,区别有两点:
1,该模式下在没有消费者的情况下,发布者的信息是不会消耗的(由发布者进程维护)
2,多个消费者消费的是同一列信息,假设A得到了一条信息,则B将不再得到

这种模式主要针对在消费者能力不够的情况下,提供的多消费者并行消费解决方案(也算是之前的pub/sub模式的那个"堵塞问题"的一个解决策略吧)

由上面的模型图可以看出,这是一个N:N的模式,在1:N的情况下,各消费者并不是平均消费的,而在N:1的情况下,则有所不同,如下图:



这种模式主要关注点在于,可以扩展中间worker,来到达并发的目的。

(未完待续)

热点排行