| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 | from __future__ import annotationsimport asyncioimport inspectimport sysimport typing as tfrom functools import partialfrom weakref import reffrom blinker._saferef import BoundMethodWeakrefIdentityType = t.Union[t.Tuple[int, int], str, int]class _symbol:    def __init__(self, name):        """Construct a new named symbol."""        self.__name__ = self.name = name    def __reduce__(self):        return symbol, (self.name,)    def __repr__(self):        return self.name_symbol.__name__ = "symbol"class symbol:    """A constant symbol.    >>> symbol('foo') is symbol('foo')    True    >>> symbol('foo')    foo    A slight refinement of the MAGICCOOKIE=object() pattern.  The primary    advantage of symbol() is its repr().  They are also singletons.    Repeated calls of symbol('name') will all return the same instance.    """    symbols = {}  # type: ignore[var-annotated]    def __new__(cls, name):        try:            return cls.symbols[name]        except KeyError:            return cls.symbols.setdefault(name, _symbol(name))def hashable_identity(obj: object) -> IdentityType:    if hasattr(obj, "__func__"):        return (id(obj.__func__), id(obj.__self__))  # type: ignore[attr-defined]    elif hasattr(obj, "im_func"):        return (id(obj.im_func), id(obj.im_self))  # type: ignore[attr-defined]    elif isinstance(obj, (int, str)):        return obj    else:        return id(obj)WeakTypes = (ref, BoundMethodWeakref)class annotatable_weakref(ref):    """A weakref.ref that supports custom instance attributes."""    receiver_id: t.Optional[IdentityType]    sender_id: t.Optional[IdentityType]def reference(  # type: ignore[no-untyped-def]    object, callback=None, **annotations) -> annotatable_weakref:    """Return an annotated weak ref."""    if callable(object):        weak = callable_reference(object, callback)    else:        weak = annotatable_weakref(object, callback)    for key, value in annotations.items():        setattr(weak, key, value)    return weak  # type: ignore[no-any-return]def callable_reference(object, callback=None):    """Return an annotated weak ref, supporting bound instance methods."""    if hasattr(object, "im_self") and object.im_self is not None:        return BoundMethodWeakref(target=object, on_delete=callback)    elif hasattr(object, "__self__") and object.__self__ is not None:        return BoundMethodWeakref(target=object, on_delete=callback)    return annotatable_weakref(object, callback)class lazy_property:    """A @property that is only evaluated once."""    def __init__(self, deferred):        self._deferred = deferred        self.__doc__ = deferred.__doc__    def __get__(self, obj, cls):        if obj is None:            return self        value = self._deferred(obj)        setattr(obj, self._deferred.__name__, value)        return valuedef is_coroutine_function(func: t.Any) -> bool:    # Python < 3.8 does not correctly determine partially wrapped    # coroutine functions are coroutine functions, hence the need for    # this to exist. Code taken from CPython.    if sys.version_info >= (3, 8):        return asyncio.iscoroutinefunction(func)    else:        # Note that there is something special about the AsyncMock        # such that it isn't determined as a coroutine function        # without an explicit check.        try:            from unittest.mock import AsyncMock  # type: ignore[attr-defined]            if isinstance(func, AsyncMock):                return True        except ImportError:            # Not testing, no asynctest to import            pass        while inspect.ismethod(func):            func = func.__func__        while isinstance(func, partial):            func = func.func        if not inspect.isfunction(func):            return False        if func.__code__.co_flags & inspect.CO_COROUTINE:            return True        acic = asyncio.coroutines._is_coroutine  # type: ignore[attr-defined]        return getattr(func, "_is_coroutine", None) is acic
 |