摘要:记录一下关于协程中task and future常见的用法以及概念。

关于协程的概念

首先,在单核不开启超线程的前提下,在一段时间片内,cpu只能执行一个线程。而协程的关键就在于在这个线程中实现了一个loop循环,然后把task任务或者future一个一个加进去,然后遍历这个loop循环,当一个事件发生(比如使用loop.call_later设置几秒后执行这个task)时,loop就去执行这个task。所以协程对密集计算型是没啥用的,就一个时间片,你咋用都是这么多,对爬虫之类的IO密集型(因为不需要等待比如 http请求,你需要等待response,而response的时间是不一定的) 会提升较多。密集计算型应该使用多进程,因为cpu时间片的划分是以进程为单位的,所以多进程会占用更多的cpu时间片。

对于await关键词,await关键词。异步io的关键在于 await io操作,此时,当前携程就会被挂起,时间循环转而执行其他携程,但是要注意前面这句话,并不是说所有携程里的await都会导致当前携程的挂起,要看await后面跟的是什么,如果跟的是我们定义的携程,则会执行这个携程,如果是asyncio模块制作者定义的固有携程,比如模拟io操作的asyncio.sleep,以及io操作,比如网络io:asyncio.open_connection这些,才会挂起当前携程。参考自:await这个关键词以及asyncio.wait asyncio.gather - 扫驴 - 博客园

await关键字和yield from关键字

其实await和yield from关键字没什么用,至于前面说的 当前携程就会被挂起,时间循环转而执行其他携程。简单来说,await io操作 时,程序会跳转到io操作,然后io资源准备好需要时间吧,正常情况下程序会阻塞到这一行代码(而异步模块比如 aiohtpp 之类的 这个模块是精心设计的,作者会采用非阻塞的模式,io没准备好没有关系,继续往下执行io操作包含的代码,准备好后麻烦操作系统通知我一下)。所以如果你在await io操作io操作 中使用time.sleep(5)之类的该阻塞还是会阻塞的,时间循环就不会转而执行其他携程。

await io操作 其实就是遇到比如请求IO资源时设置一个回调函数,然后当IO资源准备好后之后,系统会给loop循环发送通知表示IO资源已经准备好,然后loop循环调用回调函数,而回调函数内部包含对IO资源的处理,就这样,这就是协程的大概原理。

awaitable对象

什么是awaitable对象——即可暂停等待的对象

有三类对象是可等待的,即 coroutines, Tasks, and Futures

coroutine:本质上就是一个函数,一前面的生成器yield和yield from为基础,不再赘述;

Tasks: 任务,顾名思义,就是要完成某件事情,其实就是对协程函数进一步的封装;

Future:它是一个“更底层”的概念,他代表一个一步操作的最终结果,因为一步操作一般用于耗时操作,结果不会立即得到,会在“将来”得到异步运行的结果,故而命名为Future。

三者的关系,coroutine可以自动封装成task,而Task是Future的子类。

future对象

pycharm中使用

import asyncio.tasks
import asyncio.futures

然后按CTRL同时鼠标点击futures即可查看futures类的定义

futures类.png

关于future的状态

future对象有几个状态:

  • Pending
  • Running
  • Done
  • Cancelled

创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消,状态为cancel。

相关使用:

import asyncio

def foo(future, result):
    print(f"此时future的状态:{future}")
    print(f"设置future的结果:{result}")
    future.set_result(result)
    print(f"此时future的状态:{future}")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        all_done = asyncio.Future()
        # all_done,"Future is done"为参数
        # 函数原型: loop.call_soon(callback, *args, context=None)
        loop.call_soon(foo, all_done, "Future is done!")
        print("进入事件循环")
        result = loop.run_until_complete(all_done)
        print("返回结果", result)
    finally:
        print("关闭事件循环")
        loop.close()
    print("获取future的结果", all_done.result())
'''
运行结果:
进入事件循环
此时future的状态:<Future pending cb=[_run_until_complete_cb() at E:\python\lib\asyncio\base_events.py:153]>
设置future的结果:Future is done!
此时future的状态:<Future finished result='Future is done!'>
返回结果 Future is done!
关闭事件循环
获取future的结果 Future is done!
'''

关于future的创建

