首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > perl python >

python-memcached的线程安全有关问题

2012-10-31 
python-memcached的线程安全问题答案是肯定的,前提你在使用Python 2.4+和python-memcached 1.36+为什么我

python-memcached的线程安全问题

答案是肯定的,前提你在使用Python 2.4+和python-memcached 1.36+
为什么我们需要线程安全的memcached client,因为我们的实际应用一般是多线程的模型,例如cherrypy、twisted,如果python-memcached不是线程安全的话,引起的问题不仅仅是并发修改共享变量这么简单,是外部socket链接的数据流的混乱
python-memcached怎么实现线程安全的呢?查看源代码看到

try:    # Only exists in Python 2.4+    from threading import localexcept ImportError:    # TODO:  add the pure-python local implementation    class local(object):        pass

?class Client(local):很取巧的让Client类继承threading.local,也就是Client里面的每一个属性都是跟当前线程绑定的。实现虽然不太优雅,但是很实在
但是别以为这样就可以随便在线程里面用python-memcached了,因为这种thread local的做法,你的应用必须要使用thread pool的模式,而不能不停创建销毁thread,因为每一个新线程的创建,对于就会使用一个全新的Client,也就是一个全新的socket链接,如果不停打开创建销毁thread的话,就会导致不停的创建销毁socket链接,导致性能大量下降。幸好,无论是cherrypy还是twisted,都是使用了thread pool的模式。

?

但是不幸的是gevent不是thread pool的模式,这导致不停的创建销毁socket链接。

def _patch(self): key = object.__getattribute__(self, '_local__key') d = current_thread().__dict__.get(key) if d is None: d = {} current_thread().__dict__[key] = d object.__setattr__(self, '__dict__', d) # we have a new instance dict, so call out __init__ if we have # one cls = type(self) if cls.__init__ is not object.__init__: args, kw = object.__getattribute__(self, '_local__args') cls.__init__(self, *args, **kw) else: object.__setattr__(self, '__dict__', d)

?

cls是当前对象的class,object是基类class,我打印了清单:

new :? <class 'duitang.memcache.Client'> | <type 'object'>
--------------------
cached: <class 'gevent.local.local'> | <slot wrapper '__init__' of 'object' objects> | <slot wrapper '__init__' of 'object' objects>


yunpeng@yunpeng-duitang:~/test2$ netstat -an | grep 11211
tcp??????? 0????? 0 0.0.0.0:11211?????????? 0.0.0.0:*?????????????? LISTEN????
tcp?????? 31????? 0 127.0.0.1:46835???????? 127.0.0.1:11211???????? ESTABLISHED
tcp??????? 0????? 0 127.0.0.1:46832???????? 127.0.0.1:11211???????? ESTABLISHED
tcp??????? 0????? 0 127.0.0.1:46831???????? 127.0.0.1:11211???????? ESTABLISHED
tcp??????? 0????? 0 127.0.0.1:46829???????? 127.0.0.1:11211???????? ESTABLISHED
tcp??????? 0???? 14 127.0.0.1:46833???????? 127.0.0.1:11211???????? ESTABLISHED
tcp?????? 31????? 0 127.0.0.1:46828???????? 127.0.0.1:11211???????? ESTABLISHED
tcp??????? 0????? 0 127.0.0.1:46834???????? 127.0.0.1:11211???????? ESTABLISHED
tcp?????? 31????? 0 127.0.0.1:46830???????? 127.0.0.1:11211???????? ESTABLISHED

当python进程退出socket会被自动关闭。
yunpeng@yunpeng-duitang:/duitang/dist/app/trunk/duitang$ netstat -an | grep 11211
tcp??????? 0????? 0 0.0.0.0:11211?????????? 0.0.0.0:*?????????????? LISTEN????
tcp??????? 0????? 0 127.0.0.1:44900???????? 127.0.0.1:11211???????? TIME_WAIT?


但是很明显,采用threading locale这种方式来保证线程安全存在一些缺陷:

1.要求web server采用thread pool的方式,如果thread每次执行完之后就结束了,这会导致不停的创建销毁socket链接。
2.要求使用python thread locale的语义,但不幸的是python的thread语义很容易被改变,gevent就可以直接把python的一个thread转换成greenlet。


gevent的monkey提供的patch方法

patch_all() 调用所有的monkey patch
???
patch_os() os.fork()替换成gevent.fork


patch_select(aggressive=False) `select.select`替换成`gevent.select.select`

