| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 | from __future__ import annotationsimport loggingimport reimport sysimport typing as tfrom datetime import datetimefrom datetime import timezoneif t.TYPE_CHECKING:    from _typeshed.wsgi import WSGIEnvironment    from .wrappers.request import Request_logger: logging.Logger | None = Noneclass _Missing:    def __repr__(self) -> str:        return "no value"    def __reduce__(self) -> str:        return "_missing"_missing = _Missing()def _wsgi_decoding_dance(s: str) -> str:    return s.encode("latin1").decode(errors="replace")def _wsgi_encoding_dance(s: str) -> str:    return s.encode().decode("latin1")def _get_environ(obj: WSGIEnvironment | Request) -> WSGIEnvironment:    env = getattr(obj, "environ", obj)    assert isinstance(        env, dict    ), f"{type(obj).__name__!r} is not a WSGI environment (has to be a dict)"    return envdef _has_level_handler(logger: logging.Logger) -> bool:    """Check if there is a handler in the logging chain that will handle    the given logger's effective level.    """    level = logger.getEffectiveLevel()    current = logger    while current:        if any(handler.level <= level for handler in current.handlers):            return True        if not current.propagate:            break        current = current.parent  # type: ignore    return Falseclass _ColorStreamHandler(logging.StreamHandler):    """On Windows, wrap stream with Colorama for ANSI style support."""    def __init__(self) -> None:        try:            import colorama        except ImportError:            stream = None        else:            stream = colorama.AnsiToWin32(sys.stderr)        super().__init__(stream)def _log(type: str, message: str, *args: t.Any, **kwargs: t.Any) -> None:    """Log a message to the 'werkzeug' logger.    The logger is created the first time it is needed. If there is no    level set, it is set to :data:`logging.INFO`. If there is no handler    for the logger's effective level, a :class:`logging.StreamHandler`    is added.    """    global _logger    if _logger is None:        _logger = logging.getLogger("werkzeug")        if _logger.level == logging.NOTSET:            _logger.setLevel(logging.INFO)        if not _has_level_handler(_logger):            _logger.addHandler(_ColorStreamHandler())    getattr(_logger, type)(message.rstrip(), *args, **kwargs)@t.overloaddef _dt_as_utc(dt: None) -> None:    ...@t.overloaddef _dt_as_utc(dt: datetime) -> datetime:    ...def _dt_as_utc(dt: datetime | None) -> datetime | None:    if dt is None:        return dt    if dt.tzinfo is None:        return dt.replace(tzinfo=timezone.utc)    elif dt.tzinfo != timezone.utc:        return dt.astimezone(timezone.utc)    return dt_TAccessorValue = t.TypeVar("_TAccessorValue")class _DictAccessorProperty(t.Generic[_TAccessorValue]):    """Baseclass for `environ_property` and `header_property`."""    read_only = False    def __init__(        self,        name: str,        default: _TAccessorValue | None = None,        load_func: t.Callable[[str], _TAccessorValue] | None = None,        dump_func: t.Callable[[_TAccessorValue], str] | None = None,        read_only: bool | None = None,        doc: str | None = None,    ) -> None:        self.name = name        self.default = default        self.load_func = load_func        self.dump_func = dump_func        if read_only is not None:            self.read_only = read_only        self.__doc__ = doc    def lookup(self, instance: t.Any) -> t.MutableMapping[str, t.Any]:        raise NotImplementedError    @t.overload    def __get__(        self, instance: None, owner: type    ) -> _DictAccessorProperty[_TAccessorValue]:        ...    @t.overload    def __get__(self, instance: t.Any, owner: type) -> _TAccessorValue:        ...    def __get__(        self, instance: t.Any | None, owner: type    ) -> _TAccessorValue | _DictAccessorProperty[_TAccessorValue]:        if instance is None:            return self        storage = self.lookup(instance)        if self.name not in storage:            return self.default  # type: ignore        value = storage[self.name]        if self.load_func is not None:            try:                return self.load_func(value)            except (ValueError, TypeError):                return self.default  # type: ignore        return value  # type: ignore    def __set__(self, instance: t.Any, value: _TAccessorValue) -> None:        if self.read_only:            raise AttributeError("read only property")        if self.dump_func is not None:            self.lookup(instance)[self.name] = self.dump_func(value)        else:            self.lookup(instance)[self.name] = value    def __delete__(self, instance: t.Any) -> None:        if self.read_only:            raise AttributeError("read only property")        self.lookup(instance).pop(self.name, None)    def __repr__(self) -> str:        return f"<{type(self).__name__} {self.name}>"_plain_int_re = re.compile(r"-?\d+", re.ASCII)def _plain_int(value: str) -> int:    """Parse an int only if it is only ASCII digits and ``-``.    This disallows ``+``, ``_``, and non-ASCII digits, which are accepted by ``int`` but    are not allowed in HTTP header values.    Any leading or trailing whitespace is stripped    """    value = value.strip()    if _plain_int_re.fullmatch(value) is None:        raise ValueError    return int(value)
 |