三种方式:
创建future有三种方式,我感觉前面两种方式没什么区别,都不会自动把future添加到loop循环中去。future并没有什么用所以我们选择更加完善的task类,也就是future的子类。通过 loop.run_until_complete 函数创建的例如 loop.run_until_complete(asyncio.wait([coro1(),coro2()])) ,会在 asyncio.wait 函数内部把 coro1 封装成asyncio.ensure_future(coro1())

  • asyncio.Future()
    源码:

    // asyncio.Future应该就是 asyncio.futures.Future 对象的导出,应该就是创建一个Future类的对象,具体代码懒得找了。

    使用:

    import asyncio
    future = asyncio.Future()
    async def coro1():
        print("wait 1 second")
        await asyncio.sleep(1)
        print("set_result")
        future.set_result('data')
    
    
    async def coro2():
        result = await future
        print(result)
    
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([
        coro1(),
        coro2()
    ]))
    loop.close()
    
    '''
    运行结果:
    wait 1 second
    set_result
    data
    
    运行结果解释:首先运行coro1,然后打印出"wait 1 second",遇到await asyncio.sleep(1),coro1睡眠1s放弃控制权。
    coro2获得控制权,遇到await future,此时 future 的状态不是finished,所以coro2放弃控制权,相当于阻塞在await future。
    经过1s后coro1被loop循环调用,然后调用set_result将future对象的状态设置为finished,然后coro1运行结束。
    因为future的状态被设为finished, await future的阻塞被解除,await返回coro2设置的结果"data"并赋值为result变量,然后打印出"data"。
    '''
  • asyncio.ensure_future
    源码:
    可以看到 asyncio.ensure_future 就是一个对 loop.create_task 的封装,实际上调用的还是loop.create_task,因此会自动把future添加到loop循环。

    def ensure_future(coro_or_future, *, loop=None):
        """Wrap a coroutine or an awaitable in a future.
    
        If the argument is a Future, it is returned directly.
        """
        if coroutines.iscoroutine(coro_or_future):
            if loop is None:
                loop = events.get_event_loop()
            task = loop.create_task(coro_or_future)
            if task._source_traceback:
                del task._source_traceback[-1]
            return task

    使用:

    import asyncio
    
    async def hello1(a, b):
        print("Hello world 01 begin")
        await asyncio.sleep(3)  # 模拟耗时任务3秒
        print("Hello again 01 end")
        return a + b
    
    
    def callback(future):  # 定义的回调函数
        # 获得 future 完成后的结果
        print(future.result())
    
    
    loop = asyncio.get_event_loop()
    # 创建一个future对象
    future = asyncio.ensure_future(hello1(10, 5))
    future.add_done_callback(callback)
    loop.run_until_complete(future)
    loop.close()
    
    '''
    运行结果:
    Hello world 01 begin
    Hello again 01 end
    15
    '''

三种创建future方式的区别

loop.create_future(),asyncio.Future()仅仅是创建了一个future对象,并没有加入loop循环中,loop循环也就不会自动去调用future,因为loop不知道这个future对象存在,这也是,所以如果你要想把future对象加入loop循环中去,就必须使用await 去把 future加入到loop循环之中。而asyncio.ensure_future会把future对象直接加入到loop循环中,loop循环会根据条件自动调用future。

举例一:

import asyncio

async def hello(name):
    print("正在执行future", name)

loop = asyncio.get_event_loop()

# 并没有使用 loop.run_until_complete 运行下面这个
# 证明 asyncio.ensure_future 会自动把协程函数添加进future
asyncio.ensure_future(hello("future1"))

future2 = asyncio.ensure_future(hello("future2"))

loop.run_until_complete(future2)
loop.close()

举例二:

import asyncio

def done_callback(future):
    print("done")
    print(future)

async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)

futu = asyncio.Future()

# 添加一个 done_callback 的回调函数
# 证明 asyncio.Future 不会自动被添加进回调函数
futu.add_done_callback(done_callback)

future2 = asyncio.ensure_future(do_some_work(3))

loop = asyncio.get_event_loop()
loop.run_until_complete(future2)

可以看到 future1 不会被执行。

关于future的判断

可以通过 asyncio.isfuture() 来判断一个对象是不是future对象,asyncio.isfuture()实际上就是 base_futures.isfuture 换了个名字.....

