#! python3
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
# return f"task done: {what}"
async def main():
print(f"started at {time.strftime('%X')}")
# await say_after(1, 'hello')
# await say_after(2, 'world')
# print(f"finished at {time.strftime('%X')}")
# 总执行时间≈2秒(由最慢的任务决定)
# task1 = asyncio.create_task(say_after(1, 'hello'))
# task2 = asyncio.create_task(say_after(2, 'world'))
# await task1
# await task2
# print(f"finished at {time.strftime('%X')}")
# task管理器 当存在上下文管理器时 await 是隐式执行的。
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(say_after(1, 'hello'))
task2 = tg.create_task(say_after(2, 'world'))
print(f"finished at {time.strftime('%X')}")
async def taskCallback():
task = asyncio.create_task(say_after(1, 'hello'))
task.add_done_callback(lambda t: print("Task is done! Result: {!r}".format(t.result())))
await task
class TaskCancelException(Exception):
"""Exception raised to terminate a task group."""
async def force_cancel_task():
raise TaskCancelException()
async def taskGroupCancel():
try:
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(say_after(1, 'hello'))
task2 = tg.create_task(say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
await asyncio.sleep(1)
tg.create_task(force_cancel_task()) # 异步任务组取消
except* TaskCancelException:
print("Task is canceled!")
async def taskCancel():
print(f"started main at {time.strftime('%X')}")
task = asyncio.create_task(say_after(5, 'hello'))
print(f"started at {time.strftime('%X')}")
await asyncio.sleep(1)
task.cancel()
# task.cancel() 并不会立即终止任务,而是会在下一个 await 或挂起点 抛出 asyncio.CancelledError 异常
try:
await task
except asyncio.CancelledError:
print("main task is canceled!")
print(f"finished at {time.strftime('%X')}")
async def taskTimeout():
try:
print(f"started at {time.strftime('%X')}")
async with asyncio.timeout(5):
await asyncio.sleep(10)
except TimeoutError:
print("Task is timeout!")
print(f"finished at {time.strftime('%X')}")
async def taskTimeoutAt():
loop = asyncio.get_running_loop()
deadline = loop.time() + 5
print(f"started at {time.strftime('%X')}")
print(deadline)
try:
async with asyncio.timeout_at(deadline):
await asyncio.sleep(10)
except TimeoutError:
print("The long operation timed out, but we've handled it.")
print(f"finished at {time.strftime('%X')}")
async def eternity():
# 休眠一小时
await asyncio.sleep(10)
print('yay!')
async def taskWaitFor():
# 等待至多 5 秒
try:
print(f"started at {time.strftime('%X')}")
await asyncio.wait_for(eternity(), timeout=5.0)
except TimeoutError:
print('timeout!')
print(f"finished at {time.strftime('%X')}")
def blocking_io():
print(f"start blocking_io at {time.strftime('%X')}")
# 请注意 time.sleep() 可被替换为任意一种
# 阻塞式 IO 密集型操作,例如文件操作。
time.sleep(5)
print(f"blocking_io complete at {time.strftime('%X')}")
async def taskExcute1():
# print(f"started main at {time.strftime('%X')}")
await asyncio.sleep(2)
print(f"finished task1 at {time.strftime('%X')}")
async def taskExcuteInThread():
print(f"started main at {time.strftime('%X')}")
await asyncio.gather(
asyncio.to_thread(blocking_io),
taskExcute1())
print(f"finished main at {time.strftime('%X')}")
async def task1(i):
await asyncio.sleep(i)
print(f"task {i} is done")
return f'the result {i}'
async def taskWaitAllComplete():
print(f"started main at {time.strftime('%X')}")
# 多个并发任务,完成时间取决于最慢的任务
tasks = [asyncio.create_task(task1(i)) for i in [1, 2, 3]]
# done, pending = await asyncio.wait(tasks, timeout=2.0, return_when=asyncio.ALL_COMPLETED) # 设置超时时间
done, pending = await asyncio.wait(tasks, timeout=None, return_when=asyncio.ALL_COMPLETED) # 不设置超时时间
for d in done:
print(d.result())
for p in pending:
print("超时未完成task")
print(p)
print(f"finished main at {time.strftime('%X')}")
async def taskWaitOneComplete():
print(f"started main at {time.strftime('%X')}")
# 多个并发任务,完成时间取决于最慢的任务
tasks = [asyncio.create_task(task1(i)) for i in [1, 2, 3]]
# done, pending = await asyncio.wait(tasks, timeout=2.0, return_when=asyncio.ALL_COMPLETED) # 设置超时时间
done, pending = await asyncio.wait(tasks, timeout=None, return_when=asyncio.FIRST_COMPLETED) # 不设置超时时间
for d in done:
print(d.result())
for p in pending:
print("超时未完成task")
print(p)
p.cancel()
print(f"finished main at {time.strftime('%X')}")
async def taskLock(name):
# 创建一个锁
async with lock:
print(f"started task_{name} at {time.strftime('%X')}")
print(f"lock acquired at {time.strftime('%X')}")
await asyncio.sleep(1)
print(f"lock released at {time.strftime('%X')}")
async def runTaskLock():
async with asyncio.TaskGroup() as tg:
tg.create_task(taskLock("A"))
tg.create_task(taskLock("B"))
tg.create_task(taskLock("C"))
print("All tasks are done")
async def taskEvent(event):
print(f"waiting event at {time.strftime('%X')}")
await event.wait()
print(f"done event at {time.strftime('%X')}")
async def taskEventSet(event):
await asyncio.sleep(5)
event.set()
async def runTaskEvent():
event = asyncio.Event()
async with asyncio.TaskGroup() as tg:
tg.create_task(taskEvent(event))
tg.create_task(taskEvent(event))
tg.create_task(taskEvent(event))
tg.create_task(taskEventSet(event))
async def taskConditionProducer(condition):
async with condition:
await asyncio.sleep(2)
print(f"condition nofify at {time.strftime('%X')}")
# condition.notify()
condition.notify_all()
# for i in range(10):
# await asyncio.sleep(1)
# condition.notify()
async def taskConditionConsumer(condition, name):
async with condition:
while True:
print(f"{name} waiting condition at {time.strftime('%X')}")
await condition.wait()
print(f"{name} done condition at {time.strftime('%X')}")
break
# print(f"waiting condition at {time.strftime('%X')}")
# await condition.wait()
# print(f"done condition at {time.strftime('%X')}")
async def runTaskCondition():
condition = asyncio.Condition()
async with asyncio.TaskGroup() as tg:
tg.create_task(taskConditionConsumer(condition, "A"))
tg.create_task(taskConditionConsumer(condition, "B"))
tg.create_task(taskConditionConsumer(condition, "C"))
tg.create_task(taskConditionConsumer(condition, "D"))
tg.create_task(taskConditionConsumer(condition, "E"))
tg.create_task(taskConditionConsumer(condition, "F"))
tg.create_task(taskConditionConsumer(condition, "G"))
tg.create_task(taskConditionProducer(condition))
async def taskSemaphore(semaphore, i):
async with semaphore:
print(f"started task_{i} at {time.strftime('%X')}")
print(f"semaphore acquired at {time.strftime('%X')}")
await asyncio.sleep(1)
print(f"semaphore released at {time.strftime('%X')}")
async def runTaskSemaphore():
# 信号量, 限制最多3个并发任务
semaphore = asyncio.Semaphore(3)
async with asyncio.TaskGroup() as tg:
for i in range(10):
tg.create_task(taskSemaphore(semaphore, i))
async def taskBarrier(barrier, i):
print(f"started task_{i} at {time.strftime('%X')}")
await asyncio.sleep(1 + i * 0.5)
await barrier.wait()
print(f"done task_{i} at {time.strftime('%X')}")
async def runTaskBarrier():
# 任务屏障, 限制最多3个并发任务
barrier = asyncio.Barrier(3)
async with asyncio.TaskGroup() as tg:
for i in range(3):
tg.create_task(taskBarrier(barrier, i))
async def runTaskBarrier1():
# 任务屏障, 限制最多3个并发任务
barrier = asyncio.Barrier(3)
asyncio.create_task(taskBarrier(barrier, 0))
asyncio.create_task(taskBarrier(barrier, 1))
await asyncio.sleep(2)
await barrier.wait()
print("All tasks are done")
async def taskQueueProducer(queue):
for i in range(20):
await queue.put(i)
print(f"put {i} to queue")
await asyncio.sleep(1)
await queue.join()
print("All items have been processed")
raise TaskDoneException()
async def taskQueueConsumer(queue):
while True:
item = await queue.get()
await asyncio.sleep(1)
queue.task_done()
print(f"get {item} from queue")
class TaskDoneException(Exception):
"""Exception raised to terminate a task group."""
async def runTaskQueue():
queue = asyncio.Queue()
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(taskQueueProducer(queue))
tg.create_task(taskQueueConsumer(queue))
tg.create_task(taskQueueConsumer(queue))
except* TaskDoneException:
print("Tasks is done!")
if __name__ == '__main__':
# asyncio.run(main())
# 任务回调
# asyncio.run(taskCallback())
# 任务组取消 (模拟抛异常终结)
# asyncio.run(taskGroupCancel())
# 任务取消 (模拟正常终结)
# asyncio.run(taskCancel())
# 任务超时异常
# asyncio.run(taskTimeout())
# 任务超时异常
# asyncio.run(taskTimeoutAt())
# 任务超时异常
# asyncio.run(taskWaitFor())
# 针对于阻塞IO密集型操作,在新线程调用运行
# asyncio.run(taskExcuteInThread())
# 任务简单等待 (不会抛超时异常, 分别返回完成、未完成任务future)
# asyncio.run(taskWaitAllComplete())
# 任务简单等待 (会抛超时异常, 分别返回完成、未完成任务future)
# asyncio.run(taskWaitOneComplete())
# lock = asyncio.Lock()
# # 任务锁
# asyncio.run(runTaskLock())
# 任务事件event 协程之间的信号传递, 事件可被用来通知多个 asyncio 任务已经有事件发生。
# asyncio.run(runTaskEvent())
# 任务条件condition
# asyncio.run(runTaskCondition())
# 信号量, 限制最多3个并发任务
# asyncio.run(runTaskSemaphore())
# 任务屏障, 限制最多3个并发任务
# asyncio.run(runTaskBarrier())
# asyncio.run(runTaskBarrier1())
# 任务队列 (生产者-消费者模式) 任务完成后自动退出
asyncio.run(runTaskQueue())