文章详情页 您现在的位置是:网站首页>文章详情

关于tornado异步的一点研究

图片丢失 Jeyrce.Lu 发表于:2019年8月31日 18:25 分类:【Python 5342次阅读

Tornado框架与Django、Flask相比,主要的突出点在于异步的使用。(下面的内容最好配合框架代码一起食用)

Tornado异步的使用姿势

Tornado异步的使用方式:

  • @tornado.web.asynchronous - 老版本的异步装饰器(目前推荐使用 gen.coroutine 替代),必须手动调用 self.finish() 才会对请求响应,否则会一直 pending

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self, *args, **kwargs):
        self.write("Hello World!")
        self.finish()
  • @tornado.gen.coroutine - 使用 yield 配合 Tornada 编写异步程序。

class NoBlockingHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self, *args, **kwargs):
        yield gen.sleep(10) # yield 右侧的函数或方法需是异步的,如果换成了time.sleep(10)仍为阻塞方式
        self.write("No Blocking Request")

# gen.sleep 的实现如下:
def sleep(duration):
    f = _create_future()
    IOLoop.current().call_later(duration,
                                lambda: future_set_result_unless_cancelled(f, None))
    return f
  • @run_on_executor - 使用线程的方式让阻塞变成非阻塞(相对的),原理是另外启动一个线程来执行阻塞的程序。依赖于 concurrent.futures.ThreadPoolExecutor

class NoBlockingHandler(tornado.web.RequestHandler):
    # 此处必须为executor,在 run_on_executor 装饰器函数中,会在self中获取executor属性
    executor = ThreadPoolExecutor(4) # 1

    @run_on_executor
    def sleep(self, second):
        time.sleep(second) # 此处的sleep可以是阻塞的,因为run_on_executor装饰过后的方法,会被转换为Future
        return second

    @gen.coroutine
    def get(self, *args, **kwargs):
        second = yield self.sleep(5)
        self.write("noblocking Request: {}".format(second))

# run_on_executor实现:
def run_on_executor(*args, **kwargs):
    def run_on_executor_decorator(fn):
        executor = kwargs.get("executor", "executor")

        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            callback = kwargs.pop("callback", None)
            async_future = Future() # 此处开始构造Future对象实例!!
            conc_future = getattr(self, executor).submit(fn, self, *args, **kwargs) # 此处调用的是 #1 处的线程池,结果仍返回的是Future,但会启动一个线程执行被装饰的方法。(可跳进去看实现)
            chain_future(conc_future, async_future) # 链式调用,conc_future先被调用,接着调用async_future
            if callback:
                warnings.warn("callback arguments are deprecated, use the returned Future instead",
                              DeprecationWarning)
                from tornado.ioloop import IOLoop
                IOLoop.current().add_future(
                    async_future, lambda future: callback(future.result()))
            return async_future
        return wrapper
    if args and kwargs:
        raise ValueError("cannot combine positional and keyword args")
    if len(args) == 1:
        return run_on_executor_decorator(args[0])
    elif len(args) != 0:
        raise ValueError("expected 1 argument, got %d", len(args))
    return run_on_executor_decorator

以上三种方式:

  • @tornado.web.asynchronous 方式已经不怎么使用。

  • @tornado.gen.coroutine 方式依赖于 yield 后面的方法或函数是否是 非阻塞实现

  • @run_on_executor 方式需要考虑吞吐量过多时,线程对系统资源的消耗。

具体介绍以及事例显示可以参考这篇Blog: https://hexiangyu.me/2017/01/29/real-tornado-async-noblocking/

Future、Runner、IOLoop

Future、Runner、IOLoop 为实现异步的关键!!!

Tornado架构实现异步的代码有点儿复杂,一时半会说不清楚。下面会给一个简化版的异步实现,基本原理是一样的。

