摘要:深入讲解一下await关键字的作用

await是python 最常见的关键字,那么 await 都干了什么?以下面的代码为例,尽可能分析一下....

代码的执行流程

代码如下:


import datetime
import asyncio
async def say_after_time(delay, what, name):
    await asyncio.sleep(delay)

async def main(name):
    print("开始时间为",  datetime.datetime.now())

    # 打印出当前正在运行的task
    # 可以看到task的状态为 running
    task = asyncio.current_task()
    blocking = getattr(task, '_asyncio_future_blocking', None)
    print("当前task是否阻塞", blocking, "当前task名字: ", name)
    print("当前正在运行的task: ", task, "当前task名字: ", name)

    # 打印出当前循环中的全部task
    task = asyncio.all_tasks()
    for othertask in task:
        blocking = getattr(othertask, '_asyncio_future_blocking', None)
        print("其他task是否阻塞", blocking)
    print("全部task", task, "当前task名字: ", name)


    await say_after_time(1, "hello", name)
    await say_after_time(5, "world", name)
    print("结束时间为:", datetime.datetime.now())


loop = asyncio.get_event_loop()
task1 = loop.create_task(main("task1"))
task2 = loop.create_task(main("task2"))
tasklist = [task1, task2]
loop.run_until_complete(asyncio.wait(tasklist))
loop.close()


'''
运行结果:
开始时间为 2020-12-25 20:41:40.040016
当前task是否阻塞 False 当前task名字:  task1
当前正在运行的task:  <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:14>> 当前task名字:  task1
其他task是否阻塞 False
其他task是否阻塞 False
其他task是否阻塞 False
全部task {<Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:6>>, <Task pending coro=<wait() running at E:\python\lib\asyncio\tasks.py:366> cb=[_run_until_complete_cb() at E:\python\lib\asyncio\base_events.py:153]>, <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:21>>} 当前task名字:  task1
开始时间为 2020-12-25 20:41:40.040016
当前task是否阻塞 False 当前task名字:  task2
当前正在运行的task:  <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:14>> 当前task名字:  task2
其他task是否阻塞 False
其他task是否阻塞 False
其他task是否阻塞 False
全部task {<Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:21>>, <Task pending coro=<wait() running at E:\python\lib\asyncio\tasks.py:366> cb=[_run_until_complete_cb() at E:\python\lib\asyncio\base_events.py:153]>, <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:24> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000024DBAB30EB8>()]>>} 当前task名字:  task2
结束时间为: 2020-12-25 20:41:46.040710
结束时间为: 2020-12-25 20:41:46.040710
'''

上面代码的执行顺序为:
第一步:

await关键字1.png

第二步:

await关键字2.png

第三步:

await关键字3.png

代码执行过程分析

首先 在老式的协程中是使用@asyncio.coroutine和yield from实现协程,在新式的协程中是使用async和await来实现协程。所以yield from 和 await 的作用应该是一样的,但是await 只能在 async 修饰的函数里面使用yield from 就基本上哪都可以用,除了async修饰的函数里面

其次我发现了一个有趣的现象:

由于task1是先执行,所以在task1执行时打印出task1的状态为:(ps: 数字14就代表 task1 下行 要执行的 代码的位置 是第14行**)

<Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:14>>

当执行task2的时候,此时task1处于await asyncio.sleep(1)造成的睡眠中。所以此时task1的状态为:

<Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:24> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000024DBAB30EB8>()]>>

注意一下 wait_for,发现 task1 任务在等待一个future对象

先来分析一下 asyncio.sleep 的实现

来查看下 asyncio.sleep()源代码:

async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_event_loop()
    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()

注意这段代码 return await future ,future变量是一个future类型的对象,所以查看 future类的源码

    def __await__(self):
        # self._asyncio_future_blocking 我没看明白
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self.result()  # May raise too.

    __iter__ = __await__  # make compatible with 'yield from'.

有点意思了,注意 make compatible with 'yield from.',前面还刚刚讲到 await 和yield from 就是 新式协程旧式协程 的关系。再联想到 yield from 和 __iter__ 的关系(ps: 不懂去看 为什么for循环可以遍历list:Python中迭代器与生成器 - 奥辰 - 博客园),没有理由不怀疑 await 的作用就是调用 future对象内部的 __await__ 方法

这个时候又联想到了前面提到的 awaitable对象 。

awaitable对象

什么是awaitable对象——即可暂停等待的对象

有三类对象是可等待的,即 coroutines, Tasks, and Futures。

coroutine:本质上就是一个函数,一前面的生成器yield和yield from为基础,不再赘述;

Tasks: 任务,顾名思义,就是要完成某件事情,其实就是对协程函数进一步的封装;

Future:它是一个“更底层”的概念,他代表一个一步操作的最终结果,因为一步操作一般用于耗时操作,结果不会立即得到,会在“将来”得到异步运行的结果,故而命名为Future。

三者的关系,coroutine可以自动封装成task,而Task是Future的子类。

既然 coroutines, Tasks, and Futures 都是可暂停等待的对象, 于是构造一个协程对象 ,查看对象内部有没有 __await__ 方法,

import asyncio
async def say_after_time(delay, what, name):
    await asyncio.sleep(delay)
a = say_after_time(1, "hello", "world")
print(dir(a))

'''
运行结果:
['__await__', '__class__', '__del__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__name__', '__ne__', '__new__', '__qualname__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'close', 'cr_await', 'cr_code', 'cr_frame', 'cr_origin', 'cr_running', 'send', 'throw']

'''