asyncio.isfuture()的源码:

# futures.py
isfuture = base_futures.isfuture

使用:

import asyncio

future = asyncio.Future()
print("判断是不是future对象", asyncio.isfuture(future))

'''
运行结果:
判断是不是future对象 True
'''

cancel

取消future的运行。

使用:


import asyncio
async def cancel_me():
    print('cancel_me')

async def main(loop):
    future = loop.create_future()
    future.cancel()
    try:
        print("开始执行future")
        await future
    except asyncio.CancelledError:
        print("future 被取消")


loop = asyncio.get_event_loop()
task = loop.create_task(main(loop))
loop.run_until_complete(task)

'''
运行结果:
开始执行future
future 被取消
'''

注意使用cancel函数可能会导致某些后果。例如下面的一个例子:

import asyncio

future = asyncio.Future()

async def coro1():
    print("wait 1 second")
    await asyncio.sleep(3)
    future.cancel()

async def coro2():
    print("coro2")
    await asyncio.sleep(6)
    # 当 coro2() 在内部调用 await future 时,
    # 该协程函数会阻塞在 await future,loop循环执行其他task
    # 只有当 future 的状态 被设置为 finished 时候
    # await future 才会停止阻塞,继续向下执行
    # 由于取消了future,这个future的状态为 Cancelled
    # 只有当future状态为finished时,coro2才会继续往下运行
    # 所以协程 coro2()会一直卡死在这里
    # 这个理解不是很全面,但是这里篇幅限制,省略了
    result = await future

    print("future运行完成")
    if future.cancelled():
        print("future被取消")

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
    coro1(),
    coro2()
]))
loop.close()

'''
运行结果:
wait 1 second
coro2

'''

调用分析:当 coro2() 在 内部调用 await future 时,该协程函数会阻塞在 await future,loop循环执行其他task。
只有当 future 的状态 被设置为 finished 时候,await future 才会停止阻塞,继续向下执行。由于取消了future,这个时候 future状态 为 Cancelled,只有当 future状态finished 时,coro2才会继续往下运行,所以协程 coro2()会一直 卡死 在这里。(ps: 这个理解不是很全面,但是这里篇幅限制,省略了

__schedule_callbacks

这个其实就是保存 通过 add_done_callback 添加的回调函数 的列表。当调用 future.set_result 时会执行 __schedule_callbacks 里面保存的回调函数。

有兴趣去 Python线程、协程探究(3)——协程的调度实现 - 知乎 查看。

cancelled

判断future是否被取消,被取消了返回true。搞不懂有啥用,先放这。

用法:

import asyncio
async def cancel_me():
    print('cancel_me')

async def main(loop):
    future = loop.create_future()
    future.cancel()
    try:
        print("开始执行future")
        await future
    except asyncio.CancelledError:
        print("future 被取消")
    print("future被取消", future.cancelled())


loop = asyncio.get_event_loop()
task = loop.create_task(main(loop))
loop.run_until_complete(task)

'''
运行结果:
开始执行future
future 被取消
future被取消 True
'''

done

判断future是否执行完成(处于finished的状态),完成返回真。

用法:

import asyncio
future = asyncio.Future()
async def coro1():
    print("wait 1 second")
    await asyncio.sleep(1)
    print("set_result")
    future.set_result('data')


async def coro2():
    result = await future
    print(result)
    # 判断 future 是否处于finished状态
    print("future任务是否执行完成", future.done())


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
    coro1(),
    coro2()
]))
loop.close()

'''
运行结果:
wait 1 second
set_result
data
future任务是否执行完成 True
'''

result

获得future完成后的结果

用法:

import asyncio

async def hello1(a, b):
    print("Hello world 01 begin")
    await asyncio.sleep(3)  # 模拟耗时任务3秒
    print("Hello again 01 end")
    return a + b


def callback(future):  # 定义的回调函数
    # 获得 future 完成后的结果
    print(future.result())


loop = asyncio.get_event_loop()
future = asyncio.ensure_future(hello1(10, 5))
future.add_done_callback(callback)
loop.run_until_complete(future)
loop.close()

'''
运行结果:
Hello world 01 begin
Hello again 01 end
15
'''

add_done_callback

