博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
在 tornado 中异步无阻塞的执行耗时任务
阅读量:4348 次
发布时间:2019-06-07

本文共 5083 字,大约阅读时间需要 16 分钟。

在 tornado 中异步无阻塞的执行耗时任务


在  上 tornado 是基于 epoll 的事件驱动框架,在事件上是无阻塞的。但是因为 tornado 自身是单线程的,所以如果我们在某一个时刻执行了一个耗时的任务,那么就会阻塞在这里,无法响应其他的任务请求,这个和 tornado 的高性能称号不符,所以我们要想办法把耗时的任务转换为不阻塞主线程,让耗时的任务不影响对其他请求的响应。

在  3.2 上,增加了一个并行库 concurrent.futures,这个库提供了更简单的异步执行函数的方法。

如果是在 2.7 之类的 python 版本上,可以使用 pip install futures 来安装这个库。

关于这个库的具体使用,这里就不详细展开了,可以去看官方文档,需要注意的是,前两个例子是示例错误的用法,可能会产生死锁。

下面说说如何在 tornado 中结合使用 futures 库,最好的参考莫过于有文档+代码。正好, tornado 中解析 ip 使用的 dns 解析服务是无阻塞的。(netutils.ThreadedResolver)

我们来看看它的实现,看看如何应用到我们的中来。

tornado 中使用多线程无阻塞来处理 dns 请求

# 删除了注释 class ThreadedResolver(ExecutorResolver):    _threadpool = None    _threadpool_pid = None    def initialize(self, io_loop=None, num_threads=10):        threadpool = ThreadedResolver._create_threadpool(num_threads)        super(ThreadedResolver, self).initialize(            io_loop=io_loop, executor=threadpool, close_executor=False)    @classmethod    def _create_threadpool(cls, num_threads):        pid = os.getpid()        if cls._threadpool_pid != pid:            # Threads cannot survive after a fork, so if our pid isn't what it            # was when we created the pool then delete it.            cls._threadpool = None        if cls._threadpool is None:            from concurrent.futures import ThreadPoolExecutor            cls._threadpool = ThreadPoolExecutor(num_threads)            cls._threadpool_pid = pid        return cls._threadpool

ThreadedResolver 是 ExecutorEesolver 的子类,看看它的是实现。

class ExecutorResolver(Resolver):    def initialize(self, io_loop=None, executor=None, close_executor=True):        self.io_loop = io_loop or IOLoop.current()        if executor is not None:            self.executor = executor            self.close_executor = close_executor        else:            self.executor = dummy_executor            self.close_executor = False    def close(self):        if self.close_executor:            self.executor.shutdown()        self.executor = None    @run_on_executor    def resolve(self, host, port, family=socket.AF_UNSPEC):        addrinfo = socket.getaddrinfo(host, port, family, socket.SOCK_STREAM)        results = []        for family, socktype, proto, canonname, address in addrinfo:            results.append((family, address))        return results

从 ExecutorResolver 的实现可以看出来,它的关键参数是 ioloop 和 executor,干活的 resolve 函数被@run_on_executor 修饰,结合起来看 ThreadedResolver 的实现,那么这里的 executor 就是from concurrent.futures import ThreadPoolExecutor

再来看看 @run_on_executor 的实现。

run_on_executor 的实现在 concurrent.py 文件中,它的源码如下:

def run_on_executor(fn):    @functools.wraps(fn)    def wrapper(self, *args, **kwargs):        callback = kwargs.pop("callback", None)        future = self.executor.submit(fn, self, *args, **kwargs)        if callback:            self.io_loop.add_future(future,                                    lambda future: callback(future.result()))        return future    return wrapper

关于 functions.wraps() 的介绍可以参考官方文档 functools — Higher-order functions and operations on callable objects

简单的说,这里对传递进来的函数进行了封装,并用 self.executor.submit() 对的函数进行了执行,并判断是否有回调,如果有,就加入到 ioloop 的 callback 里面。

