python-memcached的线程安全问题
答案是肯定的,前提你在使用Python 2.4+和python-memcached 1.36+
为什么我们需要线程安全的memcached client,因为我们的实际应用一般是多线程的模型,例如cherrypy、twisted,如果python-memcached不是线程安全的话,引起的问题不仅仅是并发修改共享变量这么简单,是外部socket链接的数据流的混乱
python-memcached怎么实现线程安全的呢?查看源代码看到
try: # Only exists in Python 2.4+ from threading import localexcept ImportError: # TODO: add the pure-python local implementation class local(object): pass
?class Client(local):很取巧的让Client类继承threading.local,也就是Client里面的每一个属性都是跟当前线程绑定的。实现虽然不太优雅,但是很实在
但是别以为这样就可以随便在线程里面用python-memcached了,因为这种thread local的做法,你的应用必须要使用thread pool的模式,而不能不停创建销毁thread,因为每一个新线程的创建,对于就会使用一个全新的Client,也就是一个全新的socket链接,如果不停打开创建销毁thread的话,就会导致不停的创建销毁socket链接,导致性能大量下降。幸好,无论是cherrypy还是twisted,都是使用了thread pool的模式。
?
但是不幸的是gevent不是thread pool的模式,这导致不停的创建销毁socket链接。
def _patch(self): key = object.__getattribute__(self, '_local__key') d = current_thread().__dict__.get(key) if d is None: d = {} current_thread().__dict__[key] = d object.__setattr__(self, '__dict__', d) # we have a new instance dict, so call out __init__ if we have # one cls = type(self) if cls.__init__ is not object.__init__: args, kw = object.__getattribute__(self, '_local__args') cls.__init__(self, *args, **kw) else: object.__setattr__(self, '__dict__', d)?
cls是当前对象的class,object是基类class,我打印了清单:
new :? <class 'duitang.memcache.Client'> | <type 'object'>
--------------------
cached: <class 'gevent.local.local'> | <slot wrapper '__init__' of 'object' objects> | <slot wrapper '__init__' of 'object' objects>
yunpeng@yunpeng-duitang:~/test2$ netstat -an | grep 11211
tcp??????? 0????? 0 0.0.0.0:11211?????????? 0.0.0.0:*?????????????? LISTEN????
tcp?????? 31????? 0 127.0.0.1:46835???????? 127.0.0.1:11211???????? ESTABLISHED
tcp??????? 0????? 0 127.0.0.1:46832???????? 127.0.0.1:11211???????? ESTABLISHED
tcp??????? 0????? 0 127.0.0.1:46831???????? 127.0.0.1:11211???????? ESTABLISHED
tcp??????? 0????? 0 127.0.0.1:46829???????? 127.0.0.1:11211???????? ESTABLISHED
tcp??????? 0???? 14 127.0.0.1:46833???????? 127.0.0.1:11211???????? ESTABLISHED
tcp?????? 31????? 0 127.0.0.1:46828???????? 127.0.0.1:11211???????? ESTABLISHED
tcp??????? 0????? 0 127.0.0.1:46834???????? 127.0.0.1:11211???????? ESTABLISHED
tcp?????? 31????? 0 127.0.0.1:46830???????? 127.0.0.1:11211???????? ESTABLISHED
当python进程退出socket会被自动关闭。
yunpeng@yunpeng-duitang:/duitang/dist/app/trunk/duitang$ netstat -an | grep 11211
tcp??????? 0????? 0 0.0.0.0:11211?????????? 0.0.0.0:*?????????????? LISTEN????
tcp??????? 0????? 0 127.0.0.1:44900???????? 127.0.0.1:11211???????? TIME_WAIT?
但是很明显,采用threading locale这种方式来保证线程安全存在一些缺陷:
1.要求web server采用thread pool的方式,如果thread每次执行完之后就结束了,这会导致不停的创建销毁socket链接。
2.要求使用python thread locale的语义,但不幸的是python的thread语义很容易被改变,gevent就可以直接把python的一个thread转换成greenlet。
gevent的monkey提供的patch方法
patch_all() 调用所有的monkey patch
???
patch_os() os.fork()替换成gevent.fork
patch_select(aggressive=False) `select.select`替换成`gevent.select.select`
patch_socket(dns=True, aggressive=True)? 标准的socket object 替换成 gevent's cooperative sockets.
patch_thread(threading=True, _threading_local=True) thread` module 替换成 gevent's thread
patch_time()? 把标准的`time.sleep` 替换成`gevent.sleep`.
gunicorn如何使用gevent?
代码:
/duitang/dist/sys/python/lib/python2.7/site-packages/gunicorn-0.14.6-py2.7.egg/gunicorn/workers/ggevent.py
# -*- coding: utf-8 -## This file is part of gunicorn released under the MIT license.# See the NOTICE for more information.from __future__ import with_statementimport osimport sysfrom datetime import datetime# workaround on osx, disable kqueueif sys.platform == "darwin": os.environ['EVENT_NOKQUEUE'] = "1"try: import geventexcept ImportError: raise RuntimeError("You need gevent installed to use this worker.")from gevent.pool import Poolfrom gevent.server import StreamServerfrom gevent import pywsgiimport gunicornfrom gunicorn.workers.async import AsyncWorkerVERSION = "gevent/%s gunicorn/%s" % (gevent.__version__, gunicorn.__version__)BASE_WSGI_ENV = { 'GATEWAY_INTERFACE': 'CGI/1.1', 'SERVER_SOFTWARE': VERSION, 'SCRIPT_NAME': '', 'wsgi.version': (1, 0), 'wsgi.multithread': False, 'wsgi.multiprocess': False, 'wsgi.run_once': False}class GeventWorker(AsyncWorker): server_class = None wsgi_handler = None @classmethod def setup(cls): from gevent import monkey monkey.noisy = False monkey.patch_all() def timeout_ctx(self): return gevent.Timeout(self.cfg.keepalive, False) def run(self): self.socket.setblocking(1) pool = Pool(self.worker_connections) if self.server_class is not None: server = self.server_class( self.socket, application=self.wsgi, spawn=pool, log=self.log, handler_class=self.wsgi_handler) else: server = StreamServer(self.socket, handle=self.handle, spawn=pool) server.start() pid = os.getpid() try: while self.alive: self.notify() if pid == os.getpid() and self.ppid != os.getppid(): self.log.info("Parent changed, shutting down: %s", self) break gevent.sleep(1.0) except KeyboardInterrupt: pass try: # Try to stop connections until timeout self.notify() server.stop(timeout=self.cfg.graceful_timeout) except: pass def handle_request(self, *args): try: super(GeventWorker, self).handle_request(*args) except gevent.GreenletExit: pass if gevent.version_info[0] == 0: def init_process(self): #gevent 0.13 and older doesn't reinitialize dns for us after forking #here's the workaround import gevent.core gevent.core.dns_shutdown(fail_requests=1) gevent.core.dns_init() super(GeventWorker, self).init_process()class GeventResponse(object): status = None headers = None response_length = None def __init__(self, status, headers, clength): self.status = status self.headers = headers self.response_length = clengthclass PyWSGIHandler(pywsgi.WSGIHandler): def log_request(self): start = datetime.fromtimestamp(self.time_start) finish = datetime.fromtimestamp(self.time_finish) response_time = finish - start resp = GeventResponse(self.status, self.response_headers, self.response_length) req_headers = [h.split(":", 1) for h in self.headers.headers] self.server.log.access(resp, req_headers, self.environ, response_time) def get_environ(self): env = super(PyWSGIHandler, self).get_environ() env['gunicorn.sock'] = self.socket env['RAW_URI'] = self.path return envclass PyWSGIServer(pywsgi.WSGIServer): base_env = BASE_WSGI_ENVclass GeventPyWSGIWorker(GeventWorker): "The Gevent StreamServer based workers." server_class = PyWSGIServer wsgi_handler = PyWSGIHandler?
测试:ab -n100 http://7199.t.duitang.com:7199/cache/
[admin@server2 duitang]$ netstat -an| grep 11211 |wc -l
306
总结:使用了gevent之后thread local有太多不可控.
gevent代码:/duitang/dist/sys/python/lib/python2.7/site-packages/gevent
gevent+django:http://www.slideshare.net/mahendram/scaling-django-with-gevent