testing.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. import asyncio
  2. import contextvars
  3. import time
  4. from .compatibility import guarantee_single_callable
  5. from .timeout import timeout as async_timeout
  6. class ApplicationCommunicator:
  7. """
  8. Runs an ASGI application in a test mode, allowing sending of
  9. messages to it and retrieval of messages it sends.
  10. """
  11. def __init__(self, application, scope):
  12. self.application = guarantee_single_callable(application)
  13. self.scope = scope
  14. self.input_queue = asyncio.Queue()
  15. self.output_queue = asyncio.Queue()
  16. # Clear context - this ensures that context vars set in the testing scope
  17. # are not "leaked" into the application which would normally begin with
  18. # an empty context. In Python >= 3.11 this could also be written as:
  19. # asyncio.create_task(..., context=contextvars.Context())
  20. self.future = contextvars.Context().run(
  21. asyncio.create_task,
  22. self.application(scope, self.input_queue.get, self.output_queue.put),
  23. )
  24. async def wait(self, timeout=1):
  25. """
  26. Waits for the application to stop itself and returns any exceptions.
  27. """
  28. try:
  29. async with async_timeout(timeout):
  30. try:
  31. await self.future
  32. self.future.result()
  33. except asyncio.CancelledError:
  34. pass
  35. finally:
  36. if not self.future.done():
  37. self.future.cancel()
  38. try:
  39. await self.future
  40. except asyncio.CancelledError:
  41. pass
  42. def stop(self, exceptions=True):
  43. if not self.future.done():
  44. self.future.cancel()
  45. elif exceptions:
  46. # Give a chance to raise any exceptions
  47. self.future.result()
  48. def __del__(self):
  49. # Clean up on deletion
  50. try:
  51. self.stop(exceptions=False)
  52. except RuntimeError:
  53. # Event loop already stopped
  54. pass
  55. async def send_input(self, message):
  56. """
  57. Sends a single message to the application
  58. """
  59. # Give it the message
  60. await self.input_queue.put(message)
  61. async def receive_output(self, timeout=1):
  62. """
  63. Receives a single message from the application, with optional timeout.
  64. """
  65. # Make sure there's not an exception to raise from the task
  66. if self.future.done():
  67. self.future.result()
  68. # Wait and receive the message
  69. try:
  70. async with async_timeout(timeout):
  71. return await self.output_queue.get()
  72. except asyncio.TimeoutError as e:
  73. # See if we have another error to raise inside
  74. if self.future.done():
  75. self.future.result()
  76. else:
  77. self.future.cancel()
  78. try:
  79. await self.future
  80. except asyncio.CancelledError:
  81. pass
  82. raise e
  83. async def receive_nothing(self, timeout=0.1, interval=0.01):
  84. """
  85. Checks that there is no message to receive in the given time.
  86. """
  87. # `interval` has precedence over `timeout`
  88. start = time.monotonic()
  89. while time.monotonic() - start < timeout:
  90. if not self.output_queue.empty():
  91. return False
  92. await asyncio.sleep(interval)
  93. return self.output_queue.empty()