对比官方的 concurrent.futures.Executor 的,里面有个 submit() 方法,从头至尾看看ThreadedResolver 的实现,就是使用了 concurrent.futures.ThreadPoolExecutor 这个 Executor 的子类。

所以 tornado 中解析 dns 使用的多线程无阻塞的方法的实质就是使用了 concurrent.futures 提供的ThreadPoolExecutor 功能。


使用多线程无阻塞方法来执行耗时的任务

借鉴 tornado 的使用方法,在我们自己的程序中也使用这种方法来处理耗时的任务。

from tornado.concurrent import run_on_executorfrom concurrent.futures import ThreadPoolExecutorclass LongTimeTask(tornado.web.RequestHandler):    executor = ThreadPoolExecutor(10)    @run_on_executor()    def get(self, data):        long_time_task(data)

上面就是一个基本的使用方法,下面展示一个使用 sleep() 来模拟耗时的完整程序。

#!/usr/bin/env python#-*-coding:utf-8-*-import tornado.ioloopimport tornado.webimport tornado.httpserverfrom concurrent.futures import ThreadPoolExecutorfrom tornado.concurrent import run_on_executorimport timeclass App(tornado.web.Application):    def __init__(self):        handlers = [            (r'/', IndexHandler),            (r'/sleep/(\d+)', SleepHandler),        ]        settings = dict()        tornado.web.Application.__init__(self, handlers, **settings)class BaseHandler(tornado.web.RequestHandler):    executor = ThreadPoolExecutor(10)class IndexHandler(tornado.web.RequestHandler):    def get(self):        self.write("Hello, world %s" % time.time())class SleepHandler(BaseHandler):    @run_on_executor    def get(self, n):        time.sleep(float(n))        self._callback()    def _callback(self):        self.write("after sleep, now I'm back %s" % time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))if __name__ == "__main__":    app = App()    server = tornado.httpserver.HTTPServer(app, xheaders=True)    server.listen(8888)    tornado.ioloop.IOLoop.instance().start()

此时先调用 127.0.0.1:8888/sleep/10 不会阻塞 127.0.0.1:8888/ 了。

以上,就是完整的在 tornado 中利用多线程来执行耗时的任务。


结语

epoll 的好处确实很多,事件就绪通知后,上层任务函数执行任务,如果任务本身需要较耗时,那么就可以考虑这个方法了,

当然也有其他的方法,比如使用 celery 来调度执行耗时太多的任务,比如频繁的需要写入数据到不同的文件中,我公司的一个中,需要把数据写入四千多个文件中,每天产生几亿条数据,就是使用了 tornado + redis + celery 的方法来高效的执行写文件任务。

完。

转载于:https://www.cnblogs.com/ExMan/p/10439362.html

你可能感兴趣的文章
环形菜单的实现
查看>>
【解决Chrome浏览器和IE浏览器上传附件兼容的问题 -- Chrome关闭flash后,uploadify插件不可用的解决办法】...
查看>>
34 帧动画
查看>>
二次剩余及欧拉准则
查看>>
thymeleaf 自定义标签
查看>>
关于WordCount的作业
查看>>
UIView的layoutSubviews,initWithFrame,initWithCoder方法
查看>>
STM32+IAP方案 实现网络升级应用固件
查看>>
用74HC165读8个按键状态
查看>>
jpg转bmp(使用libjpeg)
查看>>
linear-gradient常用实现效果
查看>>
sql语言的一大类 DML 数据的操纵语言
查看>>
VMware黑屏解决方法
查看>>
JAVA 基础 / 第八课:面向对象 / JAVA类的方法与实例方法
查看>>
Thrift源码分析(二)-- 协议和编解码
查看>>
考勤系统之计算工作小时数
查看>>
4.1 分解条件式
查看>>
关于C++ const成员的一些细节
查看>>
《代码大全》学习摘要(五)软件构建中的设计(下)
查看>>
C#检测驱动是否安装的问题
查看>>