local.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. import asyncio
  2. import contextlib
  3. import contextvars
  4. import threading
  5. from typing import Any, Dict, Union
  6. class _CVar:
  7. """Storage utility for Local."""
  8. def __init__(self) -> None:
  9. self._data: "contextvars.ContextVar[Dict[str, Any]]" = contextvars.ContextVar(
  10. "asgiref.local"
  11. )
  12. def __getattr__(self, key):
  13. storage_object = self._data.get({})
  14. try:
  15. return storage_object[key]
  16. except KeyError:
  17. raise AttributeError(f"{self!r} object has no attribute {key!r}")
  18. def __setattr__(self, key: str, value: Any) -> None:
  19. if key == "_data":
  20. return super().__setattr__(key, value)
  21. storage_object = self._data.get({})
  22. storage_object[key] = value
  23. self._data.set(storage_object)
  24. def __delattr__(self, key: str) -> None:
  25. storage_object = self._data.get({})
  26. if key in storage_object:
  27. del storage_object[key]
  28. self._data.set(storage_object)
  29. else:
  30. raise AttributeError(f"{self!r} object has no attribute {key!r}")
  31. class Local:
  32. """Local storage for async tasks.
  33. This is a namespace object (similar to `threading.local`) where data is
  34. also local to the current async task (if there is one).
  35. In async threads, local means in the same sense as the `contextvars`
  36. module - i.e. a value set in an async frame will be visible:
  37. - to other async code `await`-ed from this frame.
  38. - to tasks spawned using `asyncio` utilities (`create_task`, `wait_for`,
  39. `gather` and probably others).
  40. - to code scheduled in a sync thread using `sync_to_async`
  41. In "sync" threads (a thread with no async event loop running), the
  42. data is thread-local, but additionally shared with async code executed
  43. via the `async_to_sync` utility, which schedules async code in a new thread
  44. and copies context across to that thread.
  45. If `thread_critical` is True, then the local will only be visible per-thread,
  46. behaving exactly like `threading.local` if the thread is sync, and as
  47. `contextvars` if the thread is async. This allows genuinely thread-sensitive
  48. code (such as DB handles) to be kept stricly to their initial thread and
  49. disable the sharing across `sync_to_async` and `async_to_sync` wrapped calls.
  50. Unlike plain `contextvars` objects, this utility is threadsafe.
  51. """
  52. def __init__(self, thread_critical: bool = False) -> None:
  53. self._thread_critical = thread_critical
  54. self._thread_lock = threading.RLock()
  55. self._storage: "Union[threading.local, _CVar]"
  56. if thread_critical:
  57. # Thread-local storage
  58. self._storage = threading.local()
  59. else:
  60. # Contextvar storage
  61. self._storage = _CVar()
  62. @contextlib.contextmanager
  63. def _lock_storage(self):
  64. # Thread safe access to storage
  65. if self._thread_critical:
  66. try:
  67. # this is a test for are we in a async or sync
  68. # thread - will raise RuntimeError if there is
  69. # no current loop
  70. asyncio.get_running_loop()
  71. except RuntimeError:
  72. # We are in a sync thread, the storage is
  73. # just the plain thread local (i.e, "global within
  74. # this thread" - it doesn't matter where you are
  75. # in a call stack you see the same storage)
  76. yield self._storage
  77. else:
  78. # We are in an async thread - storage is still
  79. # local to this thread, but additionally should
  80. # behave like a context var (is only visible with
  81. # the same async call stack)
  82. # Ensure context exists in the current thread
  83. if not hasattr(self._storage, "cvar"):
  84. self._storage.cvar = _CVar()
  85. # self._storage is a thread local, so the members
  86. # can't be accessed in another thread (we don't
  87. # need any locks)
  88. yield self._storage.cvar
  89. else:
  90. # Lock for thread_critical=False as other threads
  91. # can access the exact same storage object
  92. with self._thread_lock:
  93. yield self._storage
  94. def __getattr__(self, key):
  95. with self._lock_storage() as storage:
  96. return getattr(storage, key)
  97. def __setattr__(self, key, value):
  98. if key in ("_local", "_storage", "_thread_critical", "_thread_lock"):
  99. return super().__setattr__(key, value)
  100. with self._lock_storage() as storage:
  101. setattr(storage, key, value)
  102. def __delattr__(self, key):
  103. with self._lock_storage() as storage:
  104. delattr(storage, key)