patch_socket(dns=True, aggressive=True)? 标准的socket object 替换成 gevent's cooperative sockets.

patch_thread(threading=True, _threading_local=True) thread` module 替换成 gevent's thread

patch_time()? 把标准的`time.sleep` 替换成`gevent.sleep`.


gunicorn如何使用gevent?
代码:
/duitang/dist/sys/python/lib/python2.7/site-packages/gunicorn-0.14.6-py2.7.egg/gunicorn/workers/ggevent.py

# -*- coding: utf-8 -## This file is part of gunicorn released under the MIT license.# See the NOTICE for more information.from __future__ import with_statementimport osimport sysfrom datetime import datetime# workaround on osx, disable kqueueif sys.platform == "darwin":    os.environ['EVENT_NOKQUEUE'] = "1"try:    import geventexcept ImportError:    raise RuntimeError("You need gevent installed to use this worker.")from gevent.pool import Poolfrom gevent.server import StreamServerfrom gevent import pywsgiimport gunicornfrom gunicorn.workers.async import AsyncWorkerVERSION = "gevent/%s gunicorn/%s" % (gevent.__version__, gunicorn.__version__)BASE_WSGI_ENV = {    'GATEWAY_INTERFACE': 'CGI/1.1',    'SERVER_SOFTWARE': VERSION,    'SCRIPT_NAME': '',    'wsgi.version': (1, 0),    'wsgi.multithread': False,    'wsgi.multiprocess': False,    'wsgi.run_once': False}class GeventWorker(AsyncWorker):    server_class = None    wsgi_handler = None    @classmethod    def setup(cls):        from gevent import monkey        monkey.noisy = False        monkey.patch_all()    def timeout_ctx(self):        return gevent.Timeout(self.cfg.keepalive, False)    def run(self):        self.socket.setblocking(1)        pool = Pool(self.worker_connections)        if self.server_class is not None:            server = self.server_class(                self.socket, application=self.wsgi, spawn=pool, log=self.log,                handler_class=self.wsgi_handler)        else:            server = StreamServer(self.socket, handle=self.handle, spawn=pool)        server.start()        pid = os.getpid()        try:            while self.alive:                self.notify()                if  pid == os.getpid() and self.ppid != os.getppid():                    self.log.info("Parent changed, shutting down: %s", self)                    break                gevent.sleep(1.0)        except KeyboardInterrupt:            pass        try:            # Try to stop connections until timeout            self.notify()            server.stop(timeout=self.cfg.graceful_timeout)        except:            pass    def handle_request(self, *args):        try:            super(GeventWorker, self).handle_request(*args)        except gevent.GreenletExit:            pass        if gevent.version_info[0] == 0:            def init_process(self):                #gevent 0.13 and older doesn't reinitialize dns for us after forking                #here's the workaround                import gevent.core                gevent.core.dns_shutdown(fail_requests=1)                gevent.core.dns_init()                super(GeventWorker, self).init_process()class GeventResponse(object):    status = None    headers = None    response_length = None    def __init__(self, status, headers, clength):        self.status = status        self.headers = headers        self.response_length = clengthclass PyWSGIHandler(pywsgi.WSGIHandler):    def log_request(self):        start = datetime.fromtimestamp(self.time_start)        finish = datetime.fromtimestamp(self.time_finish)        response_time = finish - start        resp = GeventResponse(self.status, self.response_headers,                self.response_length)        req_headers = [h.split(":", 1) for h in self.headers.headers]        self.server.log.access(resp, req_headers, self.environ, response_time)    def get_environ(self):        env = super(PyWSGIHandler, self).get_environ()        env['gunicorn.sock'] = self.socket        env['RAW_URI'] = self.path        return envclass PyWSGIServer(pywsgi.WSGIServer):    base_env = BASE_WSGI_ENVclass GeventPyWSGIWorker(GeventWorker):    "The Gevent StreamServer based workers."    server_class = PyWSGIServer    wsgi_handler = PyWSGIHandler
?

测试:ab -n100 http://7199.t.duitang.com:7199/cache/

[admin@server2 duitang]$ netstat -an| grep 11211 |wc -l
306


总结:使用了gevent之后thread local有太多不可控.
gevent代码:/duitang/dist/sys/python/lib/python2.7/site-packages/gevent
gevent+django:http://www.slideshare.net/mahendram/scaling-django-with-gevent

热点排行