PS:select模块为操作系统多路IO复用模式的封装,不同操作系统的底层实现不一样,包括了select、poll、epoll、kqueue。基本原理是,注册一个绑定了文件的事件,当事件触发时,会调用对应事件的回调函数。

import sys
import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

selector = DefaultSelector()
stop = False
urls_todo = 10


class Future:
    """可以理解为每个执行步骤的载体,yield语句返回的结果,起占位作用"""
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, step):
        self._callbacks.append(step)

    def set_result(self, result):
        self.result = result
        for step in self._callbacks: # 每次set_result后,都会启动回调函数,这里的回调函数其实是Task中step方法,这会形成了一个调用环,每次调用完后,转到下一个yield语句处
            step(self)


class Runner:
    """发动机"""
    def __init__(self, coro):
        self.coro = coro

    def run(self):
        f = Future()
        f.set_result(None)
        self.step(f) # 3 coro为一个生成器,这一步,其实就是为了 send(None) 给 #2 处的生成器

    def step(self, future):
        try:
            next_future = self.coro.send(future.result) # 4 将暂停的生成器继续启用,并接收 Future实例,调用完后,程序主体跳转到下一个yield语句处
        except StopIteration:
            return
        next_future.add_done_callback(self.step) # 6 next_future为 下一个生成器返回的 Future实例


class Crawler:
    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)

        try:
            sock.connect((self.url, 80))
        except BlockingIOError:
            pass

        f = Future()

        def on_connected():
            f.set_result(None)

        selector.register(sock.fileno(), EVENT_WRITE, on_connected)
        yield f # 2 此处主要是占位,运行至此时,函数将控制权交还给上一级

        selector.unregister(sock.fileno())
        get = "GET / HTTP/1.0\r\nHost: {}\r\n\r\n".format(self.url)
        sock.send(get.encode("ascii"))

        global stop
        global urls_todo
        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(), EVENT_READ, on_readable) # 注册及下面的取消注册为的是触发sock读事件
            chunk = yield f # 5 此处为真正的调用主体,每次会返回Future实例,这里的Future实例的作用实际是返回sock读取的内容
                                        # 此处在循环中,所以会不停的返回 Future实例,直到从服务器接收完所有的信息
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                urls_todo -= 1
                if not urls_todo:
                    stop = True
                break

        print(self.response)
        sys.stdout.flush()


def loop():
    while not stop:
        event = selector.select() # 每次都会选出那些已经触发的事件,下面则进行回调函数的触发
        for event_key, event_mask in event:
            callback = event_key.data
            callback() # 回调函数可以是


if __name__ == '__main__':
    import time

    start_time = time.time()
    for _ in range(urls_todo):
        crawler = Crawler("baidu.com")
        runner = Runner(crawler.fetch())
        runner.run() # 1 此处运行至 #2 处,传入Task的参数是一个 生成器 gen,此时已经完成了socket connect步骤

    loop() # 事件循环开始,会根据注册的事件响应,回调对应的函数。开始时,事件只有socket connect由服务器返回结果的事件

    print("Use time -> {}'s.".format(time.time() - start_time))
  • Future 对象是个神奇的存在,它本身并不产生结果,每次 set_result 的时候,除了会将result保存,同时也会调用回调函数。

  • 而回调函数则是 Task 中的 step 方法。

  • Task 对象中的coro成员为 Crawler.fetch(),它是个生成器。

  • Task.step 方法中调用的 self.coro.send(future.result) 将结果返回给 Crawler.fetch(),程序的流程走到下一个 yield 语句处。

  • Task.step 下一步 next_future.add_done_callback(self.step),又将自己加入到了 future 的回调函数中。这样就形成了一个环形的调用链,直到生成器结束,抛出 StopIteration 异常。

  • loop 方法处理注册事件的触发,回调过程中会调用 Future.set_result

yield

yield是python中实现异步调用的关键,它能够 将函数或方法的调用暂停,并将上下文保存下来,等到下次触发时,恢复上一次调用的上下文环境

