# test_async.py
import asyncio
wait_task_finish = True
async def task_run(_future_obj_, k):
print(f"task {k} running ...")
_future_obj_.set_result(k + 100)
await asyncio.sleep(3)
print(f"task {k} finish.")
return k
async def async_tasks(k):
obj_list=[]
if k > 0:
print(f"begin to create {k} tasks ...")
task_list = []
for i in range(k):
obj_list.append(asyncio.Future())
task_list.append(asyncio.create_task(task_run(obj_list[i], i)))
if wait_task_finish:
print(f"Wait all tasks finish...")
for _task_ in task_list:
result = await _task_
print(f"task {_task_} return {result}")
else:
await asyncio.sleep(1) # if sleep time is not enough, task_run may not be finished
print(f"async_tasks finish.")
return obj_list
print(f"asyncio.run ...")
result = asyncio.run(async_tasks(4))
print(f"print finish result ...")
for _obj_ in result:
if _obj_.done():
print(_obj_.result())
else:
print(f"{_obj_} is not set_result!")
可以更改 wait_task_finish 為 True 或 False, 驗證執行結果: python3 test_async.py
沒有留言:
張貼留言