注册 登录

清河洛

python中的asyncio库的高层级API

qingheluo2022-02-11清河洛397
asyncio库自python3.4版本开始加入标准库,用于基于协程来实现异步IO协程又称微线程,作用是在执行函数时可以中断去执行其他函数,整个过程看似像多线程创建一个协程不用调用系统功能,程序自身就能完成,所以协程也被称作用户态线程 协程是在线程中模拟出来的,也就是说协程是由线程生成的 一个线程生成的所有协程始终只在这一个线程中运行 协程之间的切换是由程序来决定的,也就是说我们可以人为的控制协程的切换,且切换开销很小 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在控制共享资源时也不需要加锁,因此执行效率高很多使用async关键字创建协程函数,在协程函数中使用awai...

asyncio库自python3.4版本开始加入标准库,用于基于协程来实现异步IO

协程又称微线程,作用是在执行函数时可以中断去执行其他函数,整个过程看似像多线程

创建一个协程不用调用系统功能,程序自身就能完成,所以协程也被称作用户态线程
协程是在线程中模拟出来的,也就是说协程是由线程生成的
一个线程生成的所有协程始终只在这一个线程中运行
协程之间的切换是由程序来决定的,也就是说我们可以人为的控制协程的切换,且切换开销很小
不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在控制共享资源时也不需要加锁,因此执行效率高很多

使用async关键字创建协程函数,在协程函数中使用await关键字异步等待一个可等待对象(await仅可用于协程函数的函数体中)

可等待对象有三种主要类型:

    协程(coroutines)一般指调用协程函数所返回的协程对象
    任务(Task):用来并发调度的协程,是对协程函数的进一步包装,任务可以包含各种状态,便于对异步操作状态的调控
        asyncio.create_task(coro, *, name=None):将一个协程对象(调用协程函数返回)封装为任务
        loop.create_task(coro, *, name=None):将一个协程对象封装为任务
            如果提供name参数且不为None,会使用Task.set_name()来设为任务的名称
        一般来说,协程函数具有4种状态:
            Pending:创建但是未运行
            Running:正在运行
            Done :运行完毕
            Cacelled:被取消
    Futures:是低层级的可等待对象,表示一个异步操作的最终结果
        当一个Future对象被等待,协程将保持等待直到该Future对象操作完毕
        操作结束后会把最终结果设置到这个Future对象上
协程函数直接调用会返回一个协程对象,并不会运行函数中的语句,需要将协程对象添加到事件循环中,由事件循环去运行

事件循环(Eventloop)是asyncio应用的核心,用于管理所有的任务,是中央总控,实例提供了注册、取消、执行任务和回调的方法,把一些任务注册到事件循环上,事件循环会循环执行这些函数(但同时只能执行一个),当执行某个函数时,如果它正在等待I/O返回,事件循环会暂停它的执行去执行其他的函数;当某个函数完成I/O后会恢复,下次循环到它的时候继续执行

一个线程中同时只会允许创建一个事件循环,再次创建会等待上一个事件循环结束后再创建新的事件循环

协程运行的工作流程

1、定义/创建协程对象
2、将协程转为task任务
3、定义事件循环对象容器
4、将task任务添加到事件循环对象中
5、执行事件循环

python中asyncio异步模块的高层级API

自省方法

current_task(loop=None):返回当前运行的Task对象,如果没有返回None
all_tasks(loop=None):返回事件循环所运行的未完成的Task对象

Task对象的方法

task.cancel(msg=None):取消Task对象的运行,在下一轮事件循环中抛出一个CancelledError异常
task.cancelled():返回Task对象是否被取消
task.done():Task对象是否已完成
task.result():返回Task的结果
    如果Task对象已完成,其结果会被返回(当协程引发异常时,该异常会被重新引发)
    如果Task对象被取消,会引发一个CancelledError异常
    如果Task对象的结果还不可用,此方法会引发一个InvalidStateError异常
task.exception():返回Task对象的异常,无异常返回None
task.add_done_callback(callback, *, context=None):添加一个回调,将在Task对象完成时被运行
task.remove_done_callback(callback):从回调列表中移除callback
task.get_stack(*, limit=None):返回此Task对象中所有未完成的栈框架列表
    如果已经完成或取消返回空列表,如果有一场则会返回回溯的架构
    栈返回最新的框架,回溯返回最旧的框架