import dis


def foo():
    result = yield 111
    print(f'result of yield: {result}\n')
    result2 = yield 222
    print(f'result of 2nd yield: {result2}\n')
    return 'done'


dis.dis(foo)

print("=" * 60, flush=True)

first_res = gen = foo()
print("First -> {}, lasti -> {}".format(first_res, gen.gi_frame.f_lasti), flush=True)

second_res = gen.send(None) # 触发生成器,并将 111 返回,流程走到第一个yield处暂停
print("Second -> {}, lasti -> {}".format(second_res, gen.gi_frame.f_lasti), flush=True)

third_res = gen.send("hello") # send方法恢复暂停的生成器,并将hello发送给生成器,yield左边接收hello,并打印,程序继续执行至下一个yield处,此时返回222
print("Third -> {}, lasti -> {}".format(third_res, gen.gi_frame.f_lasti), flush=True)

try:
    gen.send("world") # 此时send完后,result2接收到world,下面没有了yield语句,抛出StopIteration异常
except StopIteration:
    try:
        print("Four -> {}".format(gen.gi_frame.f_lasti))
    except:
        pass
    pass

输出结果如下:

# ===============================output======================================
# 12 0 LOAD_CONST 1 (111)
# 2 YIELD_VALUE
# 4 STORE_FAST 0 (result)
#
# 13 6 LOAD_GLOBAL 0 (print)
# 8 LOAD_CONST 2 ('result of yield: ')
# 10 LOAD_FAST 0 (result)
# 12 FORMAT_VALUE 0
# 14 BUILD_STRING 2
# 16 CALL_FUNCTION 1
# 18 POP_TOP
#
# 14 20 LOAD_CONST 3 (222)
# 22 YIELD_VALUE
# 24 STORE_FAST 1 (result2)
#
# 15 26 LOAD_GLOBAL 0 (print)
# 28 LOAD_CONST 4 ('result of 2nd yield: ')
# 30 LOAD_FAST 1 (result2)
# 32 FORMAT_VALUE 0
# 34 BUILD_STRING 2
# 36 CALL_FUNCTION 1
# 38 POP_TOP
#
# 16 40 LOAD_CONST 5 ('done')
# 42 RETURN_VALUE
# ============================================================
# First -> <generator object foo at 0x000001FB3647B848>, lasti -> -1
# Second -> 111, lasti -> 2
# result of yield: hello
#
# Third -> 222, lasti -> 24
# result of 2nd yield: world

上面是一段生成器代码映射的字节码,gen.gi_frame.f_lasti 属性为 上一次暂停时程序运行的字节码位置,如果为-1,则代表的是新的生成器对象

可以看到,每次send消息到生成器中时,f_lasti 的值会相应的改变,通过它记录生成器执行到了哪个位置。

另一个关键的地方是,Python是个解释型的语言,以CPython为例,CPython解释器是一个简单的c程序,c程序的调用栈在函数返回后会被销毁。但是,Python程序调用时的 栈帧 是保存在堆上的,解释器能将其保留了下来。所以,可以在函数返回后仍能保存住程序的上下文环境,供下次调用。

具体的yield为啥能这么做,请参考:https://www.cnblogs.com/aguncn/p/10178811.html

参考


版权声明 本文属于本站  原创作品,文章版权归本站及作者所有,请尊重作者的创作成果,转载、引用自觉附上本文永久地址: http://blog.lujianxin.com/x/art/beq0koymmud0

文章评论区

作者名片

图片丢失
  • 作者昵称:Jeyrce.Lu
  • 原创文章:61篇
  • 转载文章:3篇
  • 加入本站:2004天

站点信息

  • 运行天数:2005天
  • 累计访问:164169人次
  • 今日访问:0人次
  • 原创文章:69篇
  • 转载文章:4篇
  • 微信公众号:第一时间获取更新信息