文章详情页 您现在的位置是:网站首页>文章详情
关于tornado异步的一点研究
Jeyrce.Lu
发表于:2019年8月31日 18:25
分类:【Python】
6092次阅读
Tornado框架与Django、Flask相比,主要的突出点在于异步的使用。(下面的内容最好配合框架代码一起食用)
Tornado异步的使用姿势
Tornado异步的使用方式:
@tornado.web.asynchronous - 老版本的异步装饰器(目前推荐使用 gen.coroutine 替代),必须手动调用 self.finish() 才会对请求响应,否则会一直 pending。
class MainHandler(tornado.web.RequestHandler):
@tornado.web.asynchronous
def get(self, *args, **kwargs):
self.write("Hello World!")
self.finish()@tornado.gen.coroutine - 使用 yield 配合 Tornada 编写异步程序。
class NoBlockingHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self, *args, **kwargs):
yield gen.sleep(10) # yield 右侧的函数或方法需是异步的,如果换成了time.sleep(10)仍为阻塞方式
self.write("No Blocking Request")
# gen.sleep 的实现如下:
def sleep(duration):
f = _create_future()
IOLoop.current().call_later(duration,
lambda: future_set_result_unless_cancelled(f, None))
return f@run_on_executor - 使用线程的方式让阻塞变成非阻塞(相对的),原理是另外启动一个线程来执行阻塞的程序。依赖于 concurrent.futures.ThreadPoolExecutor。
class NoBlockingHandler(tornado.web.RequestHandler):
# 此处必须为executor,在 run_on_executor 装饰器函数中,会在self中获取executor属性
executor = ThreadPoolExecutor(4) # 1
@run_on_executor
def sleep(self, second):
time.sleep(second) # 此处的sleep可以是阻塞的,因为run_on_executor装饰过后的方法,会被转换为Future
return second
@gen.coroutine
def get(self, *args, **kwargs):
second = yield self.sleep(5)
self.write("noblocking Request: {}".format(second))
# run_on_executor实现:
def run_on_executor(*args, **kwargs):
def run_on_executor_decorator(fn):
executor = kwargs.get("executor", "executor")
@functools.wraps(fn)
def wrapper(self, *args, **kwargs):
callback = kwargs.pop("callback", None)
async_future = Future() # 此处开始构造Future对象实例!!
conc_future = getattr(self, executor).submit(fn, self, *args, **kwargs) # 此处调用的是 #1 处的线程池,结果仍返回的是Future,但会启动一个线程执行被装饰的方法。(可跳进去看实现)
chain_future(conc_future, async_future) # 链式调用,conc_future先被调用,接着调用async_future
if callback:
warnings.warn("callback arguments are deprecated, use the returned Future instead",
DeprecationWarning)
from tornado.ioloop import IOLoop
IOLoop.current().add_future(
async_future, lambda future: callback(future.result()))
return async_future
return wrapper
if args and kwargs:
raise ValueError("cannot combine positional and keyword args")
if len(args) == 1:
return run_on_executor_decorator(args[0])
elif len(args) != 0:
raise ValueError("expected 1 argument, got %d", len(args))
return run_on_executor_decorator以上三种方式:
@tornado.web.asynchronous 方式已经不怎么使用。
@tornado.gen.coroutine 方式依赖于 yield 后面的方法或函数是否是 非阻塞实现。
@run_on_executor 方式需要考虑吞吐量过多时,线程对系统资源的消耗。
具体介绍以及事例显示可以参考这篇Blog: https://hexiangyu.me/2017/01/29/real-tornado-async-noblocking/
Future、Runner、IOLoop
Future、Runner、IOLoop 为实现异步的关键!!!
Tornado架构实现异步的代码有点儿复杂,一时半会说不清楚。下面会给一个简化版的异步实现,基本原理是一样的。
PS:select模块为操作系统多路IO复用模式的封装,不同操作系统的底层实现不一样,包括了select、poll、epoll、kqueue。基本原理是,注册一个绑定了文件的事件,当事件触发时,会调用对应事件的回调函数。
import sys
import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
selector = DefaultSelector()
stop = False
urls_todo = 10
class Future:
"""可以理解为每个执行步骤的载体,yield语句返回的结果,起占位作用"""
def __init__(self):
self.result = None
self._callbacks = []
def add_done_callback(self, step):
self._callbacks.append(step)
def set_result(self, result):
self.result = result
for step in self._callbacks: # 每次set_result后,都会启动回调函数,这里的回调函数其实是Task中step方法,这会形成了一个调用环,每次调用完后,转到下一个yield语句处
step(self)
class Runner:
"""发动机"""
def __init__(self, coro):
self.coro = coro
def run(self):
f = Future()
f.set_result(None)
self.step(f) # 3 coro为一个生成器,这一步,其实就是为了 send(None) 给 #2 处的生成器
def step(self, future):
try:
next_future = self.coro.send(future.result) # 4 将暂停的生成器继续启用,并接收 Future实例,调用完后,程序主体跳转到下一个yield语句处
except StopIteration:
return
next_future.add_done_callback(self.step) # 6 next_future为 下一个生成器返回的 Future实例
class Crawler:
def __init__(self, url):
self.url = url
self.response = b''
def fetch(self):
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect((self.url, 80))
except BlockingIOError:
pass
f = Future()
def on_connected():
f.set_result(None)
selector.register(sock.fileno(), EVENT_WRITE, on_connected)
yield f # 2 此处主要是占位,运行至此时,函数将控制权交还给上一级
selector.unregister(sock.fileno())
get = "GET / HTTP/1.0\r\nHost: {}\r\n\r\n".format(self.url)
sock.send(get.encode("ascii"))
global stop
global urls_todo
while True:
f = Future()
def on_readable():
f.set_result(sock.recv(4096))
selector.register(sock.fileno(), EVENT_READ, on_readable) # 注册及下面的取消注册为的是触发sock读事件
chunk = yield f # 5 此处为真正的调用主体,每次会返回Future实例,这里的Future实例的作用实际是返回sock读取的内容
# 此处在循环中,所以会不停的返回 Future实例,直到从服务器接收完所有的信息
selector.unregister(sock.fileno())
if chunk:
self.response += chunk
else:
urls_todo -= 1
if not urls_todo:
stop = True
break
print(self.response)
sys.stdout.flush()
def loop():
while not stop:
event = selector.select() # 每次都会选出那些已经触发的事件,下面则进行回调函数的触发
for event_key, event_mask in event:
callback = event_key.data
callback() # 回调函数可以是
if __name__ == '__main__':
import time
start_time = time.time()
for _ in range(urls_todo):
crawler = Crawler("baidu.com")
runner = Runner(crawler.fetch())
runner.run() # 1 此处运行至 #2 处,传入Task的参数是一个 生成器 gen,此时已经完成了socket connect步骤
loop() # 事件循环开始,会根据注册的事件响应,回调对应的函数。开始时,事件只有socket connect由服务器返回结果的事件
print("Use time -> {}'s.".format(time.time() - start_time))Future 对象是个神奇的存在,它本身并不产生结果,每次 set_result 的时候,除了会将result保存,同时也会调用回调函数。
而回调函数则是 Task 中的 step 方法。
Task 对象中的coro成员为 Crawler.fetch(),它是个生成器。
Task.step 方法中调用的 self.coro.send(future.result) 将结果返回给 Crawler.fetch(),程序的流程走到下一个 yield 语句处。
Task.step 下一步 next_future.add_done_callback(self.step),又将自己加入到了 future 的回调函数中。这样就形成了一个环形的调用链,直到生成器结束,抛出 StopIteration 异常。
loop 方法处理注册事件的触发,回调过程中会调用 Future.set_result。
yield
yield是python中实现异步调用的关键,它能够 将函数或方法的调用暂停,并将上下文保存下来,等到下次触发时,恢复上一次调用的上下文环境。
import dis
def foo():
result = yield 111
print(f'result of yield: {result}\n')
result2 = yield 222
print(f'result of 2nd yield: {result2}\n')
return 'done'
dis.dis(foo)
print("=" * 60, flush=True)
first_res = gen = foo()
print("First -> {}, lasti -> {}".format(first_res, gen.gi_frame.f_lasti), flush=True)
second_res = gen.send(None) # 触发生成器,并将 111 返回,流程走到第一个yield处暂停
print("Second -> {}, lasti -> {}".format(second_res, gen.gi_frame.f_lasti), flush=True)
third_res = gen.send("hello") # send方法恢复暂停的生成器,并将hello发送给生成器,yield左边接收hello,并打印,程序继续执行至下一个yield处,此时返回222
print("Third -> {}, lasti -> {}".format(third_res, gen.gi_frame.f_lasti), flush=True)
try:
gen.send("world") # 此时send完后,result2接收到world,下面没有了yield语句,抛出StopIteration异常
except StopIteration:
try:
print("Four -> {}".format(gen.gi_frame.f_lasti))
except:
pass
pass输出结果如下:
# ===============================output======================================
# 12 0 LOAD_CONST 1 (111)
# 2 YIELD_VALUE
# 4 STORE_FAST 0 (result)
#
# 13 6 LOAD_GLOBAL 0 (print)
# 8 LOAD_CONST 2 ('result of yield: ')
# 10 LOAD_FAST 0 (result)
# 12 FORMAT_VALUE 0
# 14 BUILD_STRING 2
# 16 CALL_FUNCTION 1
# 18 POP_TOP
#
# 14 20 LOAD_CONST 3 (222)
# 22 YIELD_VALUE
# 24 STORE_FAST 1 (result2)
#
# 15 26 LOAD_GLOBAL 0 (print)
# 28 LOAD_CONST 4 ('result of 2nd yield: ')
# 30 LOAD_FAST 1 (result2)
# 32 FORMAT_VALUE 0
# 34 BUILD_STRING 2
# 36 CALL_FUNCTION 1
# 38 POP_TOP
#
# 16 40 LOAD_CONST 5 ('done')
# 42 RETURN_VALUE
# ============================================================
# First -> <generator object foo at 0x000001FB3647B848>, lasti -> -1
# Second -> 111, lasti -> 2
# result of yield: hello
#
# Third -> 222, lasti -> 24
# result of 2nd yield: world上面是一段生成器代码映射的字节码,gen.gi_frame.f_lasti 属性为 上一次暂停时程序运行的字节码位置,如果为-1,则代表的是新的生成器对象。
可以看到,每次send消息到生成器中时,f_lasti 的值会相应的改变,通过它记录生成器执行到了哪个位置。
另一个关键的地方是,Python是个解释型的语言,以CPython为例,CPython解释器是一个简单的c程序,c程序的调用栈在函数返回后会被销毁。但是,Python程序调用时的 栈帧 是保存在堆上的,解释器能将其保留了下来。所以,可以在函数返回后仍能保存住程序的上下文环境,供下次调用。
具体的yield为啥能这么做,请参考:https://www.cnblogs.com/aguncn/p/10178811.html
参考
版权声明 本文属于本站 原创作品,文章版权归本站及作者所有,请尊重作者的创作成果,转载、引用自觉附上本文永久地址: http://blog.lujianxin.com/x/art/beq0koymmud0
猜你喜欢
文章评论区
作者名片
- 作者昵称:Jeyrce.Lu
- 原创文章:61篇
- 转载文章:3篇
- 加入本站:2387天
作者其他文章
站长推荐
友情链接
站点信息
- 运行天数:2388天
- 累计访问:164169人次
- 今日访问:0人次
- 原创文章:69篇
- 转载文章:4篇
- 微信公众号:第一时间获取更新信息