通过add_done_callback添加回调函数,当添加多个回调函数的时候会按照先后顺序依次调用,先添加先调用,后添加后调用。

用法:

import asyncio

# 回调函数
# 会给done_callback返回一个future对象
def done_callback(future):
    print("done")
    print(future)


async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)


futu = asyncio.ensure_future(do_some_work(3))
# 添加一个done_callback的回调函数
futu.add_done_callback(done_callback)

loop = asyncio.get_event_loop()

loop.run_until_complete(futu)

'''
运行结果:
Waiting 3
done
<Task finished coro=<do_some_work() done, defined at E:/pycharm/daima/pycode/协程/future协程添加回调.py:8> result=None>
'''

remove_done_callback

通过remove_done_callback添加回调函数

用法:

import asyncio

# 回调函数
def done_callback(future):
    print("我是回调函数1")
    print(future)

# 回调函数2
def done_callback2(future):
    print("我是回调函数2")


async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)



future = asyncio.ensure_future(do_some_work(3))
future.add_done_callback(done_callback)
future.add_done_callback(done_callback2)

# 删除回调函数1
future.remove_done_callback(done_callback)

loop = asyncio.get_event_loop()

loop.run_until_complete(future)

'''
运行结果:
Waiting 3
我是回调函数2
'''

set_result

将future设置为 finished 状态,同时设置一个结果,可以通过 future.result() 方法获得这个结果。

import asyncio
future = asyncio.Future()
async def coro1():
    print("wait 1 second")
    await asyncio.sleep(1)
    print("set_result")
    # 将future设置为finished状态,同时设置一个结果
    future.set_result('data')


async def coro2():
    result = await future
    print(result)
    print("future任务是否执行完成", future.done())


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
    coro1(),
    coro2()
]))
loop.close()

'''
运行结果:
wait 1 second
set_result
data
future任务是否执行完成 True
'''

task对象

task是future的子类,所以task继承了future的属性和方法。

pycharm中使用

import asyncio.tasks
import asyncio.futures

然后按CTRL同时鼠标点击tasks即可查看tasks类的定义

tasks类.png

关于task的创建

task的创建有三种方法, loop.create_task和asyncio.create_task,asyncio.Task。都会把创建的task加入到loop循环。
asyncio.create_task 其实在内部还是调用的 loop.create_task,但是 asyncio.create_task 必须有一个 正在运行的loop循环中 使用,否则会爆错 "RuntimeError: no running event loop"
asyncio.Task 直接调用Task类进行创建。

  • loop.create_task

    使用:

    import asyncio
    
    
    async def main():
        print("创建一个task")
    
    
    loop = asyncio.get_event_loop()
    task = loop.create_task(main())
    loop.run_until_complete(task)
    
    '''
    运行结果:
    创建一个task
    '''
  • asyncio.create_task
    源码:
    asyncio.create_task 就是对 loop.create_task 的封装。

    def create_task(coro):
        """Schedule the execution of a coroutine object in a spawn task.
    
        Return a Task object.
        """
        loop = events.get_running_loop()
        return loop.create_task(coro)
    

    使用:

    import asyncio
    
    async def hello():
        print("创建另一个task")
    
    async def main():
        # 必须在一个正在运行的loop循环中创建,否则会爆错 "RuntimeError: no running event loop"
        asyncio.create_task(hello())
        print("创建一个task")
    
    loop = asyncio.get_event_loop()
    task = loop.create_task(main())
    loop.run_until_complete(task)
    
    '''
    运行结果:
    创建一个task
    进入另一个task
    '''
  • asyncio.Task
    使用:

    import asyncio
    
    async def hello():
        print("hello")
    
    loop = asyncio.get_event_loop()
    task = asyncio.Task(hello())
    loop.run_until_complete(task)
    
    '''
    运行结果:
    hello
    '''

关于task的状态

由于task是future的子类,所以task的状态应该就是和future是一样的。懒得翻源码了。

  • Pending
  • Running
  • Done
  • Cancelled

Pending 就绪态,Running 正在运行,Done 运行完成,Cancelled 任务被取消。

关于task的判断

没有找到 ansyncio自带的模块,这里直接用instance方法。

import asyncio
async def say_after_time(delay, what, name):
    await asyncio.sleep(delay)

