_utilities.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. from __future__ import annotations
  2. import asyncio
  3. import inspect
  4. import sys
  5. import typing as t
  6. from functools import partial
  7. from weakref import ref
  8. from blinker._saferef import BoundMethodWeakref
  # Type of the keys produced by ``hashable_identity``: a pair of ``id()``
  # values, a ``str``, or an ``int``.
  IdentityType = t.Union[t.Tuple[int, int], str, int]
  class _symbol:
      """Backing object for a named symbol; ``symbol(name)`` hands these out."""

      def __init__(self, name):
          """Construct a new named symbol."""
          self.__name__ = name
          self.name = name

      def __reduce__(self):
          # Pickle as a call to ``symbol(name)`` so that loading goes back
          # through the ``symbol.symbols`` registry.
          return symbol, (self.name,)

      def __repr__(self):
          return self.name


  # Make the backing class report itself under the public name.
  _symbol.__name__ = "symbol"


  class symbol:
      """A constant symbol.

      >>> symbol('foo') is symbol('foo')
      True
      >>> symbol('foo')
      foo

      A slight refinement of the MAGICCOOKIE=object() pattern.  The primary
      advantage of symbol() is its repr().  They are also singletons.

      Repeated calls of symbol('name') will all return the same instance.
      """

      # Registry mapping each name to its single ``_symbol`` instance.
      symbols = {}  # type: ignore[var-annotated]

      def __new__(cls, name):
          registry = cls.symbols
          if name in registry:
              return registry[name]
          # ``setdefault`` returns whichever instance is stored under *name*,
          # so an entry that appeared in the meantime is reused.
          return registry.setdefault(name, _symbol(name))
  def hashable_identity(obj: object) -> IdentityType:
      """Return a hashable key identifying *obj*.

      * Objects exposing ``__func__`` (or the legacy ``im_func``) yield a pair
        of ``id()`` values: one for the underlying function and one for the
        object it is bound to.
      * ``int`` and ``str`` instances are returned unchanged.
      * Anything else is identified by ``id(obj)``.
      """
      if hasattr(obj, "__func__"):
          return (id(obj.__func__), id(obj.__self__))  # type: ignore[attr-defined]
      if hasattr(obj, "im_func"):
          return (id(obj.im_func), id(obj.im_self))  # type: ignore[attr-defined]
      if isinstance(obj, (int, str)):
          return obj
      return id(obj)
  # The weak-reference classes used in this module, grouped as a tuple so the
  # pair can be passed to ``isinstance``.
  WeakTypes = (ref, BoundMethodWeakref)
  class annotatable_weakref(ref):
      """A ``weakref.ref`` subclass whose instances accept extra attributes."""

      # Declared for type checkers only; no value is assigned in this class.
      receiver_id: t.Optional[IdentityType]
      sender_id: t.Optional[IdentityType]
  def reference(  # type: ignore[no-untyped-def]
      object, callback=None, **annotations
  ) -> annotatable_weakref:
      """Return an annotated weak ref.

      Callables are wrapped by ``callable_reference``; every other object is
      wrapped directly in an ``annotatable_weakref``.  Each keyword argument in
      *annotations* is then set as an attribute on the resulting reference.
      """
      make_weak = callable_reference if callable(object) else annotatable_weakref
      weak = make_weak(object, callback)
      for attr_name, attr_value in annotations.items():
          setattr(weak, attr_name, attr_value)
      return weak  # type: ignore[no-any-return]
  def callable_reference(object, callback=None):
      """Return an annotated weak ref, supporting bound instance methods.

      If *object* has a non-``None`` ``im_self`` or ``__self__`` attribute it
      is wrapped in a ``BoundMethodWeakref``; otherwise an
      ``annotatable_weakref`` is returned.
      """
      for owner_attr in ("im_self", "__self__"):
          if getattr(object, owner_attr, None) is not None:
              return BoundMethodWeakref(target=object, on_delete=callback)
      return annotatable_weakref(object, callback)
  class lazy_property:
      """A @property that is only evaluated once."""

      def __init__(self, deferred):
          self._deferred = deferred
          self.__doc__ = deferred.__doc__

      def __get__(self, obj, cls):
          # Accessed on the class rather than an instance: expose the
          # descriptor itself.
          if obj is None:
              return self
          compute = self._deferred
          result = compute(obj)
          # Store the result on the instance under the wrapped function's name.
          setattr(obj, compute.__name__, result)
          return result
  def is_coroutine_function(func: t.Any) -> bool:
      """Return ``True`` if *func* is a coroutine function.

      On Python 3.8+ this applies the same two checks as
      ``asyncio.iscoroutinefunction``: ``inspect.iscoroutinefunction`` and the
      legacy ``_is_coroutine`` marker.  It does not call that function, which
      is deprecated since Python 3.14.

      Python < 3.8 does not correctly determine that partially wrapped
      coroutine functions are coroutine functions, so a fallback adapted from
      CPython is used there.
      """
      if sys.version_info >= (3, 8):
          if inspect.iscoroutinefunction(func):
              return True
          # Legacy marker check.  The marker is a private asyncio attribute,
          # so tolerate its absence.
          marker = getattr(asyncio.coroutines, "_is_coroutine", None)
          return marker is not None and getattr(func, "_is_coroutine", None) is marker
      else:
          # Note that there is something special about the AsyncMock
          # such that it isn't determined as a coroutine function
          # without an explicit check.
          try:
              from unittest.mock import AsyncMock  # type: ignore[attr-defined]

              if isinstance(func, AsyncMock):
                  return True
          except ImportError:
              # AsyncMock is not available to import; skip the explicit check.
              pass

          # Unwrap bound methods and partials to reach the underlying function.
          while inspect.ismethod(func):
              func = func.__func__
          while isinstance(func, partial):
              func = func.func
          if not inspect.isfunction(func):
              return False

          if func.__code__.co_flags & inspect.CO_COROUTINE:
              return True

          acic = asyncio.coroutines._is_coroutine  # type: ignore[attr-defined]
          return getattr(func, "_is_coroutine", None) is acic