File CVE-2021-41945-copy_with-data-leak.patch of Package python-httpx
From d07c4b4407b4f7ee1fef29b9e3f6fd57de407b51 Mon Sep 17 00:00:00 2001
From: Tom Christie <tom@tomchristie.com>
Date: Wed, 16 Feb 2022 21:02:13 +0000
Subject: [PATCH] Move URL and QueryParams to new '_urls.py' module (#2084)
---
httpx/__init__.py | 3 +-
httpx/_client.py | 3 +-
httpx/_models.py | 772 +--------------------------------------------
httpx/_types.py | 3 +-
httpx/_urls.py | 773 ++++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 780 insertions(+), 774 deletions(-)
create mode 100644 httpx/_urls.py
diff --git a/httpx/__init__.py b/httpx/__init__.py
index b6303deb3..b93ca9222 100644
--- a/httpx/__init__.py
+++ b/httpx/__init__.py
@@ -34,7 +34,7 @@
WriteError,
WriteTimeout,
)
-from ._models import URL, Cookies, Headers, QueryParams, Request, Response
+from ._models import Cookies, Headers, Request, Response
from ._status_codes import codes
from ._transports.asgi import ASGITransport
from ._transports.base import AsyncBaseTransport, BaseTransport
@@ -42,6 +42,7 @@
from ._transports.mock import MockTransport
from ._transports.wsgi import WSGITransport
from ._types import AsyncByteStream, SyncByteStream
+from ._urls import URL, QueryParams
try:
from ._main import main
diff --git a/httpx/_client.py b/httpx/_client.py
index 5e4c8e271..c57cfb6ea 100644
--- a/httpx/_client.py
+++ b/httpx/_client.py
@@ -23,7 +23,7 @@
TooManyRedirects,
request_context,
)
-from ._models import URL, Cookies, Headers, QueryParams, Request, Response
+from ._models import Cookies, Headers, Request, Response
from ._status_codes import codes
from ._transports.asgi import ASGITransport
from ._transports.base import AsyncBaseTransport, BaseTransport
@@ -45,6 +45,7 @@
URLTypes,
VerifyTypes,
)
+from ._urls import URL, QueryParams
from ._utils import (
NetRCInfo,
Timer,
diff --git a/httpx/_models.py b/httpx/_models.py
index 341b28ee0..5a213c356 100644
--- a/httpx/_models.py
+++ b/httpx/_models.py
@@ -6,12 +6,8 @@
import urllib.request
from collections.abc import MutableMapping
from http.cookiejar import Cookie, CookieJar
-from urllib.parse import parse_qs, quote, unquote, urlencode
import charset_normalizer
-import idna
-import rfc3986
-import rfc3986.exceptions
from ._content import ByteStream, UnattachedStream, encode_request, encode_response
from ._decoders import (
@@ -27,7 +23,6 @@
from ._exceptions import (
CookieConflict,
HTTPStatusError,
- InvalidURL,
RequestNotRead,
ResponseNotRead,
StreamClosed,
@@ -39,7 +34,6 @@
AsyncByteStream,
CookieTypes,
HeaderTypes,
- PrimitiveData,
QueryParamTypes,
RawURL,
RequestContent,
@@ -47,8 +41,8 @@
RequestFiles,
ResponseContent,
SyncByteStream,
- URLTypes,
)
+from ._urls import URL
from ._utils import (
guess_json_utf,
is_known_encoding,
@@ -56,773 +50,9 @@
normalize_header_value,
obfuscate_sensitive_headers,
parse_header_links,
- primitive_value_to_str,
)
-class URL:
- """
- url = httpx.URL("HTTPS://jo%40email.com:a%20secret@müller.de:1234/pa%20th?search=ab#anchorlink")
-
- assert url.scheme == "https"
- assert url.username == "jo@email.com"
- assert url.password == "a secret"
- assert url.userinfo == b"jo%40email.com:a%20secret"
- assert url.host == "müller.de"
- assert url.raw_host == b"xn--mller-kva.de"
- assert url.port == 1234
- assert url.netloc == b"xn--mller-kva.de:1234"
- assert url.path == "/pa th"
- assert url.query == b"?search=ab"
- assert url.raw_path == b"/pa%20th?search=ab"
- assert url.fragment == "anchorlink"
-
- The components of a URL are broken down like this:
-
- https://jo%40email.com:a%20secret@müller.de:1234/pa%20th?search=ab#anchorlink
- [scheme] [ username ] [password] [ host ][port][ path ] [ query ] [fragment]
- [ userinfo ] [ netloc ][ raw_path ]
-
- Note that:
-
- * `url.scheme` is normalized to always be lowercased.
-
- * `url.host` is normalized to always be lowercased. Internationalized domain
- names are represented in unicode, without IDNA encoding applied. For instance:
-
- url = httpx.URL("http://中国.icom.museum")
- assert url.host == "中国.icom.museum"
- url = httpx.URL("http://xn--fiqs8s.icom.museum")
- assert url.host == "中国.icom.museum"
-
- * `url.raw_host` is normalized to always be lowercased, and is IDNA encoded.
-
- url = httpx.URL("http://中国.icom.museum")
- assert url.raw_host == b"xn--fiqs8s.icom.museum"
- url = httpx.URL("http://xn--fiqs8s.icom.museum")
- assert url.raw_host == b"xn--fiqs8s.icom.museum"
-
- * `url.port` is either None or an integer. URLs that include the default port for
- "http", "https", "ws", "wss", and "ftp" schemes have their port normalized to `None`.
-
- assert httpx.URL("http://example.com") == httpx.URL("http://example.com:80")
- assert httpx.URL("http://example.com").port is None
- assert httpx.URL("http://example.com:80").port is None
-
- * `url.userinfo` is raw bytes, without URL escaping. Usually you'll want to work with
- `url.username` and `url.password` instead, which handle the URL escaping.
-
- * `url.raw_path` is raw bytes of both the path and query, without URL escaping.
- This portion is used as the target when constructing HTTP requests. Usually you'll
- want to work with `url.path` instead.
-
- * `url.query` is raw bytes, without URL escaping. A URL query string portion can only
- be properly URL escaped when decoding the parameter names and values themselves.
- """
-
- def __init__(
- self, url: typing.Union["URL", str, RawURL] = "", **kwargs: typing.Any
- ) -> None:
- if isinstance(url, (str, tuple)):
- if isinstance(url, tuple):
- raw_scheme, raw_host, port, raw_path = url
- scheme = raw_scheme.decode("ascii")
- host = raw_host.decode("ascii")
- if host and ":" in host and host[0] != "[":
- # it's an IPv6 address, so it should be enclosed in "[" and "]"
- # ref: https://tools.ietf.org/html/rfc2732#section-2
- # ref: https://tools.ietf.org/html/rfc3986#section-3.2.2
- host = f"[{host}]"
- port_str = "" if port is None else f":{port}"
- path = raw_path.decode("ascii")
- url = f"{scheme}://{host}{port_str}{path}"
-
- try:
- self._uri_reference = rfc3986.iri_reference(url).encode()
- except rfc3986.exceptions.InvalidAuthority as exc:
- raise InvalidURL(message=str(exc)) from None
-
- if self.is_absolute_url:
- # We don't want to normalize relative URLs, since doing so
- # removes any leading `../` portion.
- self._uri_reference = self._uri_reference.normalize()
- elif isinstance(url, URL):
- self._uri_reference = url._uri_reference
- else:
- raise TypeError(
- f"Invalid type for url. Expected str or httpx.URL, got {type(url)}: {url!r}"
- )
-
- # Perform port normalization, following the WHATWG spec for default ports.
- #
- # See:
- # * https://tools.ietf.org/html/rfc3986#section-3.2.3
- # * https://url.spec.whatwg.org/#url-miscellaneous
- # * https://url.spec.whatwg.org/#scheme-state
- default_port = {
- "ftp": ":21",
- "http": ":80",
- "https": ":443",
- "ws": ":80",
- "wss": ":443",
- }.get(self._uri_reference.scheme, "")
- authority = self._uri_reference.authority or ""
- if default_port and authority.endswith(default_port):
- authority = authority[: -len(default_port)]
- self._uri_reference = self._uri_reference.copy_with(authority=authority)
-
- if kwargs:
- self._uri_reference = self.copy_with(**kwargs)._uri_reference
-
- @property
- def scheme(self) -> str:
- """
- The URL scheme, such as "http", "https".
- Always normalised to lowercase.
- """
- return self._uri_reference.scheme or ""
-
- @property
- def raw_scheme(self) -> bytes:
- """
- The raw bytes representation of the URL scheme, such as b"http", b"https".
- Always normalised to lowercase.
- """
- return self.scheme.encode("ascii")
-
- @property
- def userinfo(self) -> bytes:
- """
- The URL userinfo as a raw bytestring.
- For example: b"jo%40email.com:a%20secret".
- """
- userinfo = self._uri_reference.userinfo or ""
- return userinfo.encode("ascii")
-
- @property
- def username(self) -> str:
- """
- The URL username as a string, with URL decoding applied.
- For example: "jo@email.com"
- """
- userinfo = self._uri_reference.userinfo or ""
- return unquote(userinfo.partition(":")[0])
-
- @property
- def password(self) -> str:
- """
- The URL password as a string, with URL decoding applied.
- For example: "a secret"
- """
- userinfo = self._uri_reference.userinfo or ""
- return unquote(userinfo.partition(":")[2])
-
- @property
- def host(self) -> str:
- """
- The URL host as a string.
- Always normalized to lowercase, with IDNA hosts decoded into unicode.
-
- Examples:
-
- url = httpx.URL("http://www.EXAMPLE.org")
- assert url.host == "www.example.org"
-
- url = httpx.URL("http://中国.icom.museum")
- assert url.host == "中国.icom.museum"
-
- url = httpx.URL("http://xn--fiqs8s.icom.museum")
- assert url.host == "中国.icom.museum"
-
- url = httpx.URL("https://[::ffff:192.168.0.1]")
- assert url.host == "::ffff:192.168.0.1"
- """
- host: str = self._uri_reference.host or ""
-
- if host and ":" in host and host[0] == "[":
- # it's an IPv6 address
- host = host.lstrip("[").rstrip("]")
-
- if host.startswith("xn--"):
- host = idna.decode(host)
-
- return host
-
- @property
- def raw_host(self) -> bytes:
- """
- The raw bytes representation of the URL host.
- Always normalized to lowercase, and IDNA encoded.
-
- Examples:
-
- url = httpx.URL("http://www.EXAMPLE.org")
- assert url.raw_host == b"www.example.org"
-
- url = httpx.URL("http://中国.icom.museum")
- assert url.raw_host == b"xn--fiqs8s.icom.museum"
-
- url = httpx.URL("http://xn--fiqs8s.icom.museum")
- assert url.raw_host == b"xn--fiqs8s.icom.museum"
-
- url = httpx.URL("https://[::ffff:192.168.0.1]")
- assert url.raw_host == b"::ffff:192.168.0.1"
- """
- host: str = self._uri_reference.host or ""
-
- if host and ":" in host and host[0] == "[":
- # it's an IPv6 address
- host = host.lstrip("[").rstrip("]")
-
- return host.encode("ascii")
-
- @property
- def port(self) -> typing.Optional[int]:
- """
- The URL port as an integer.
-
- Note that the URL class performs port normalization as per the WHATWG spec.
- Default ports for "http", "https", "ws", "wss", and "ftp" schemes are always
- treated as `None`.
-
- For example:
-
- assert httpx.URL("http://www.example.com") == httpx.URL("http://www.example.com:80")
- assert httpx.URL("http://www.example.com:80").port is None
- """
- port = self._uri_reference.port
- return int(port) if port else None
-
- @property
- def netloc(self) -> bytes:
- """
- Either `<host>` or `<host>:<port>` as bytes.
- Always normalized to lowercase, and IDNA encoded.
-
- This property may be used for generating the value of a request
- "Host" header.
- """
- host = self._uri_reference.host or ""
- port = self._uri_reference.port
- netloc = host.encode("ascii")
- if port:
- netloc = netloc + b":" + port.encode("ascii")
- return netloc
-
- @property
- def path(self) -> str:
- """
- The URL path as a string. Excluding the query string, and URL decoded.
-
- For example:
-
- url = httpx.URL("https://example.com/pa%20th")
- assert url.path == "/pa th"
- """
- path = self._uri_reference.path or "/"
- return unquote(path)
-
- @property
- def query(self) -> bytes:
- """
- The URL query string, as raw bytes, excluding the leading b"?".
-
- This is necessarily a bytewise interface, because we cannot
- perform URL decoding of this representation until we've parsed
- the keys and values into a QueryParams instance.
-
- For example:
-
- url = httpx.URL("https://example.com/?filter=some%20search%20terms")
- assert url.query == b"filter=some%20search%20terms"
- """
- query = self._uri_reference.query or ""
- return query.encode("ascii")
-
- @property
- def params(self) -> "QueryParams":
- """
- The URL query parameters, neatly parsed and packaged into an immutable
- multidict representation.
- """
- return QueryParams(self._uri_reference.query)
-
- @property
- def raw_path(self) -> bytes:
- """
- The complete URL path and query string as raw bytes.
- Used as the target when constructing HTTP requests.
-
- For example:
-
- GET /users?search=some%20text HTTP/1.1
- Host: www.example.org
- Connection: close
- """
- path = self._uri_reference.path or "/"
- if self._uri_reference.query is not None:
- path += "?" + self._uri_reference.query
- return path.encode("ascii")
-
- @property
- def fragment(self) -> str:
- """
- The URL fragments, as used in HTML anchors.
- As a string, without the leading '#'.
- """
- return unquote(self._uri_reference.fragment or "")
-
- @property
- def raw(self) -> RawURL:
- """
- The URL in the raw representation used by the low level
- transport API. See `BaseTransport.handle_request`.
-
- Provides the (scheme, host, port, target) for the outgoing request.
- """
- return (
- self.raw_scheme,
- self.raw_host,
- self.port,
- self.raw_path,
- )
-
- @property
- def is_absolute_url(self) -> bool:
- """
- Return `True` for absolute URLs such as 'http://example.com/path',
- and `False` for relative URLs such as '/path'.
- """
- # We don't use `.is_absolute` from `rfc3986` because it treats
- # URLs with a fragment portion as not absolute.
- # What we actually care about is if the URL provides
- # a scheme and hostname to which connections should be made.
- return bool(self._uri_reference.scheme and self._uri_reference.host)
-
- @property
- def is_relative_url(self) -> bool:
- """
- Return `False` for absolute URLs such as 'http://example.com/path',
- and `True` for relative URLs such as '/path'.
- """
- return not self.is_absolute_url
-
- def copy_with(self, **kwargs: typing.Any) -> "URL":
- """
- Copy this URL, returning a new URL with some components altered.
- Accepts the same set of parameters as the components that are made
- available via properties on the `URL` class.
-
- For example:
-
- url = httpx.URL("https://www.example.com").copy_with(username="jo@gmail.com", password="a secret")
- assert url == "https://jo%40email.com:a%20secret@www.example.com"
- """
- allowed = {
- "scheme": str,
- "username": str,
- "password": str,
- "userinfo": bytes,
- "host": str,
- "port": int,
- "netloc": bytes,
- "path": str,
- "query": bytes,
- "raw_path": bytes,
- "fragment": str,
- "params": object,
- }
-
- # Step 1
- # ======
- #
- # Perform type checking for all supported keyword arguments.
- for key, value in kwargs.items():
- if key not in allowed:
- message = f"{key!r} is an invalid keyword argument for copy_with()"
- raise TypeError(message)
- if value is not None and not isinstance(value, allowed[key]):
- expected = allowed[key].__name__
- seen = type(value).__name__
- message = f"Argument {key!r} must be {expected} but got {seen}"
- raise TypeError(message)
-
- # Step 2
- # ======
- #
- # Consolidate "username", "password", "userinfo", "host", "port" and "netloc"
- # into a single "authority" keyword, for `rfc3986`.
- if "username" in kwargs or "password" in kwargs:
- # Consolidate "username" and "password" into "userinfo".
- username = quote(kwargs.pop("username", self.username) or "")
- password = quote(kwargs.pop("password", self.password) or "")
- userinfo = f"{username}:{password}" if password else username
- kwargs["userinfo"] = userinfo.encode("ascii")
-
- if "host" in kwargs or "port" in kwargs:
- # Consolidate "host" and "port" into "netloc".
- host = kwargs.pop("host", self.host) or ""
- port = kwargs.pop("port", self.port)
-
- if host and ":" in host and host[0] != "[":
- # IPv6 addresses need to be escaped within square brackets.
- host = f"[{host}]"
-
- kwargs["netloc"] = (
- f"{host}:{port}".encode("ascii")
- if port is not None
- else host.encode("ascii")
- )
-
- if "userinfo" in kwargs or "netloc" in kwargs:
- # Consolidate "userinfo" and "netloc" into authority.
- userinfo = (kwargs.pop("userinfo", self.userinfo) or b"").decode("ascii")
- netloc = (kwargs.pop("netloc", self.netloc) or b"").decode("ascii")
- authority = f"{userinfo}@{netloc}" if userinfo else netloc
- kwargs["authority"] = authority
-
- # Step 3
- # ======
- #
- # Wrangle any "path", "query", "raw_path" and "params" keywords into
- # "query" and "path" keywords for `rfc3986`.
- if "raw_path" in kwargs:
- # If "raw_path" is included, then split it into "path" and "query" components.
- raw_path = kwargs.pop("raw_path") or b""
- path, has_query, query = raw_path.decode("ascii").partition("?")
- kwargs["path"] = path
- kwargs["query"] = query if has_query else None
-
- else:
- if kwargs.get("path") is not None:
- # Ensure `kwargs["path"] = <url quoted str>` for `rfc3986`.
- kwargs["path"] = quote(kwargs["path"])
-
- if kwargs.get("query") is not None:
- # Ensure `kwargs["query"] = <str>` for `rfc3986`.
- #
- # Note that `.copy_with(query=None)` and `.copy_with(query=b"")`
- # are subtly different. The `None` style will not include an empty
- # trailing "?" character.
- kwargs["query"] = kwargs["query"].decode("ascii")
-
- if "params" in kwargs:
- # Replace any "params" keyword with the raw "query" instead.
- #
- # Ensure that empty params use `kwargs["query"] = None` rather
- # than `kwargs["query"] = ""`, so that generated URLs do not
- # include an empty trailing "?".
- params = kwargs.pop("params")
- kwargs["query"] = None if not params else str(QueryParams(params))
-
- # Step 4
- # ======
- #
- # Ensure any fragment component is quoted.
- if kwargs.get("fragment") is not None:
- kwargs["fragment"] = quote(kwargs["fragment"])
-
- # Step 5
- # ======
- #
- # At this point kwargs may include keys for "scheme", "authority", "path",
- # "query" and "fragment". Together these constitute the entire URL.
- #
- # See https://tools.ietf.org/html/rfc3986#section-3
- #
- # foo://example.com:8042/over/there?name=ferret#nose
- # \_/ \______________/\_________/ \_________/ \__/
- # | | | | |
- # scheme authority path query fragment
- return URL(self._uri_reference.copy_with(**kwargs).unsplit())
-
- def copy_set_param(self, key: str, value: typing.Any = None) -> "URL":
- return self.copy_with(params=self.params.set(key, value))
-
- def copy_add_param(self, key: str, value: typing.Any = None) -> "URL":
- return self.copy_with(params=self.params.add(key, value))
-
- def copy_remove_param(self, key: str) -> "URL":
- return self.copy_with(params=self.params.remove(key))
-
- def copy_merge_params(self, params: QueryParamTypes) -> "URL":
- return self.copy_with(params=self.params.merge(params))
-
- def join(self, url: URLTypes) -> "URL":
- """
- Return an absolute URL, using this URL as the base.
-
- Eg.
-
- url = httpx.URL("https://www.example.com/test")
- url = url.join("/new/path")
- assert url == "https://www.example.com/new/path"
- """
- if self.is_relative_url:
- # Workaround to handle relative URLs, which otherwise raise
- # rfc3986.exceptions.ResolutionError when used as an argument
- # in `.resolve_with`.
- return (
- self.copy_with(scheme="http", host="example.com")
- .join(url)
- .copy_with(scheme=None, host=None)
- )
-
- # We drop any fragment portion, because RFC 3986 strictly
- # treats URLs with a fragment portion as not being absolute URLs.
- base_uri = self._uri_reference.copy_with(fragment=None)
- relative_url = URL(url)
- return URL(relative_url._uri_reference.resolve_with(base_uri).unsplit())
-
- def __hash__(self) -> int:
- return hash(str(self))
-
- def __eq__(self, other: typing.Any) -> bool:
- return isinstance(other, (URL, str)) and str(self) == str(URL(other))
-
- def __str__(self) -> str:
- return self._uri_reference.unsplit()
-
- def __repr__(self) -> str:
- class_name = self.__class__.__name__
- url_str = str(self)
- if self._uri_reference.userinfo:
- # Mask any password component in the URL representation, to lower the
- # risk of unintended leakage, such as in debug information and logging.
- username = quote(self.username)
- url_str = (
- rfc3986.urlparse(url_str)
- .copy_with(userinfo=f"{username}:[secure]")
- .unsplit()
- )
- return f"{class_name}({url_str!r})"
-
-
-class QueryParams(typing.Mapping[str, str]):
- """
- URL query parameters, as a multi-dict.
- """
-
- def __init__(self, *args: QueryParamTypes, **kwargs: typing.Any) -> None:
- assert len(args) < 2, "Too many arguments."
- assert not (args and kwargs), "Cannot mix named and unnamed arguments."
-
- value = args[0] if args else kwargs
-
- items: typing.Sequence[typing.Tuple[str, PrimitiveData]]
- if value is None or isinstance(value, (str, bytes)):
- value = value.decode("ascii") if isinstance(value, bytes) else value
- self._dict = parse_qs(value)
- elif isinstance(value, QueryParams):
- self._dict = {k: list(v) for k, v in value._dict.items()}
- else:
- dict_value: typing.Dict[typing.Any, typing.List[typing.Any]] = {}
- if isinstance(value, (list, tuple)):
- # Convert list inputs like:
- # [("a", "123"), ("a", "456"), ("b", "789")]
- # To a dict representation, like:
- # {"a": ["123", "456"], "b": ["789"]}
- for item in value:
- dict_value.setdefault(item[0], []).append(item[1])
- else:
- # Convert dict inputs like:
- # {"a": "123", "b": ["456", "789"]}
- # To dict inputs where values are always lists, like:
- # {"a": ["123"], "b": ["456", "789"]}
- dict_value = {
- k: list(v) if isinstance(v, (list, tuple)) else [v]
- for k, v in value.items()
- }
-
- # Ensure that keys and values are neatly coerced to strings.
- # We coerce values `True` and `False` to JSON-like "true" and "false"
- # representations, and coerce `None` values to the empty string.
- self._dict = {
- str(k): [primitive_value_to_str(item) for item in v]
- for k, v in dict_value.items()
- }
-
- def keys(self) -> typing.KeysView:
- """
- Return all the keys in the query params.
-
- Usage:
-
- q = httpx.QueryParams("a=123&a=456&b=789")
- assert list(q.keys()) == ["a", "b"]
- """
- return self._dict.keys()
-
- def values(self) -> typing.ValuesView:
- """
- Return all the values in the query params. If a key occurs more than once
- only the first item for that key is returned.
-
- Usage:
-
- q = httpx.QueryParams("a=123&a=456&b=789")
- assert list(q.values()) == ["123", "789"]
- """
- return {k: v[0] for k, v in self._dict.items()}.values()
-
- def items(self) -> typing.ItemsView:
- """
- Return all items in the query params. If a key occurs more than once
- only the first item for that key is returned.
-
- Usage:
-
- q = httpx.QueryParams("a=123&a=456&b=789")
- assert list(q.items()) == [("a", "123"), ("b", "789")]
- """
- return {k: v[0] for k, v in self._dict.items()}.items()
-
- def multi_items(self) -> typing.List[typing.Tuple[str, str]]:
- """
- Return all items in the query params. Allow duplicate keys to occur.
-
- Usage:
-
- q = httpx.QueryParams("a=123&a=456&b=789")
- assert list(q.multi_items()) == [("a", "123"), ("a", "456"), ("b", "789")]
- """
- multi_items: typing.List[typing.Tuple[str, str]] = []
- for k, v in self._dict.items():
- multi_items.extend([(k, i) for i in v])
- return multi_items
-
- def get(self, key: typing.Any, default: typing.Any = None) -> typing.Any:
- """
- Get a value from the query param for a given key. If the key occurs
- more than once, then only the first value is returned.
-
- Usage:
-
- q = httpx.QueryParams("a=123&a=456&b=789")
- assert q.get("a") == "123"
- """
- if key in self._dict:
- return self._dict[str(key)][0]
- return default
-
- def get_list(self, key: str) -> typing.List[str]:
- """
- Get all values from the query param for a given key.
-
- Usage:
-
- q = httpx.QueryParams("a=123&a=456&b=789")
- assert q.get_list("a") == ["123", "456"]
- """
- return list(self._dict.get(str(key), []))
-
- def set(self, key: str, value: typing.Any = None) -> "QueryParams":
- """
- Return a new QueryParams instance, setting the value of a key.
-
- Usage:
-
- q = httpx.QueryParams("a=123")
- q = q.set("a", "456")
- assert q == httpx.QueryParams("a=456")
- """
- q = QueryParams()
- q._dict = dict(self._dict)
- q._dict[str(key)] = [primitive_value_to_str(value)]
- return q
-
- def add(self, key: str, value: typing.Any = None) -> "QueryParams":
- """
- Return a new QueryParams instance, setting or appending the value of a key.
-
- Usage:
-
- q = httpx.QueryParams("a=123")
- q = q.add("a", "456")
- assert q == httpx.QueryParams("a=123&a=456")
- """
- q = QueryParams()
- q._dict = dict(self._dict)
- q._dict[str(key)] = q.get_list(key) + [primitive_value_to_str(value)]
- return q
-
- def remove(self, key: str) -> "QueryParams":
- """
- Return a new QueryParams instance, removing the value of a key.
-
- Usage:
-
- q = httpx.QueryParams("a=123")
- q = q.remove("a")
- assert q == httpx.QueryParams("")
- """
- q = QueryParams()
- q._dict = dict(self._dict)
- q._dict.pop(str(key), None)
- return q
-
- def merge(self, params: QueryParamTypes = None) -> "QueryParams":
- """
- Return a new QueryParams instance, updated with.
-
- Usage:
-
- q = httpx.QueryParams("a=123")
- q = q.merge({"b": "456"})
- assert q == httpx.QueryParams("a=123&b=456")
-
- q = httpx.QueryParams("a=123")
- q = q.merge({"a": "456", "b": "789"})
- assert q == httpx.QueryParams("a=456&b=789")
- """
- q = QueryParams(params)
- q._dict = {**self._dict, **q._dict}
- return q
-
- def __getitem__(self, key: typing.Any) -> str:
- return self._dict[key][0]
-
- def __contains__(self, key: typing.Any) -> bool:
- return key in self._dict
-
- def __iter__(self) -> typing.Iterator[typing.Any]:
- return iter(self.keys())
-
- def __len__(self) -> int:
- return len(self._dict)
-
- def __bool__(self) -> bool:
- return bool(self._dict)
-
- def __hash__(self) -> int:
- return hash(str(self))
-
- def __eq__(self, other: typing.Any) -> bool:
- if not isinstance(other, self.__class__):
- return False
- return sorted(self.multi_items()) == sorted(other.multi_items())
-
- def __str__(self) -> str:
- return urlencode(self.multi_items())
-
- def __repr__(self) -> str:
- class_name = self.__class__.__name__
- query_string = str(self)
- return f"{class_name}({query_string!r})"
-
- def update(self, params: QueryParamTypes = None) -> None:
- raise RuntimeError(
- "QueryParams are immutable since 0.18.0. "
- "Use `q = q.merge(...)` to create an updated copy."
- )
-
- def __setitem__(self, key: str, value: str) -> None:
- raise RuntimeError(
- "QueryParams are immutable since 0.18.0. "
- "Use `q = q.set(key, value)` to create an updated copy."
- )
-
-
class Headers(typing.MutableMapping[str, str]):
"""
HTTP headers, as a case-insensitive multi-dict.
diff --git a/httpx/_types.py b/httpx/_types.py
index f7ba4486c..be2744dcf 100644
--- a/httpx/_types.py
+++ b/httpx/_types.py
@@ -24,7 +24,8 @@
if TYPE_CHECKING: # pragma: no cover
from ._auth import Auth # noqa: F401
from ._config import Proxy, Timeout # noqa: F401
- from ._models import URL, Cookies, Headers, QueryParams, Request # noqa: F401
+ from ._models import Cookies, Headers, Request # noqa: F401
+ from ._urls import URL, QueryParams # noqa: F401
PrimitiveData = Optional[Union[str, int, float, bool]]
diff --git a/httpx/_urls.py b/httpx/_urls.py
new file mode 100644
index 000000000..70486bc9e
--- /dev/null
+++ b/httpx/_urls.py
@@ -0,0 +1,777 @@
+import typing
+from urllib.parse import parse_qs, quote, unquote, urlencode
+
+import idna
+import rfc3986
+import rfc3986.exceptions
+
+from ._exceptions import InvalidURL
+from ._types import PrimitiveData, QueryParamTypes, RawURL, URLTypes
+from ._utils import primitive_value_to_str
+
+
+class URL:
+ """
+ url = httpx.URL("HTTPS://jo%40email.com:a%20secret@müller.de:1234/pa%20th?search=ab#anchorlink")
+
+ assert url.scheme == "https"
+ assert url.username == "jo@email.com"
+ assert url.password == "a secret"
+ assert url.userinfo == b"jo%40email.com:a%20secret"
+ assert url.host == "müller.de"
+ assert url.raw_host == b"xn--mller-kva.de"
+ assert url.port == 1234
+ assert url.netloc == b"xn--mller-kva.de:1234"
+ assert url.path == "/pa th"
+ assert url.query == b"?search=ab"
+ assert url.raw_path == b"/pa%20th?search=ab"
+ assert url.fragment == "anchorlink"
+
+ The components of a URL are broken down like this:
+
+ https://jo%40email.com:a%20secret@müller.de:1234/pa%20th?search=ab#anchorlink
+ [scheme] [ username ] [password] [ host ][port][ path ] [ query ] [fragment]
+ [ userinfo ] [ netloc ][ raw_path ]
+
+ Note that:
+
+ * `url.scheme` is normalized to always be lowercased.
+
+ * `url.host` is normalized to always be lowercased. Internationalized domain
+ names are represented in unicode, without IDNA encoding applied. For instance:
+
+ url = httpx.URL("http://中国.icom.museum")
+ assert url.host == "中国.icom.museum"
+ url = httpx.URL("http://xn--fiqs8s.icom.museum")
+ assert url.host == "中国.icom.museum"
+
+ * `url.raw_host` is normalized to always be lowercased, and is IDNA encoded.
+
+ url = httpx.URL("http://中国.icom.museum")
+ assert url.raw_host == b"xn--fiqs8s.icom.museum"
+ url = httpx.URL("http://xn--fiqs8s.icom.museum")
+ assert url.raw_host == b"xn--fiqs8s.icom.museum"
+
+ * `url.port` is either None or an integer. URLs that include the default port for
+ "http", "https", "ws", "wss", and "ftp" schemes have their port normalized to `None`.
+
+ assert httpx.URL("http://example.com") == httpx.URL("http://example.com:80")
+ assert httpx.URL("http://example.com").port is None
+ assert httpx.URL("http://example.com:80").port is None
+
+ * `url.userinfo` is raw bytes, without URL escaping. Usually you'll want to work with
+ `url.username` and `url.password` instead, which handle the URL escaping.
+
+ * `url.raw_path` is raw bytes of both the path and query, without URL escaping.
+ This portion is used as the target when constructing HTTP requests. Usually you'll
+ want to work with `url.path` instead.
+
+ * `url.query` is raw bytes, without URL escaping. A URL query string portion can only
+ be properly URL escaped when decoding the parameter names and values themselves.
+ """
+
+ def __init__(
+ self, url: typing.Union["URL", str, RawURL] = "", **kwargs: typing.Any
+ ) -> None:
+ if isinstance(url, (str, tuple)):
+ if isinstance(url, tuple):
+ raw_scheme, raw_host, port, raw_path = url
+ scheme = raw_scheme.decode("ascii")
+ host = raw_host.decode("ascii")
+ if host and ":" in host and host[0] != "[":
+ # it's an IPv6 address, so it should be enclosed in "[" and "]"
+ # ref: https://tools.ietf.org/html/rfc2732#section-2
+ # ref: https://tools.ietf.org/html/rfc3986#section-3.2.2
+ host = f"[{host}]"
+ port_str = "" if port is None else f":{port}"
+ path = raw_path.decode("ascii")
+ url = f"{scheme}://{host}{port_str}{path}"
+
+ try:
+ self._uri_reference = rfc3986.iri_reference(url).encode()
+ except rfc3986.exceptions.InvalidAuthority as exc:
+ raise InvalidURL(message=str(exc)) from None
+
+ if self.is_absolute_url:
+ # We don't want to normalize relative URLs, since doing so
+ # removes any leading `../` portion.
+ self._uri_reference = self._uri_reference.normalize()
+ elif isinstance(url, URL):
+ self._uri_reference = url._uri_reference
+ else:
+ raise TypeError(
+ f"Invalid type for url. Expected str or httpx.URL, got {type(url)}: {url!r}"
+ )
+
+ # Perform port normalization, following the WHATWG spec for default ports.
+ #
+ # See:
+ # * https://tools.ietf.org/html/rfc3986#section-3.2.3
+ # * https://url.spec.whatwg.org/#url-miscellaneous
+ # * https://url.spec.whatwg.org/#scheme-state
+ default_port = {
+ "ftp": ":21",
+ "http": ":80",
+ "https": ":443",
+ "ws": ":80",
+ "wss": ":443",
+ }.get(self._uri_reference.scheme, "")
+ authority = self._uri_reference.authority or ""
+ if default_port and authority.endswith(default_port):
+ authority = authority[: -len(default_port)]
+ self._uri_reference = self._uri_reference.copy_with(authority=authority)
+
+ if kwargs:
+ self._uri_reference = self.copy_with(**kwargs)._uri_reference
+
+ @property
+ def scheme(self) -> str:
+ """
+ The URL scheme, such as "http", "https".
+ Always normalised to lowercase.
+ """
+ return self._uri_reference.scheme or ""
+
+ @property
+ def raw_scheme(self) -> bytes:
+ """
+ The raw bytes representation of the URL scheme, such as b"http", b"https".
+ Always normalised to lowercase.
+ """
+ return self.scheme.encode("ascii")
+
+ @property
+ def userinfo(self) -> bytes:
+ """
+ The URL userinfo as a raw bytestring.
+ For example: b"jo%40email.com:a%20secret".
+ """
+ userinfo = self._uri_reference.userinfo or ""
+ return userinfo.encode("ascii")
+
+ @property
+ def username(self) -> str:
+ """
+ The URL username as a string, with URL decoding applied.
+ For example: "jo@email.com"
+ """
+ userinfo = self._uri_reference.userinfo or ""
+ return unquote(userinfo.partition(":")[0])
+
+ @property
+ def password(self) -> str:
+ """
+ The URL password as a string, with URL decoding applied.
+ For example: "a secret"
+ """
+ userinfo = self._uri_reference.userinfo or ""
+ return unquote(userinfo.partition(":")[2])
+
+ @property
+ def host(self) -> str:
+ """
+ The URL host as a string.
+ Always normalized to lowercase, with IDNA hosts decoded into unicode.
+
+ Examples:
+
+ url = httpx.URL("http://www.EXAMPLE.org")
+ assert url.host == "www.example.org"
+
+ url = httpx.URL("http://中国.icom.museum")
+ assert url.host == "中国.icom.museum"
+
+ url = httpx.URL("http://xn--fiqs8s.icom.museum")
+ assert url.host == "中国.icom.museum"
+
+ url = httpx.URL("https://[::ffff:192.168.0.1]")
+ assert url.host == "::ffff:192.168.0.1"
+ """
+ host: str = self._uri_reference.host or ""
+
+ if host and ":" in host and host[0] == "[":
+ # it's an IPv6 address
+ host = host.lstrip("[").rstrip("]")
+
+ if host.startswith("xn--"):
+ host = idna.decode(host)
+
+ return host
+
+ @property
+ def raw_host(self) -> bytes:
+ """
+ The raw bytes representation of the URL host.
+ Always normalized to lowercase, and IDNA encoded.
+
+ Examples:
+
+ url = httpx.URL("http://www.EXAMPLE.org")
+ assert url.raw_host == b"www.example.org"
+
+ url = httpx.URL("http://中国.icom.museum")
+ assert url.raw_host == b"xn--fiqs8s.icom.museum"
+
+ url = httpx.URL("http://xn--fiqs8s.icom.museum")
+ assert url.raw_host == b"xn--fiqs8s.icom.museum"
+
+ url = httpx.URL("https://[::ffff:192.168.0.1]")
+ assert url.raw_host == b"::ffff:192.168.0.1"
+ """
+ host: str = self._uri_reference.host or ""
+
+ if host and ":" in host and host[0] == "[":
+ # it's an IPv6 address
+ host = host.lstrip("[").rstrip("]")
+
+ return host.encode("ascii")
+
+ @property
+ def port(self) -> typing.Optional[int]:
+ """
+ The URL port as an integer.
+
+ Note that the URL class performs port normalization as per the WHATWG spec.
+ Default ports for "http", "https", "ws", "wss", and "ftp" schemes are always
+ treated as `None`.
+
+ For example:
+
+ assert httpx.URL("http://www.example.com") == httpx.URL("http://www.example.com:80")
+ assert httpx.URL("http://www.example.com:80").port is None
+ """
+ port = self._uri_reference.port
+ return int(port) if port else None
+
+ @property
+ def netloc(self) -> bytes:
+ """
+ Either `<host>` or `<host>:<port>` as bytes.
+ Always normalized to lowercase, and IDNA encoded.
+
+ This property may be used for generating the value of a request
+ "Host" header.
+ """
+ host = self._uri_reference.host or ""
+ port = self._uri_reference.port
+ netloc = host.encode("ascii")
+ if port:
+ netloc = netloc + b":" + port.encode("ascii")
+ return netloc
+
+ @property
+ def path(self) -> str:
+ """
+ The URL path as a string. Excluding the query string, and URL decoded.
+
+ For example:
+
+ url = httpx.URL("https://example.com/pa%20th")
+ assert url.path == "/pa th"
+ """
+ path = self._uri_reference.path or "/"
+ return unquote(path)
+
+ @property
+ def query(self) -> bytes:
+ """
+ The URL query string, as raw bytes, excluding the leading b"?".
+
+ This is necessarily a bytewise interface, because we cannot
+ perform URL decoding of this representation until we've parsed
+ the keys and values into a QueryParams instance.
+
+ For example:
+
+ url = httpx.URL("https://example.com/?filter=some%20search%20terms")
+ assert url.query == b"filter=some%20search%20terms"
+ """
+ query = self._uri_reference.query or ""
+ return query.encode("ascii")
+
+ @property
+ def params(self) -> "QueryParams":
+ """
+ The URL query parameters, neatly parsed and packaged into an immutable
+ multidict representation.
+ """
+ return QueryParams(self._uri_reference.query)
+
+ @property
+ def raw_path(self) -> bytes:
+ """
+ The complete URL path and query string as raw bytes.
+ Used as the target when constructing HTTP requests.
+
+ For example:
+
+ GET /users?search=some%20text HTTP/1.1
+ Host: www.example.org
+ Connection: close
+ """
+ path = self._uri_reference.path or "/"
+ if self._uri_reference.query is not None:
+ path += "?" + self._uri_reference.query
+ return path.encode("ascii")
+
+ @property
+ def fragment(self) -> str:
+ """
+ The URL fragments, as used in HTML anchors.
+ As a string, without the leading '#'.
+ """
+ return unquote(self._uri_reference.fragment or "")
+
+ @property
+ def raw(self) -> RawURL:
+ """
+ The URL in the raw representation used by the low level
+ transport API. See `BaseTransport.handle_request`.
+
+ Provides the (scheme, host, port, target) for the outgoing request.
+ """
+ return (
+ self.raw_scheme,
+ self.raw_host,
+ self.port,
+ self.raw_path,
+ )
+
+ @property
+ def is_absolute_url(self) -> bool:
+ """
+ Return `True` for absolute URLs such as 'http://example.com/path',
+ and `False` for relative URLs such as '/path'.
+ """
+ # We don't use `.is_absolute` from `rfc3986` because it treats
+ # URLs with a fragment portion as not absolute.
+ # What we actually care about is if the URL provides
+ # a scheme and hostname to which connections should be made.
+ return bool(self._uri_reference.scheme and self._uri_reference.host)
+
+ @property
+ def is_relative_url(self) -> bool:
+ """
+ Return `False` for absolute URLs such as 'http://example.com/path',
+ and `True` for relative URLs such as '/path'.
+ """
+ return not self.is_absolute_url
+
+ def copy_with(self, **kwargs: typing.Any) -> "URL":
+ """
+ Copy this URL, returning a new URL with some components altered.
+ Accepts the same set of parameters as the components that are made
+ available via properties on the `URL` class.
+
+ For example:
+
+ url = httpx.URL("https://www.example.com").copy_with(username="jo@gmail.com", password="a secret")
+ assert url == "https://jo%40email.com:a%20secret@www.example.com"
+ """
+ allowed = {
+ "scheme": str,
+ "username": str,
+ "password": str,
+ "userinfo": bytes,
+ "host": str,
+ "port": int,
+ "netloc": bytes,
+ "path": str,
+ "query": bytes,
+ "raw_path": bytes,
+ "fragment": str,
+ "params": object,
+ }
+
+ # Step 1
+ # ======
+ #
+ # Perform type checking for all supported keyword arguments.
+ for key, value in kwargs.items():
+ if key not in allowed:
+ message = f"{key!r} is an invalid keyword argument for copy_with()"
+ raise TypeError(message)
+ if value is not None and not isinstance(value, allowed[key]):
+ expected = allowed[key].__name__
+ seen = type(value).__name__
+ message = f"Argument {key!r} must be {expected} but got {seen}"
+ raise TypeError(message)
+
+ # Step 2
+ # ======
+ #
+ # Consolidate "username", "password", "userinfo", "host", "port" and "netloc"
+ # into a single "authority" keyword, for `rfc3986`.
+ if "username" in kwargs or "password" in kwargs:
+ # Consolidate "username" and "password" into "userinfo".
+ username = quote(kwargs.pop("username", self.username) or "")
+ password = quote(kwargs.pop("password", self.password) or "")
+ userinfo = f"{username}:{password}" if password else username
+ kwargs["userinfo"] = userinfo.encode("ascii")
+
+ if "host" in kwargs or "port" in kwargs:
+ # Consolidate "host" and "port" into "netloc".
+ host = kwargs.pop("host", self.host) or ""
+ port = kwargs.pop("port", self.port)
+
+ if host and ":" in host and host[0] != "[":
+ # IPv6 addresses need to be escaped within square brackets.
+ host = f"[{host}]"
+
+ kwargs["netloc"] = (
+ f"{host}:{port}".encode("ascii")
+ if port is not None
+ else host.encode("ascii")
+ )
+
+ if "userinfo" in kwargs or "netloc" in kwargs:
+ # Consolidate "userinfo" and "netloc" into authority.
+ userinfo = (kwargs.pop("userinfo", self.userinfo) or b"").decode("ascii")
+ netloc = (kwargs.pop("netloc", self.netloc) or b"").decode("ascii")
+ authority = f"{userinfo}@{netloc}" if userinfo else netloc
+ kwargs["authority"] = authority
+
+ # Step 3
+ # ======
+ #
+ # Wrangle any "path", "query", "raw_path" and "params" keywords into
+ # "query" and "path" keywords for `rfc3986`.
+ if "raw_path" in kwargs:
+ # If "raw_path" is included, then split it into "path" and "query" components.
+ raw_path = kwargs.pop("raw_path") or b""
+ path, has_query, query = raw_path.decode("ascii").partition("?")
+ kwargs["path"] = path
+ kwargs["query"] = query if has_query else None
+
+ else:
+ if kwargs.get("path") is not None:
+ # Ensure `kwargs["path"] = <url quoted str>` for `rfc3986`.
+ kwargs["path"] = quote(kwargs["path"])
+
+ if kwargs.get("query") is not None:
+ # Ensure `kwargs["query"] = <str>` for `rfc3986`.
+ #
+ # Note that `.copy_with(query=None)` and `.copy_with(query=b"")`
+ # are subtly different. The `None` style will not include an empty
+ # trailing "?" character.
+ kwargs["query"] = kwargs["query"].decode("ascii")
+
+ if "params" in kwargs:
+ # Replace any "params" keyword with the raw "query" instead.
+ #
+ # Ensure that empty params use `kwargs["query"] = None` rather
+ # than `kwargs["query"] = ""`, so that generated URLs do not
+ # include an empty trailing "?".
+ params = kwargs.pop("params")
+ kwargs["query"] = None if not params else str(QueryParams(params))
+
+ # Step 4
+ # ======
+ #
+ # Ensure any fragment component is quoted.
+ if kwargs.get("fragment") is not None:
+ kwargs["fragment"] = quote(kwargs["fragment"])
+
+ # Step 5
+ # ======
+ #
+ # At this point kwargs may include keys for "scheme", "authority", "path",
+ # "query" and "fragment". Together these constitute the entire URL.
+ #
+ # See https://tools.ietf.org/html/rfc3986#section-3
+ #
+ # foo://example.com:8042/over/there?name=ferret#nose
+ # \_/ \______________/\_________/ \_________/ \__/
+ # | | | | |
+ # scheme authority path query fragment
+ new_url = URL(self)
+ new_url._uri_reference = self._uri_reference.copy_with(**kwargs)
+ if new_url.is_absolute_url:
+ new_url._uri_reference = new_url._uri_reference.normalize()
+ return URL(new_url)
+
+ def copy_set_param(self, key: str, value: typing.Any = None) -> "URL":
+ return self.copy_with(params=self.params.set(key, value))
+
+ def copy_add_param(self, key: str, value: typing.Any = None) -> "URL":
+ return self.copy_with(params=self.params.add(key, value))
+
+ def copy_remove_param(self, key: str) -> "URL":
+ return self.copy_with(params=self.params.remove(key))
+
+ def copy_merge_params(self, params: QueryParamTypes) -> "URL":
+ return self.copy_with(params=self.params.merge(params))
+
+ def join(self, url: URLTypes) -> "URL":
+ """
+ Return an absolute URL, using this URL as the base.
+
+ Eg.
+
+ url = httpx.URL("https://www.example.com/test")
+ url = url.join("/new/path")
+ assert url == "https://www.example.com/new/path"
+ """
+ if self.is_relative_url:
+ # Workaround to handle relative URLs, which otherwise raise
+ # rfc3986.exceptions.ResolutionError when used as an argument
+ # in `.resolve_with`.
+ return (
+ self.copy_with(scheme="http", host="example.com")
+ .join(url)
+ .copy_with(scheme=None, host=None)
+ )
+
+ # We drop any fragment portion, because RFC 3986 strictly
+ # treats URLs with a fragment portion as not being absolute URLs.
+ base_uri = self._uri_reference.copy_with(fragment=None)
+ relative_url = URL(url)
+ return URL(relative_url._uri_reference.resolve_with(base_uri).unsplit())
+
+ def __hash__(self) -> int:
+ return hash(str(self))
+
+ def __eq__(self, other: typing.Any) -> bool:
+ return isinstance(other, (URL, str)) and str(self) == str(URL(other))
+
+ def __str__(self) -> str:
+ return self._uri_reference.unsplit()
+
+ def __repr__(self) -> str:
+ class_name = self.__class__.__name__
+ url_str = str(self)
+ if self._uri_reference.userinfo:
+ # Mask any password component in the URL representation, to lower the
+ # risk of unintended leakage, such as in debug information and logging.
+ username = quote(self.username)
+ url_str = (
+ rfc3986.urlparse(url_str)
+ .copy_with(userinfo=f"{username}:[secure]")
+ .unsplit()
+ )
+ return f"{class_name}({url_str!r})"
+
+
+class QueryParams(typing.Mapping[str, str]):
+ """
+ URL query parameters, as a multi-dict.
+ """
+
+ def __init__(self, *args: QueryParamTypes, **kwargs: typing.Any) -> None:
+ assert len(args) < 2, "Too many arguments."
+ assert not (args and kwargs), "Cannot mix named and unnamed arguments."
+
+ value = args[0] if args else kwargs
+
+ items: typing.Sequence[typing.Tuple[str, PrimitiveData]]
+ if value is None or isinstance(value, (str, bytes)):
+ value = value.decode("ascii") if isinstance(value, bytes) else value
+ self._dict = parse_qs(value)
+ elif isinstance(value, QueryParams):
+ self._dict = {k: list(v) for k, v in value._dict.items()}
+ else:
+ dict_value: typing.Dict[typing.Any, typing.List[typing.Any]] = {}
+ if isinstance(value, (list, tuple)):
+ # Convert list inputs like:
+ # [("a", "123"), ("a", "456"), ("b", "789")]
+ # To a dict representation, like:
+ # {"a": ["123", "456"], "b": ["789"]}
+ for item in value:
+ dict_value.setdefault(item[0], []).append(item[1])
+ else:
+ # Convert dict inputs like:
+ # {"a": "123", "b": ["456", "789"]}
+ # To dict inputs where values are always lists, like:
+ # {"a": ["123"], "b": ["456", "789"]}
+ dict_value = {
+ k: list(v) if isinstance(v, (list, tuple)) else [v]
+ for k, v in value.items()
+ }
+
+ # Ensure that keys and values are neatly coerced to strings.
+ # We coerce values `True` and `False` to JSON-like "true" and "false"
+ # representations, and coerce `None` values to the empty string.
+ self._dict = {
+ str(k): [primitive_value_to_str(item) for item in v]
+ for k, v in dict_value.items()
+ }
+
+ def keys(self) -> typing.KeysView:
+ """
+ Return all the keys in the query params.
+
+ Usage:
+
+ q = httpx.QueryParams("a=123&a=456&b=789")
+ assert list(q.keys()) == ["a", "b"]
+ """
+ return self._dict.keys()
+
+ def values(self) -> typing.ValuesView:
+ """
+ Return all the values in the query params. If a key occurs more than once
+ only the first item for that key is returned.
+
+ Usage:
+
+ q = httpx.QueryParams("a=123&a=456&b=789")
+ assert list(q.values()) == ["123", "789"]
+ """
+ return {k: v[0] for k, v in self._dict.items()}.values()
+
+ def items(self) -> typing.ItemsView:
+ """
+ Return all items in the query params. If a key occurs more than once
+ only the first item for that key is returned.
+
+ Usage:
+
+ q = httpx.QueryParams("a=123&a=456&b=789")
+ assert list(q.items()) == [("a", "123"), ("b", "789")]
+ """
+ return {k: v[0] for k, v in self._dict.items()}.items()
+
+ def multi_items(self) -> typing.List[typing.Tuple[str, str]]:
+ """
+ Return all items in the query params. Allow duplicate keys to occur.
+
+ Usage:
+
+ q = httpx.QueryParams("a=123&a=456&b=789")
+ assert list(q.multi_items()) == [("a", "123"), ("a", "456"), ("b", "789")]
+ """
+ multi_items: typing.List[typing.Tuple[str, str]] = []
+ for k, v in self._dict.items():
+ multi_items.extend([(k, i) for i in v])
+ return multi_items
+
+ def get(self, key: typing.Any, default: typing.Any = None) -> typing.Any:
+ """
+ Get a value from the query param for a given key. If the key occurs
+ more than once, then only the first value is returned.
+
+ Usage:
+
+ q = httpx.QueryParams("a=123&a=456&b=789")
+ assert q.get("a") == "123"
+ """
+ if key in self._dict:
+ return self._dict[str(key)][0]
+ return default
+
+ def get_list(self, key: str) -> typing.List[str]:
+ """
+ Get all values from the query param for a given key.
+
+ Usage:
+
+ q = httpx.QueryParams("a=123&a=456&b=789")
+ assert q.get_list("a") == ["123", "456"]
+ """
+ return list(self._dict.get(str(key), []))
+
+ def set(self, key: str, value: typing.Any = None) -> "QueryParams":
+ """
+ Return a new QueryParams instance, setting the value of a key.
+
+ Usage:
+
+ q = httpx.QueryParams("a=123")
+ q = q.set("a", "456")
+ assert q == httpx.QueryParams("a=456")
+ """
+ q = QueryParams()
+ q._dict = dict(self._dict)
+ q._dict[str(key)] = [primitive_value_to_str(value)]
+ return q
+
+ def add(self, key: str, value: typing.Any = None) -> "QueryParams":
+ """
+ Return a new QueryParams instance, setting or appending the value of a key.
+
+ Usage:
+
+ q = httpx.QueryParams("a=123")
+ q = q.add("a", "456")
+ assert q == httpx.QueryParams("a=123&a=456")
+ """
+ q = QueryParams()
+ q._dict = dict(self._dict)
+ q._dict[str(key)] = q.get_list(key) + [primitive_value_to_str(value)]
+ return q
+
+ def remove(self, key: str) -> "QueryParams":
+ """
+ Return a new QueryParams instance, removing the value of a key.
+
+ Usage:
+
+ q = httpx.QueryParams("a=123")
+ q = q.remove("a")
+ assert q == httpx.QueryParams("")
+ """
+ q = QueryParams()
+ q._dict = dict(self._dict)
+ q._dict.pop(str(key), None)
+ return q
+
+ def merge(self, params: QueryParamTypes = None) -> "QueryParams":
+ """
+ Return a new QueryParams instance, updated with.
+
+ Usage:
+
+ q = httpx.QueryParams("a=123")
+ q = q.merge({"b": "456"})
+ assert q == httpx.QueryParams("a=123&b=456")
+
+ q = httpx.QueryParams("a=123")
+ q = q.merge({"a": "456", "b": "789"})
+ assert q == httpx.QueryParams("a=456&b=789")
+ """
+ q = QueryParams(params)
+ q._dict = {**self._dict, **q._dict}
+ return q
+
+ def __getitem__(self, key: typing.Any) -> str:
+ return self._dict[key][0]
+
+ def __contains__(self, key: typing.Any) -> bool:
+ return key in self._dict
+
+ def __iter__(self) -> typing.Iterator[typing.Any]:
+ return iter(self.keys())
+
+ def __len__(self) -> int:
+ return len(self._dict)
+
+ def __bool__(self) -> bool:
+ return bool(self._dict)
+
+ def __hash__(self) -> int:
+ return hash(str(self))
+
+ def __eq__(self, other: typing.Any) -> bool:
+ if not isinstance(other, self.__class__):
+ return False
+ return sorted(self.multi_items()) == sorted(other.multi_items())
+
+ def __str__(self) -> str:
+ return urlencode(self.multi_items())
+
+ def __repr__(self) -> str:
+ class_name = self.__class__.__name__
+ query_string = str(self)
+ return f"{class_name}({query_string!r})"
+
+ def update(self, params: QueryParamTypes = None) -> None:
+ raise RuntimeError(
+ "QueryParams are immutable since 0.18.0. "
+ "Use `q = q.merge(...)` to create an updated copy."
+ )
+
+ def __setitem__(self, key: str, value: str) -> None:
+ raise RuntimeError(
+ "QueryParams are immutable since 0.18.0. "
+ "Use `q = q.set(key, value)` to create an updated copy."
+ )