结合nova-network分析openstack mq机制
? ? server = service.Service.create(binary='nova-network') ? #create是一个类方法,该类的返回结果就实例化了一个Service对象,create方法进行了Service对象的初始化? ? service.serve(server) ? ?#启动该服务,服务启动后做的工作主要在这个方法中实现,该方法位于nova/service.py中? ? service.wait() ? ?#主线程等待? nova/service.py 的serve方法:??
_launcher = None?def serve(*servers):? ? global _launcher ?#定义_launcher为全局变量? ? if not _launcher: ??? ? ? ? _launcher = Launcher() ?#实例化launcher,serveice操作的一些方法,对service方法的封装,实现了多个service的操作? ? for server in servers:? ? ? ? _launcher.launch_server(server) 启动server? ? nova/service.py的launch_server方法:gt = eventlet.spawn(self.run_server, server) ? #创建一个绿色线程来执行run_server方法,并给该方法传入server对象self._services.append(gt) ? ?#把返回的线程对象添加到_services数组中ps:eventlet.spawn(func,*args,**kw)方法启动一个绿色线程(green thread)来执行func函数,args和kw为传入的参数,绿色线程用来处理与网络相关的一些工作,与普通线程相比,它的消耗非常低,每一个网络连接至少一个绿色线程;同时,绿色线程执行的是协同式调度,而非普通线程的抢占式调度,因此无需对共享的数据访问进行加锁。spawn会返回线程对象,通过该对象,也可以获取func执行的结果;spawn_n则没有返回值。? ? nova/service.py的run_server方法:server.start()server.wait()? ??nova/service.py的start方法:
????def start(self):? ? ? ? vcs_string = version.version_string_with_vcs()? ? ? ? LOG.audit(_('Starting %(topic)s node (version %(vcs_string)s)'),? ? ? ? ? ? ? ? ? {'topic': self.topic, 'vcs_string': vcs_string})? ? ? ? utils.cleanup_file_locks() ? ?#lockfile是用来不同平台进程通信时的同步锁,该方法删除进程执行失败后清理这些lockfile? ? ? ? self.manager.init_host() ? ? #network/manager.py下init_host方法初始化一些网络设备,比如说创建网桥等等? ? ? ? self.model_disconnected = False? ? ? ? ?.......?? ? ? ? self.conn = rpc.create_connection(new=True) ? ? #创建连接,默认是kombo实现? ? ? ? LOG.debug(_("Creating Consumer connection for Service %s") %? ? ? ? ? ? ? ? ? self.topic)?? ? ? ? # Share this same connection for these Consumers? ? ? ? self.conn.create_consumer(self.topic, self, fanout=False) ? ? #创建以topic为路由键的消费者?? ? ? ? node_topic = '%s.%s' % (self.topic, self.host)? ? ? ? self.conn.create_consumer(node_topic, self, fanout=False) ? #创建以node.topic为路由键的消费者,exchange方式为topic的?? ? ? ? self.conn.create_consumer(self.topic, self, fanout=True) ? ?#创建一个fanout方式转发的消费者,广播的形式,消息转发速度最快? ? ? ? #在创建consumer时,declare_consumer方法会将创建的声明的consumer添加到consumers数组中(self.consumers.append(consumer)),指定了消费者的回调函数(ProxyCallback,解析? ? ? ? #消息);在下面的consume_in_thread中执行? ? ? ? #self.consume ?()方法,consume()方法会用所有消费者去做消费消息动作,该动作在iterconsume方法中,_consume()方法做消费消息的动作:? ? ? ? #self.connection.drain_events(timeout=timeout) 会等待来自服务器的消息通知? ? ? ? # Consume from all consumers in a thread? ? ? ? self.conn.consume_in_thread() ? ?#在一个绿色线程中消费消息? ? ? ? #下面是两个定时器,报告服务的状态等信息? ? ? ? if self.report_interval:? ? ? ? ? ? pulse = utils.LoopingCall(self.report_state)? ? ? ? ? ? pulse.start(interval=self.report_interval, now=False)? ? ? ? ? ? self.timers.append(pulse)?? ? ? ? if self.periodic_interval:? ? ? ? ? ? periodic = utils.LoopingCall(self.periodic_tasks)? ? ? ? ? ? periodic.start(interval=self.periodic_interval, now=False)? ? ? ? ? ? self.timers.append(periodic)