哦豁,协程对象果然有__await__方法,那么可以推论 当使用 async和await 构造的 say_after_time 协程函数 构造协程对象时,对象内部会被添加__await__方法。 ,当代码执行到 await say_after_time(1, "hello", name)时 ,await关键字 会调用 say_after_time(1, "hello", name)协程对象 里面的 __await__ 方法。

还有一个问题 task1 的状态 是

<Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/asyncio_future_blocking和状态的关系.py:24> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000024DBAB30EB8>()]>>

可以看到 task1 wait_for(等待) 一个 future对象 。这个future 对象又是什么怎么来的?其实就是从asyncio.sleep()的下面这条件语句得来的。

    try:
        return await future
    finally:
        h.cancel()

至于 该future对象里面的 cb=[<TaskWakeupMethWrapper object at 0x0000024DBAB30EB8>()],cb就是callback回调函数,当future的状态为finished的时候,会调用其回调函数,然后future的回调函数就是 task1 的 __wakeup函数(ps:下面有更详细的解释,往下看)。

关于 _asyncio_future_blocking 当值为true 时表示这个future对象还没有执行完毕。但也不算恰当。毕竟创建一个为pending的future对象, _asyncio_future_blocking 属性 默认就是False,而按照上面的推论,false应该是执行完毕的。所以我也搞不懂是干嘛的。

在 futures.py中找到的提示

    # This field is used for a dual purpose:
    # - Its presence is a marker to declare that a class implements
    #   the Future protocol (i.e. is intended to be duck-type compatible).
    #   The value must also be not-None, to enable a subclass to declare
    #   that it is not compatible by setting this to None.
    # - It is set by __iter__() below so that Task._step() can tell
    #   the difference between
    #   `await Future()` or`yield from Future()` (correct) vs.
    #   `yield Future()` (incorrect).

对 await 执行过程分析

既然 await 这个关键字 分析完了,接下来来讲点更详细的细节,建议先看一下
python协程的原理

代码依次执行:

loop = asyncio.get_event_loop()
task1 = loop.create_task(main("task1"))
task2 = loop.create_task(main("task2"))
tasklist = [task1, task2]
loop.run_until_complete(asyncio.wait(tasklist))

故事从loop.run_until_complete(asyncio.wait(tasklist))开始

asyncio.wait(tasklist) 是一个 协程函数,然后 loop.run_until_complete 在内部将这个协程函数封装为future,然后将_run_until_complete_cb()设置为这个future的回调函数

<Task pending coro=<wait() running at E:\python\lib\asyncio\tasks.py:366> cb=[_run_until_complete_cb() at E:\python\lib\asyncio\base_events.py:153]>

然后 在 asyncio.wait 函数内部 调用 task1, task1在内部调用 await say_after_time(1, "hello", name),这个时候进入 say_after_time函数内部,然后遇到了 await asyncio.sleep(delay),然后 进入 asyncio.sleep 函数内部执行,然后由于 asyncio.sleep 在内部返回一个 future,这个时候将 task1 的 __wakeup函数 加入到 future 的 self._callbacks = [] 中。由于loop.call_later设置的有睡眠时间,当时间到达,loop循环 调用 future对象的set_result 函数时,future对象被设置为finished状态,然后调用 self._callbacks 中的 __wakeup 函数,接着执行task1。

当task1和task2都执行完毕后,asyncio.wait(tasklist)封装的future 的状态变为 finished,然后调用future的回调函数_run_until_complete_cb()。这个函数会将loop._stopping 设置为 True,这个时候loop循环就会停止运行

对于task1 的 __wakeup函数 加入到 future的 解释

task1 的 __wakeup函数 加入到 future 的 self._callbacks = [] 的简化代码:

# tasks.py

coro = self._coro
result = coro.send(None)
blocking = getattr(result, '_asyncio_future_blocking', None)

# blocking 不为 None,说明result是一个future对象或者task对象
if blocking is not None:
    # Yielded Future must come from Future.__iter__().
    # 返回的future必须和当前的task在同一个loop循环中
    if result._loop is not self._loop:
        self._loop.call_soon(
            self._step,
            RuntimeError(
                'Task {!r} got Future {!r} attached to a '
                'different loop'.format(self, result)))
    elif blocking:
        # task 不能 await 自己,无限套娃
        if result is self:
            self._loop.call_soon(
                self._step,
                RuntimeError(
                    'Task cannot await on itself: {!r}'.format(
                        self)))
        else:
            # 将当前task的self._wakeup,加入future对象的回调函数
            result._asyncio_future_blocking = False
            result.add_done_callback(self._wakeup)
            self._fut_waiter = result
            if self._must_cancel:
                if self._fut_waiter.cancel():
                    self._must_cancel = False

loop.run_until_complete 的执行原理

loop.run_until_complete 内部调用 loop.run_forever,不同的是 loop.run_until_complete(asyncio.wait(tasklist)) 会给 asyncio.wait(tasklist)封装成的future 添加 _run_until_complete_cb 回调函数,当 封装成的future 执行完毕, _run_until_complete_cb回调函数loop._stopping 设置为 True,这个时候loop循环就会停止运行。

  • _run_until_complete_cb()的函数代码:

    # base_events.py
    def _run_until_complete_cb(fut):
        if not fut.cancelled():
            exc = fut.exception()
            if isinstance(exc, BaseException) and not isinstance(exc, Exception):
                # Issue #22429: run_forever() already finished, no need to
                # stop it.
                return
        futures._get_loop(fut).stop()
  • futures._get_loop(fut).stop()的函数代码:

    # # base_events.py
    def stop(self):
        """Stop running the event loop.
    
        Every callback already scheduled will still run.  This simply informs
        run_forever to stop looping after a complete iteration.
        """
        self._stopping = True
文章目录