| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515 | from __future__ import annotationsimport reimport typing as tfrom .._internal import _missingfrom ..exceptions import BadRequestKeyErrorfrom .mixins import ImmutableHeadersMixinfrom .structures import iter_multi_itemsfrom .structures import MultiDictclass Headers:    """An object that stores some headers. It has a dict-like interface,    but is ordered, can store the same key multiple times, and iterating    yields ``(key, value)`` pairs instead of only keys.    This data structure is useful if you want a nicer way to handle WSGI    headers which are stored as tuples in a list.    From Werkzeug 0.3 onwards, the :exc:`KeyError` raised by this class is    also a subclass of the :class:`~exceptions.BadRequest` HTTP exception    and will render a page for a ``400 BAD REQUEST`` if caught in a    catch-all for HTTP exceptions.    Headers is mostly compatible with the Python :class:`wsgiref.headers.Headers`    class, with the exception of `__getitem__`.  :mod:`wsgiref` will return    `None` for ``headers['missing']``, whereas :class:`Headers` will raise    a :class:`KeyError`.    To create a new ``Headers`` object, pass it a list, dict, or    other ``Headers`` object with default values. These values are    validated the same way values added later are.    :param defaults: The list of default values for the :class:`Headers`.    .. versionchanged:: 2.1.0        Default values are validated the same as values added later.    .. versionchanged:: 0.9       This data structure now stores unicode values similar to how the       multi dicts do it.  The main difference is that bytes can be set as       well which will automatically be latin1 decoded.    .. versionchanged:: 0.9       The :meth:`linked` function was removed without replacement as it       was an API that does not support the changes to the encoding model.    """    def __init__(self, defaults=None):        self._list = []        if defaults is not None:            self.extend(defaults)    def __getitem__(self, key, _get_mode=False):        if not _get_mode:            if isinstance(key, int):                return self._list[key]            elif isinstance(key, slice):                return self.__class__(self._list[key])        if not isinstance(key, str):            raise BadRequestKeyError(key)        ikey = key.lower()        for k, v in self._list:            if k.lower() == ikey:                return v        # micro optimization: if we are in get mode we will catch that        # exception one stack level down so we can raise a standard        # key error instead of our special one.        if _get_mode:            raise KeyError()        raise BadRequestKeyError(key)    def __eq__(self, other):        def lowered(item):            return (item[0].lower(),) + item[1:]        return other.__class__ is self.__class__ and set(            map(lowered, other._list)        ) == set(map(lowered, self._list))    __hash__ = None    def get(self, key, default=None, type=None):        """Return the default value if the requested data doesn't exist.        If `type` is provided and is a callable it should convert the value,        return it or raise a :exc:`ValueError` if that is not possible.  In        this case the function will return the default as if the value was not        found:        >>> d = Headers([('Content-Length', '42')])        >>> d.get('Content-Length', type=int)        42        :param key: The key to be looked up.        :param default: The default value to be returned if the key can't                        be looked up.  If not further specified `None` is                        returned.        :param type: A callable that is used to cast the value in the                     :class:`Headers`.  If a :exc:`ValueError` is raised                     by this callable the default value is returned.        .. versionchanged:: 3.0            The ``as_bytes`` parameter was removed.        .. versionchanged:: 0.9            The ``as_bytes`` parameter was added.        """        try:            rv = self.__getitem__(key, _get_mode=True)        except KeyError:            return default        if type is None:            return rv        try:            return type(rv)        except ValueError:            return default    def getlist(self, key, type=None):        """Return the list of items for a given key. If that key is not in the        :class:`Headers`, the return value will be an empty list.  Just like        :meth:`get`, :meth:`getlist` accepts a `type` parameter.  All items will        be converted with the callable defined there.        :param key: The key to be looked up.        :param type: A callable that is used to cast the value in the                     :class:`Headers`.  If a :exc:`ValueError` is raised                     by this callable the value will be removed from the list.        :return: a :class:`list` of all the values for the key.        .. versionchanged:: 3.0            The ``as_bytes`` parameter was removed.        .. versionchanged:: 0.9            The ``as_bytes`` parameter was added.        """        ikey = key.lower()        result = []        for k, v in self:            if k.lower() == ikey:                if type is not None:                    try:                        v = type(v)                    except ValueError:                        continue                result.append(v)        return result    def get_all(self, name):        """Return a list of all the values for the named field.        This method is compatible with the :mod:`wsgiref`        :meth:`~wsgiref.headers.Headers.get_all` method.        """        return self.getlist(name)    def items(self, lower=False):        for key, value in self:            if lower:                key = key.lower()            yield key, value    def keys(self, lower=False):        for key, _ in self.items(lower):            yield key    def values(self):        for _, value in self.items():            yield value    def extend(self, *args, **kwargs):        """Extend headers in this object with items from another object        containing header items as well as keyword arguments.        To replace existing keys instead of extending, use        :meth:`update` instead.        If provided, the first argument can be another :class:`Headers`        object, a :class:`MultiDict`, :class:`dict`, or iterable of        pairs.        .. versionchanged:: 1.0            Support :class:`MultiDict`. Allow passing ``kwargs``.        """        if len(args) > 1:            raise TypeError(f"update expected at most 1 arguments, got {len(args)}")        if args:            for key, value in iter_multi_items(args[0]):                self.add(key, value)        for key, value in iter_multi_items(kwargs):            self.add(key, value)    def __delitem__(self, key, _index_operation=True):        if _index_operation and isinstance(key, (int, slice)):            del self._list[key]            return        key = key.lower()        new = []        for k, v in self._list:            if k.lower() != key:                new.append((k, v))        self._list[:] = new    def remove(self, key):        """Remove a key.        :param key: The key to be removed.        """        return self.__delitem__(key, _index_operation=False)    def pop(self, key=None, default=_missing):        """Removes and returns a key or index.        :param key: The key to be popped.  If this is an integer the item at                    that position is removed, if it's a string the value for                    that key is.  If the key is omitted or `None` the last                    item is removed.        :return: an item.        """        if key is None:            return self._list.pop()        if isinstance(key, int):            return self._list.pop(key)        try:            rv = self[key]            self.remove(key)        except KeyError:            if default is not _missing:                return default            raise        return rv    def popitem(self):        """Removes a key or index and returns a (key, value) item."""        return self.pop()    def __contains__(self, key):        """Check if a key is present."""        try:            self.__getitem__(key, _get_mode=True)        except KeyError:            return False        return True    def __iter__(self):        """Yield ``(key, value)`` tuples."""        return iter(self._list)    def __len__(self):        return len(self._list)    def add(self, _key, _value, **kw):        """Add a new header tuple to the list.        Keyword arguments can specify additional parameters for the header        value, with underscores converted to dashes::        >>> d = Headers()        >>> d.add('Content-Type', 'text/plain')        >>> d.add('Content-Disposition', 'attachment', filename='foo.png')        The keyword argument dumping uses :func:`dump_options_header`        behind the scenes.        .. versionadded:: 0.4.1            keyword arguments were added for :mod:`wsgiref` compatibility.        """        if kw:            _value = _options_header_vkw(_value, kw)        _value = _str_header_value(_value)        self._list.append((_key, _value))    def add_header(self, _key, _value, **_kw):        """Add a new header tuple to the list.        An alias for :meth:`add` for compatibility with the :mod:`wsgiref`        :meth:`~wsgiref.headers.Headers.add_header` method.        """        self.add(_key, _value, **_kw)    def clear(self):        """Clears all headers."""        del self._list[:]    def set(self, _key, _value, **kw):        """Remove all header tuples for `key` and add a new one.  The newly        added key either appears at the end of the list if there was no        entry or replaces the first one.        Keyword arguments can specify additional parameters for the header        value, with underscores converted to dashes.  See :meth:`add` for        more information.        .. versionchanged:: 0.6.1           :meth:`set` now accepts the same arguments as :meth:`add`.        :param key: The key to be inserted.        :param value: The value to be inserted.        """        if kw:            _value = _options_header_vkw(_value, kw)        _value = _str_header_value(_value)        if not self._list:            self._list.append((_key, _value))            return        listiter = iter(self._list)        ikey = _key.lower()        for idx, (old_key, _old_value) in enumerate(listiter):            if old_key.lower() == ikey:                # replace first occurrence                self._list[idx] = (_key, _value)                break        else:            self._list.append((_key, _value))            return        self._list[idx + 1 :] = [t for t in listiter if t[0].lower() != ikey]    def setlist(self, key, values):        """Remove any existing values for a header and add new ones.        :param key: The header key to set.        :param values: An iterable of values to set for the key.        .. versionadded:: 1.0        """        if values:            values_iter = iter(values)            self.set(key, next(values_iter))            for value in values_iter:                self.add(key, value)        else:            self.remove(key)    def setdefault(self, key, default):        """Return the first value for the key if it is in the headers,        otherwise set the header to the value given by ``default`` and        return that.        :param key: The header key to get.        :param default: The value to set for the key if it is not in the            headers.        """        if key in self:            return self[key]        self.set(key, default)        return default    def setlistdefault(self, key, default):        """Return the list of values for the key if it is in the        headers, otherwise set the header to the list of values given        by ``default`` and return that.        Unlike :meth:`MultiDict.setlistdefault`, modifying the returned        list will not affect the headers.        :param key: The header key to get.        :param default: An iterable of values to set for the key if it            is not in the headers.        .. versionadded:: 1.0        """        if key not in self:            self.setlist(key, default)        return self.getlist(key)    def __setitem__(self, key, value):        """Like :meth:`set` but also supports index/slice based setting."""        if isinstance(key, (slice, int)):            if isinstance(key, int):                value = [value]            value = [(k, _str_header_value(v)) for (k, v) in value]            if isinstance(key, int):                self._list[key] = value[0]            else:                self._list[key] = value        else:            self.set(key, value)    def update(self, *args, **kwargs):        """Replace headers in this object with items from another        headers object and keyword arguments.        To extend existing keys instead of replacing, use :meth:`extend`        instead.        If provided, the first argument can be another :class:`Headers`        object, a :class:`MultiDict`, :class:`dict`, or iterable of        pairs.        .. versionadded:: 1.0        """        if len(args) > 1:            raise TypeError(f"update expected at most 1 arguments, got {len(args)}")        if args:            mapping = args[0]            if isinstance(mapping, (Headers, MultiDict)):                for key in mapping.keys():                    self.setlist(key, mapping.getlist(key))            elif isinstance(mapping, dict):                for key, value in mapping.items():                    if isinstance(value, (list, tuple)):                        self.setlist(key, value)                    else:                        self.set(key, value)            else:                for key, value in mapping:                    self.set(key, value)        for key, value in kwargs.items():            if isinstance(value, (list, tuple)):                self.setlist(key, value)            else:                self.set(key, value)    def to_wsgi_list(self):        """Convert the headers into a list suitable for WSGI.        :return: list        """        return list(self)    def copy(self):        return self.__class__(self._list)    def __copy__(self):        return self.copy()    def __str__(self):        """Returns formatted headers suitable for HTTP transmission."""        strs = []        for key, value in self.to_wsgi_list():            strs.append(f"{key}: {value}")        strs.append("\r\n")        return "\r\n".join(strs)    def __repr__(self):        return f"{type(self).__name__}({list(self)!r})"def _options_header_vkw(value: str, kw: dict[str, t.Any]):    return http.dump_options_header(        value, {k.replace("_", "-"): v for k, v in kw.items()}    )_newline_re = re.compile(r"[\r\n]")def _str_header_value(value: t.Any) -> str:    if not isinstance(value, str):        value = str(value)    if _newline_re.search(value) is not None:        raise ValueError("Header values must not contain newline characters.")    return valueclass EnvironHeaders(ImmutableHeadersMixin, Headers):    """Read only version of the headers from a WSGI environment.  This    provides the same interface as `Headers` and is constructed from    a WSGI environment.    From Werkzeug 0.3 onwards, the `KeyError` raised by this class is also a    subclass of the :exc:`~exceptions.BadRequest` HTTP exception and will    render a page for a ``400 BAD REQUEST`` if caught in a catch-all for    HTTP exceptions.    """    def __init__(self, environ):        self.environ = environ    def __eq__(self, other):        return self.environ is other.environ    __hash__ = None    def __getitem__(self, key, _get_mode=False):        # _get_mode is a no-op for this class as there is no index but        # used because get() calls it.        if not isinstance(key, str):            raise KeyError(key)        key = key.upper().replace("-", "_")        if key in {"CONTENT_TYPE", "CONTENT_LENGTH"}:            return self.environ[key]        return self.environ[f"HTTP_{key}"]    def __len__(self):        # the iter is necessary because otherwise list calls our        # len which would call list again and so forth.        return len(list(iter(self)))    def __iter__(self):        for key, value in self.environ.items():            if key.startswith("HTTP_") and key not in {                "HTTP_CONTENT_TYPE",                "HTTP_CONTENT_LENGTH",            }:                yield key[5:].replace("_", "-").title(), value            elif key in {"CONTENT_TYPE", "CONTENT_LENGTH"} and value:                yield key.replace("_", "-").title(), value    def copy(self):        raise TypeError(f"cannot create {type(self).__name__!r} copies")# circular dependenciesfrom .. import http
 |