123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- import asyncio
- from asyncio import CancelledError
- from util import delay
- async def test_1():
- """Отмена задачи"""
- long_task = asyncio.create_task(delay(10))
- seconds_elapsed = 0
- while not long_task.done():
- print('Задача не закончилась, следующая проверка через секунду.')
- await asyncio.sleep(1)
- seconds_elapsed = seconds_elapsed + 1
- if seconds_elapsed == 5:
- long_task.cancel()
- try:
- await long_task
- except CancelledError:
- print('Наша задача была снята')
- async def test_2():
- """timout"""
- delay_task = asyncio.create_task(delay(2))
- try:
- result = await asyncio.wait_for(delay_task, timeout=1)
- print(result)
- except asyncio.exceptions.TimeoutError:
- print('Timeout!')
- print(f'Задача была снята? {delay_task.cancelled()}')
- async def test_3():
- task = asyncio.create_task(delay(10))
- try:
- result = await asyncio.wait_for(asyncio.shield(task), 5)
- print(result)
- except TimeoutError:
- print("Задача заняла более 5 с, скоро она закончится!")
- result = await task
- print(result)
-
- if __name__ == '__main__':
- # asyncio.run(test_1())
- # asyncio.run(test_2())
- asyncio.run(test_3())
|