task.print_stack(*, limit=None, file=None):打印此Task对象的栈或回溯
    file参数是输出写入的I/O流;默认会写入sys.stderr
task.get_coro():返回由Task包装的协程对象
task.get_name():返回Task的名称
task.set_name(value):设置Task的名称,value可为任意对象,它随后会被转换为字符串

asyncio.run(coro,*,debug=False)

    运行协程对象coro,debug表示是否以调适模式运行
    此函数总是会创建一个新的事件循环并在结束时关闭
    由于一个线程中同时只会允许创建一个事件循环,再次创建会等待上一个事件循环结束后再创建新的事件循环
    所以多次使用run()函数运行协程时,后面的run()函数会等待前一个运行结束后才能创建新的事件循环并运行协程函数
    这就导致了类似同步运行运行的效果,不符合我们异步的需求
    所以它应当被用作asyncio程序的主入口点,理想情况下应当只被调用一次

asyncio.create_task(coro, *, name=None)

    将coro协程封装为一个任务(Task)并调度其执行,返回Task对象
    name表示该任务的名称,内部会使用Task.set_name()来设为任务的名称

coroutine asyncio.sleep(delay, result=None, *, loop=None)

阻塞delay指定的秒数
如果指定了result,则当协程完成时将其返回给调用者
sleep()总是会挂起当前任务,以允许其他任务运行。
将delay设为0将提供一个经优化的路径以允许其他任务运行。这可供长期间运行的函数使用以避免在函数调用的全过程中阻塞事件循环

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

并发运行aws序列中的可等待对象,aws中的协程将自动被作为任务调度自执行
如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与aws中可等待对象的顺序一致
return_exceptions表示任务中引发异常是否当作正常结果一样处理,为False则会立即传播给等待gather()的任务
但是怎么处理异常,aws序列中的其他可等待对象不会被取消并将继续运行
如果gather()被取消,所有被提交且尚未完成的可等待对象也会被取消。

awaitable asyncio.shield(aw, *, loop=None)

屏蔽取消操作的运行一个可等待对象aw防止其被取消,如果aw是一个协程,它将自动被作为任务调度
如:如果包含res = await shield(something())这个语句的协程被取消,从something()的角度看来,取消操作并没有发生,然而其调用者已被取消,因此 "await"表达式仍然会引发CancelledError。

coroutine asyncio.wait_for(aw, timeout, *, loop=None)

指定可等待对象aw完成直到超时,如果aw是一个协程,它将自动被作为任务调度
timeout可以为float或int型数值表示的等待秒数,也可以为None(等待直到完成)
如果发生超时,任务将取消(可以使用shield()避免任务取消)并引发asyncio.TimeoutError异常
当aw因超时被取消,wait_for会等待aw被真正取消,所以总等待时间可能超过timeout
如果等待被取消,则aw指定的对象也会被取消
如果在取消期间发生了异常,异常将会被传播

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

并发地运行aws可迭代对象(不能为空)中的可等待对象 并进入阻塞状态直到满足return_when所指定的条件
aws中的协程将自动被作为任务调度自执行
返回已经完成任务和未完成任务,如done, pending = await asyncio.wait(aws)
此函数不会引发asyncio.TimeoutError。当超时时,未完成的任务会被被返回
return_when 指定此函数应在何时返回。它必须为以下常数之一:
    FIRST_COMPLETED:函数将在任意可等待对象结束或取消时返回。
    FIRST_EXCEPTION:函数将在任意可等待对象因引发异常而结束时返回。如没有引发任何异常相当于ALL_COMPLETED
    ALL_COMPLETED:函数将在所有可等待对象结束或取消时返回

在线程中运行

to_thread(func, /, *args, **kwargs)
    在不同的OS线程中异步地运行函数func
    *args 和 **kwargs会被直接传给func
    当前contextvars.Context会被传播,允许在不同的线程中访问来自事件循环的上下文变量
    返回一个可被等待以获取func的最终结果的协程。

run_coroutine_threadsafe(coro, loop):向指定事件循环提交一个协程
    返回一个Future以等待来自其他OS线程的结果
    此函数应该从另一个 OS 线程中调用,而非事件循环运行所在线程
    主要用于跨线程调度

