文章详情页 您现在的位置是:网站首页>文章详情
关于tornado异步的一点研究
Jeyrce.Lu 发表于:2019年8月31日 18:25 分类:【Python】 5470次阅读
Tornado框架与Django、Flask相比,主要的突出点在于异步的使用。(下面的内容最好配合框架代码一起食用)
Tornado异步的使用姿势
Tornado异步的使用方式:
@tornado.web.asynchronous - 老版本的异步装饰器(目前推荐使用 gen.coroutine 替代),必须手动调用 self.finish() 才会对请求响应,否则会一直 pending。
class MainHandler(tornado.web.RequestHandler): @tornado.web.asynchronous def get(self, *args, **kwargs): self.write("Hello World!") self.finish()
@tornado.gen.coroutine - 使用 yield 配合 Tornada 编写异步程序。
class NoBlockingHandler(tornado.web.RequestHandler): @gen.coroutine def get(self, *args, **kwargs): yield gen.sleep(10) # yield 右侧的函数或方法需是异步的,如果换成了time.sleep(10)仍为阻塞方式 self.write("No Blocking Request") # gen.sleep 的实现如下: def sleep(duration): f = _create_future() IOLoop.current().call_later(duration, lambda: future_set_result_unless_cancelled(f, None)) return f
@run_on_executor - 使用线程的方式让阻塞变成非阻塞(相对的),原理是另外启动一个线程来执行阻塞的程序。依赖于 concurrent.futures.ThreadPoolExecutor。
class NoBlockingHandler(tornado.web.RequestHandler): # 此处必须为executor,在 run_on_executor 装饰器函数中,会在self中获取executor属性 executor = ThreadPoolExecutor(4) # 1 @run_on_executor def sleep(self, second): time.sleep(second) # 此处的sleep可以是阻塞的,因为run_on_executor装饰过后的方法,会被转换为Future return second @gen.coroutine def get(self, *args, **kwargs): second = yield self.sleep(5) self.write("noblocking Request: {}".format(second)) # run_on_executor实现: def run_on_executor(*args, **kwargs): def run_on_executor_decorator(fn): executor = kwargs.get("executor", "executor") @functools.wraps(fn) def wrapper(self, *args, **kwargs): callback = kwargs.pop("callback", None) async_future = Future() # 此处开始构造Future对象实例!! conc_future = getattr(self, executor).submit(fn, self, *args, **kwargs) # 此处调用的是 #1 处的线程池,结果仍返回的是Future,但会启动一个线程执行被装饰的方法。(可跳进去看实现) chain_future(conc_future, async_future) # 链式调用,conc_future先被调用,接着调用async_future if callback: warnings.warn("callback arguments are deprecated, use the returned Future instead", DeprecationWarning) from tornado.ioloop import IOLoop IOLoop.current().add_future( async_future, lambda future: callback(future.result())) return async_future return wrapper if args and kwargs: raise ValueError("cannot combine positional and keyword args") if len(args) == 1: return run_on_executor_decorator(args[0]) elif len(args) != 0: raise ValueError("expected 1 argument, got %d", len(args)) return run_on_executor_decorator
以上三种方式:
@tornado.web.asynchronous 方式已经不怎么使用。
@tornado.gen.coroutine 方式依赖于 yield 后面的方法或函数是否是 非阻塞实现。
@run_on_executor 方式需要考虑吞吐量过多时,线程对系统资源的消耗。
具体介绍以及事例显示可以参考这篇Blog: https://hexiangyu.me/2017/01/29/real-tornado-async-noblocking/
Future、Runner、IOLoop
Future、Runner、IOLoop 为实现异步的关键!!!
Tornado架构实现异步的代码有点儿复杂,一时半会说不清楚。下面会给一个简化版的异步实现,基本原理是一样的。
PS:select模块为操作系统多路IO复用模式的封装,不同操作系统的底层实现不一样,包括了select、poll、epoll、kqueue。基本原理是,注册一个绑定了文件的事件,当事件触发时,会调用对应事件的回调函数。
import sys import socket from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ selector = DefaultSelector() stop = False urls_todo = 10 class Future: """可以理解为每个执行步骤的载体,yield语句返回的结果,起占位作用""" def __init__(self): self.result = None self._callbacks = [] def add_done_callback(self, step): self._callbacks.append(step) def set_result(self, result): self.result = result for step in self._callbacks: # 每次set_result后,都会启动回调函数,这里的回调函数其实是Task中step方法,这会形成了一个调用环,每次调用完后,转到下一个yield语句处 step(self) class Runner: """发动机""" def __init__(self, coro): self.coro = coro def run(self): f = Future() f.set_result(None) self.step(f) # 3 coro为一个生成器,这一步,其实就是为了 send(None) 给 #2 处的生成器 def step(self, future): try: next_future = self.coro.send(future.result) # 4 将暂停的生成器继续启用,并接收 Future实例,调用完后,程序主体跳转到下一个yield语句处 except StopIteration: return next_future.add_done_callback(self.step) # 6 next_future为 下一个生成器返回的 Future实例 class Crawler: def __init__(self, url): self.url = url self.response = b'' def fetch(self): sock = socket.socket() sock.setblocking(False) try: sock.connect((self.url, 80)) except BlockingIOError: pass f = Future() def on_connected(): f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield f # 2 此处主要是占位,运行至此时,函数将控制权交还给上一级 selector.unregister(sock.fileno()) get = "GET / HTTP/1.0\r\nHost: {}\r\n\r\n".format(self.url) sock.send(get.encode("ascii")) global stop global urls_todo while True: f = Future() def on_readable(): f.set_result(sock.recv(4096)) selector.register(sock.fileno(), EVENT_READ, on_readable) # 注册及下面的取消注册为的是触发sock读事件 chunk = yield f # 5 此处为真正的调用主体,每次会返回Future实例,这里的Future实例的作用实际是返回sock读取的内容 # 此处在循环中,所以会不停的返回 Future实例,直到从服务器接收完所有的信息 selector.unregister(sock.fileno()) if chunk: self.response += chunk else: urls_todo -= 1 if not urls_todo: stop = True break print(self.response) sys.stdout.flush() def loop(): while not stop: event = selector.select() # 每次都会选出那些已经触发的事件,下面则进行回调函数的触发 for event_key, event_mask in event: callback = event_key.data callback() # 回调函数可以是 if __name__ == '__main__': import time start_time = time.time() for _ in range(urls_todo): crawler = Crawler("baidu.com") runner = Runner(crawler.fetch()) runner.run() # 1 此处运行至 #2 处,传入Task的参数是一个 生成器 gen,此时已经完成了socket connect步骤 loop() # 事件循环开始,会根据注册的事件响应,回调对应的函数。开始时,事件只有socket connect由服务器返回结果的事件 print("Use time -> {}'s.".format(time.time() - start_time))
Future 对象是个神奇的存在,它本身并不产生结果,每次 set_result 的时候,除了会将result保存,同时也会调用回调函数。
而回调函数则是 Task 中的 step 方法。
Task 对象中的coro成员为 Crawler.fetch(),它是个生成器。
Task.step 方法中调用的 self.coro.send(future.result) 将结果返回给 Crawler.fetch(),程序的流程走到下一个 yield 语句处。
Task.step 下一步 next_future.add_done_callback(self.step),又将自己加入到了 future 的回调函数中。这样就形成了一个环形的调用链,直到生成器结束,抛出 StopIteration 异常。
loop 方法处理注册事件的触发,回调过程中会调用 Future.set_result。
yield
yield是python中实现异步调用的关键,它能够 将函数或方法的调用暂停,并将上下文保存下来,等到下次触发时,恢复上一次调用的上下文环境。
import dis def foo(): result = yield 111 print(f'result of yield: {result}\n') result2 = yield 222 print(f'result of 2nd yield: {result2}\n') return 'done' dis.dis(foo) print("=" * 60, flush=True) first_res = gen = foo() print("First -> {}, lasti -> {}".format(first_res, gen.gi_frame.f_lasti), flush=True) second_res = gen.send(None) # 触发生成器,并将 111 返回,流程走到第一个yield处暂停 print("Second -> {}, lasti -> {}".format(second_res, gen.gi_frame.f_lasti), flush=True) third_res = gen.send("hello") # send方法恢复暂停的生成器,并将hello发送给生成器,yield左边接收hello,并打印,程序继续执行至下一个yield处,此时返回222 print("Third -> {}, lasti -> {}".format(third_res, gen.gi_frame.f_lasti), flush=True) try: gen.send("world") # 此时send完后,result2接收到world,下面没有了yield语句,抛出StopIteration异常 except StopIteration: try: print("Four -> {}".format(gen.gi_frame.f_lasti)) except: pass pass
输出结果如下:
# ===============================output====================================== # 12 0 LOAD_CONST 1 (111) # 2 YIELD_VALUE # 4 STORE_FAST 0 (result) # # 13 6 LOAD_GLOBAL 0 (print) # 8 LOAD_CONST 2 ('result of yield: ') # 10 LOAD_FAST 0 (result) # 12 FORMAT_VALUE 0 # 14 BUILD_STRING 2 # 16 CALL_FUNCTION 1 # 18 POP_TOP # # 14 20 LOAD_CONST 3 (222) # 22 YIELD_VALUE # 24 STORE_FAST 1 (result2) # # 15 26 LOAD_GLOBAL 0 (print) # 28 LOAD_CONST 4 ('result of 2nd yield: ') # 30 LOAD_FAST 1 (result2) # 32 FORMAT_VALUE 0 # 34 BUILD_STRING 2 # 36 CALL_FUNCTION 1 # 38 POP_TOP # # 16 40 LOAD_CONST 5 ('done') # 42 RETURN_VALUE # ============================================================ # First -> <generator object foo at 0x000001FB3647B848>, lasti -> -1 # Second -> 111, lasti -> 2 # result of yield: hello # # Third -> 222, lasti -> 24 # result of 2nd yield: world
上面是一段生成器代码映射的字节码,gen.gi_frame.f_lasti 属性为 上一次暂停时程序运行的字节码位置,如果为-1,则代表的是新的生成器对象。
可以看到,每次send消息到生成器中时,f_lasti 的值会相应的改变,通过它记录生成器执行到了哪个位置。
另一个关键的地方是,Python是个解释型的语言,以CPython为例,CPython解释器是一个简单的c程序,c程序的调用栈在函数返回后会被销毁。但是,Python程序调用时的 栈帧 是保存在堆上的,解释器能将其保留了下来。所以,可以在函数返回后仍能保存住程序的上下文环境,供下次调用。
具体的yield为啥能这么做,请参考:https://www.cnblogs.com/aguncn/p/10178811.html
参考
版权声明 本文属于本站 原创作品,文章版权归本站及作者所有,请尊重作者的创作成果,转载、引用自觉附上本文永久地址: http://blog.lujianxin.com/x/art/beq0koymmud0
猜你喜欢
文章评论区
作者名片
- 作者昵称:Jeyrce.Lu
- 原创文章:61篇
- 转载文章:3篇
- 加入本站:2046天
作者其他文章
站长推荐
友情链接
站点信息
- 运行天数:2047天
- 累计访问:164169人次
- 今日访问:0人次
- 原创文章:69篇
- 转载文章:4篇
- 微信公众号:第一时间获取更新信息