cansel.py 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. import asyncio
  2. from asyncio import CancelledError
  3. from util import delay
  4. async def test_1():
  5. """Отмена задачи"""
  6. long_task = asyncio.create_task(delay(10))
  7. seconds_elapsed = 0
  8. while not long_task.done():
  9. print('Задача не закончилась, следующая проверка через секунду.')
  10. await asyncio.sleep(1)
  11. seconds_elapsed = seconds_elapsed + 1
  12. if seconds_elapsed == 5:
  13. long_task.cancel()
  14. try:
  15. await long_task
  16. except CancelledError:
  17. print('Наша задача была снята')
  18. async def test_2():
  19. """timout"""
  20. delay_task = asyncio.create_task(delay(2))
  21. try:
  22. result = await asyncio.wait_for(delay_task, timeout=1)
  23. print(result)
  24. except asyncio.exceptions.TimeoutError:
  25. print('Timeout!')
  26. print(f'Задача была снята? {delay_task.cancelled()}')
  27. async def test_3():
  28. task = asyncio.create_task(delay(10))
  29. try:
  30. result = await asyncio.wait_for(asyncio.shield(task), 5)
  31. print(result)
  32. except TimeoutError:
  33. print("Задача заняла более 5 с, скоро она закончится!")
  34. result = await task
  35. print(result)
  36. if __name__ == '__main__':
  37. # asyncio.run(test_1())
  38. # asyncio.run(test_2())
  39. asyncio.run(test_3())