|
@@ -0,0 +1,51 @@
|
|
|
+import asyncio
|
|
|
+from asyncio import CancelledError
|
|
|
+from util import delay
|
|
|
+
|
|
|
+
|
|
|
+async def test_1():
|
|
|
+ """Отмена задачи"""
|
|
|
+ long_task = asyncio.create_task(delay(10))
|
|
|
+ seconds_elapsed = 0
|
|
|
+
|
|
|
+ while not long_task.done():
|
|
|
+ print('Задача не закончилась, следующая проверка через секунду.')
|
|
|
+ await asyncio.sleep(1)
|
|
|
+ seconds_elapsed = seconds_elapsed + 1
|
|
|
+ if seconds_elapsed == 5:
|
|
|
+ long_task.cancel()
|
|
|
+
|
|
|
+ try:
|
|
|
+ await long_task
|
|
|
+ except CancelledError:
|
|
|
+ print('Наша задача была снята')
|
|
|
+
|
|
|
+
|
|
|
+async def test_2():
|
|
|
+ """timout"""
|
|
|
+ delay_task = asyncio.create_task(delay(2))
|
|
|
+ try:
|
|
|
+ result = await asyncio.wait_for(delay_task, timeout=1)
|
|
|
+ print(result)
|
|
|
+ except asyncio.exceptions.TimeoutError:
|
|
|
+ print('Timeout!')
|
|
|
+ print(f'Задача была снята? {delay_task.cancelled()}')
|
|
|
+
|
|
|
+
|
|
|
+async def test_3():
|
|
|
+ task = asyncio.create_task(delay(10))
|
|
|
+
|
|
|
+ try:
|
|
|
+ result = await asyncio.wait_for(asyncio.shield(task), 5)
|
|
|
+ print(result)
|
|
|
+ except TimeoutError:
|
|
|
+ print("Задача заняла более 5 с, скоро она закончится!")
|
|
|
+ result = await task
|
|
|
+ print(result)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ # asyncio.run(test_1())
|
|
|
+ # asyncio.run(test_2())
|
|
|
+ asyncio.run(test_3())
|