队列集

Queue(maxsize=0):返回一个先进先出(FIFO)队列对象,如果maxsize小于等于零,则队列尺寸不限

    maxsize:队列中可存放的最大元素数量
    empty():队列是否为空
    full():队列元素是否达到最大值
    get():返回一个队列中的元素。队列为空则等待直到队列中有元素
    get_nowait():立即返回一个队列中的元素,如果队列为空引发异常QueueEmpty
    join():阻塞至队列中所有的元素都被接收和处理完毕
        每当消费协程调用task_done()表示这个条目已经处理完毕,未完成计数就会减少
        当未完成计数降到零的时候,join()阻塞被解除
    put(item):添加一个元素进队列。如果队列已满会一直等待空闲插槽可用
    put_nowait(item):不阻塞的放一个元素入队列,如果队列已满会引发QueueFull异常
    qsize():返回当前队列的元素数量
    task_done():表明取出的任务已经完成,如果调用次数多于队列的长度,将引发ValueError

PriorityQueue(maxsize=0):优先级队列,按优先级顺序取出条目(最小的先取出)

每个元素都是一个(priority_number, data)元组

LifoQueue(maxsize=0):后进先出队列

同步原语

Lock():创建一个互斥锁对象,推荐使用async with语句使用该对象,每次执行完会自动调用release()

lock = asyncio.Lock()

async with lock:
    somethings
等价于以下语句
await lock.acquire()
try:
    somethings
finally:
    lock.release()

互斥锁对象的方法

    acquire():获取锁。会等待直至锁为unlocked,将其设为locked并返回True
    release():释放锁,将锁从locked设为unlocked 并返回,如果锁已经为unlocked则引发RuntimeError
    locked():验证锁是否为locked

Event():创建一个事件对象,用于通知多个asyncio任务某个事件已经发生

    wait():等待直至事件被设置
    set():设置事件
    clear():取消设置事件
    is_set():验证事件是否已被设置

Semaphore(value=1):创建一个信号量对象,推荐使用async with语句使用该对象,每次执行完会自动调用release()

    信号量会管理一个内部计数器,该计数器会每次调用acquire()递减并每次调用release()递增
    计数器永远不会降到零以下;当acquire()发现其值为零时,将保持阻塞直到某个任务调用了release()
    acquire():获取一个信号量,内部计数器减一并返回True,如果值为零,会等待直到release()调用并返回True
    locked():如果信号量对象无法被立即获取则返回True
    release():释放一个信号量对象,将内部计数器的值加一

异常

TimeoutError:操作已超过规定的截止事件
CancelledError:操作已被取消
InvalidStateError:Task或Future的内部状态无效,在为已设置结果值的Future设置结果值可以引发此问题
SendfileNotAvailableError:系统调用不适用于给定的套接字或文件类型
    子类RuntimeError
IncompleteReadError:请求的读取操作未完全完成,是EOFError的子类
    expected:预期字节的总数(int)
    partial:到达流结束之前读取的bytes字符串
LimitOverrunError:在查找分隔符时达到缓冲区大小限制

一个多线程结合asyncio解决调用时的假死的示例

import asyncio,asyncio,time,threading
 
async def func(num):
    print(f'准备调用func,大约耗时{num}')
    await asyncio.sleep(num)
    print(f'耗时{num}之后,func函数运行结束')
 
#定义一个专门创建事件循环loop的函数,在另一个线程中启动它
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
 
#定义一个main函数
def main():
    coroutine1 = func(3)
    coroutine2 = func(2)
    coroutine3 = func(1)

    new_loop = asyncio.new_event_loop()
        #在当前线程下创建时间循环,(未启用),在start_loop里面启动它
    t = threading.Thread(target=start_loop,args=(new_loop,))
        #通过当前线程开启新的线程去启动事件循环
    t.start()

    asyncio.run_coroutine_threadsafe(coroutine1,new_loop)
    asyncio.run_coroutine_threadsafe(coroutine2,new_loop)
    asyncio.run_coroutine_threadsafe(coroutine3,new_loop)
    #这几个是关键,代表在新线程中事件循环不断“游走”执行

main()

main是在主线程中的,而三个协程函数是在新线程中的,它们是在一起执行的,没有造成主线程main的阻塞


网址导航