loop = asyncio.get_event_loop()
task = loop.create_task(say_after_time(1, "hello", "world"))
print(isinstance(task, asyncio.Task))

'''
运行结果:
True
'''

current_task 与 all_task

current与all_task都是静态方法,可以通过asyncio.current_task 与 asyncio.all_task 可以分别查看当前的正在运行的task和loop循环中的全部task。

import datetime
import asyncio
async def say_after_time(delay, what, name):
    await asyncio.sleep(delay)

async def main(name):
    print("开始时间为",  datetime.datetime.now())

    # 打印出当前正在运行的task
    # 可以看到task的状态为 running
    task = asyncio.current_task()
    blocking = getattr(task, '_asyncio_future_blocking', None)
    print("当前正在运行的task: ", task, "当前task名字: ", name)

    # 打印出当前循环中的全部task
    task = asyncio.all_tasks()
    print("全部task", task, "当前task名字: ", name)

    await say_after_time(1, "hello", name)
    await say_after_time(5, "world", name)
    print("结束时间为:", datetime.datetime.now())


loop = asyncio.get_event_loop()
task = loop.create_task(main("task1"))
task2 = loop.create_task(main("task2"))
tasklist = [task, task2]
loop.run_until_complete(asyncio.wait(tasklist))
loop.close()

'''
运行结果:
开始时间为 2020-12-25 15:22:33.225552
当前正在运行的task:  <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/task类的current_task与all_tasks用法.py:13>> 当前task名字:  task1
全部task {<Task pending coro=<wait() running at E:\python\lib\asyncio\tasks.py:366> cb=[_run_until_complete_cb() at E:\python\lib\asyncio\base_events.py:153]>, <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/task类的current_task与all_tasks用法.py:17>>, <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/task类的current_task与all_tasks用法.py:6>>} 当前task名字:  task1
开始时间为 2020-12-25 15:22:33.225552
当前正在运行的task:  <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/task类的current_task与all_tasks用法.py:13>> 当前task名字:  task2
全部task {<Task pending coro=<wait() running at E:\python\lib\asyncio\tasks.py:366> cb=[_run_until_complete_cb() at E:\python\lib\asyncio\base_events.py:153]>, <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/task类的current_task与all_tasks用法.py:20> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000002D626E10EB8>()]>>, <Task pending coro=<main() running at E:/pycharm/daima/pycode/协程/task类的current_task与all_tasks用法.py:17>>} 当前task名字:  task2
结束时间为: 2020-12-25 15:22:39.227163
结束时间为: 2020-12-25 15:22:39.227163

'''

cancel

取消task

用法:

import asyncio
async def cancel_me():
    print('cancel_me')

async def main(loop):
    task = loop.create_task(cancel_me())
    task.cancel()
    try:
        print("开始执行task")
        await task
    except asyncio.CancelledError:
        print("task 被取消")


loop = asyncio.get_event_loop()
task = loop.create_task(main(loop))
loop.run_until_complete(task)
'''
运行结果:
开始执行task
task 被取消
'''

__step

源码中明明写的 __step 不知到图片中怎么弄成 _step 了。__step是整个协程的核心生成器函数是怎么封装成task的,以及loop循环怎么操作生成器的,都是这个函数的功劳。
一个最简单的生成器函数例如下面:

def aa():
    yield "1234"

首先这个生成器函数会在task类中执行 self._coro = coro ,然后self._loop.call_soon(self.__step, context=self._context)会把 self.__step 加入到loop循环中,这也是我们说的 loop.create_task 会把创建的 task 自动添加到loop循环的原因。然后loop循环就通过self.__step调动生成器函数。self.__step的内部实现就是

coro = self._coro
# self.__step内部通过send来调动生成器
result = coro.send(None)

可以看到 self.__step内部通过send方法来调动生成器。更多详细内容查看 python协程的原理 - keepnight

__wakeup

当在 一个协程内部 await 另一个协程时,当前协程的 __wakeup 被加入 另一个协程的回调函数。当 另一个协程 执行完毕时,会通过执行 另一个协程的回调函数 来接着执行当前协程。

__wakeup的源代码:

  def __wakeup(self, future):
        # 执行当前协程的__step函数
        self.__step()