zeroMQ初体验-3.分而治之模式(push/pull)
push/pull模式:
模型描述:
1.上游(任务发布)
2.工人(中间,具体工作)
3.下游(信号采集或者工作结果收集)
上游代码:
import zmqimport randomimport timecontext = zmq.Context()# Socket to send messages onsender = context.socket(zmq.PUSH)sender.bind("tcp://*:5557")print "Press Enter when the workers are ready: "_ = raw_input()print "Sending tasks to workers..."# The first message is "0" and signals start of batchsender.send('0')# Initialize random number generatorrandom.seed()# Send 100 taskstotal_msec = 0for task_nbr in range(100): # Random workload from 1 to 100 msecs workload = random.randint(1, 100) total_msec += workload sender.send(str(workload))print "Total expected cost: %s msec" % total_msec
import sysimport timeimport zmqcontext = zmq.Context()# Socket to receive messages onreceiver = context.socket(zmq.PULL)receiver.connect("tcp://localhost:5557")# Socket to send messages tosender = context.socket(zmq.PUSH)sender.connect("tcp://localhost:5558")# Process tasks foreverwhile True: s = receiver.recv() # Simple progress indicator for the viewer sys.stdout.write('.') sys.stdout.flush() # Do the work time.sleep(int(s)*0.001) # Send results to sink sender.send('')
import sysimport timeimport zmqcontext = zmq.Context()# Socket to receive messages onreceiver = context.socket(zmq.PULL)receiver.bind("tcp://*:5558")# Wait for start of batchs = receiver.recv()# Start our clock nowtstart = time.time()# Process 100 confirmationstotal_msec = 0for task_nbr in range(100): s = receiver.recv() if task_nbr % 10 == 0: sys.stdout.write(':') else: sys.stdout.write('.')# Calculate and report duration of batchtend = time.time()print "Total elapsed time: %d msec" % ((tend-tstart)*1000)