File asgiref-3.4.1.0+git.c897542.obscpio of Package python-asgiref
07070100000000000041ED00000000000000000000000260DDEA6C00000000000000000000000000000000000000000000002400000000asgiref-3.4.1.0+git.c897542/.github07070100000001000041ED00000000000000000000000260DDEA6C00000000000000000000000000000000000000000000002E00000000asgiref-3.4.1.0+git.c897542/.github/workflows07070100000002000081A400000000000000000000000160DDEA6C00000454000000000000000000000000000000000000003800000000asgiref-3.4.1.0+git.c897542/.github/workflows/tests.ymlname: Tests
on:
push:
branches:
- main
pull_request:
jobs:
tests:
name: Python ${{ matrix.python-version }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version:
- 3.6
- 3.7
- 3.8
- 3.9
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip setuptools wheel
python -m pip install --upgrade tox tox-py
- name: Run tox targets for ${{ matrix.python-version }}
run: tox --py current
lint:
name: Lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Install dependencies
run: |
python -m pip install --upgrade pip tox
- name: Run lint
run: tox -e qa
07070100000003000081A400000000000000000000000160DDEA6C00000071000000000000000000000000000000000000002700000000asgiref-3.4.1.0+git.c897542/.gitignore*.egg-info
dist/
build/
_build/
__pycache__/
*.pyc
.tox/
*~
.cache
.eggs
.python-version
.pytest_cache/
.vscode/
07070100000004000081A400000000000000000000000160DDEA6C000002EF000000000000000000000000000000000000003400000000asgiref-3.4.1.0+git.c897542/.pre-commit-config.yamlrepos:
- repo: https://github.com/asottile/pyupgrade
rev: v2.9.0
hooks:
- id: pyupgrade
args: ["--py36-plus"]
- repo: https://github.com/psf/black
rev: 20.8b1
hooks:
- id: black
args: ["--target-version=py36"]
- repo: https://github.com/pycqa/isort
rev: 5.7.0
hooks:
- id: isort
args: ["--profile=black"]
- repo: https://gitlab.com/pycqa/flake8
rev: 3.8.4
hooks:
- id: flake8
- repo: https://github.com/asottile/yesqa
rev: v1.2.2
hooks:
- id: yesqa
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.4.0
hooks:
- id: check-merge-conflict
- id: check-toml
- id: check-yaml
- id: mixed-line-ending
07070100000005000081A400000000000000000000000160DDEA6C00001E88000000000000000000000000000000000000002A00000000asgiref-3.4.1.0+git.c897542/CHANGELOG.txt3.4.1 (2021-07-01)
------------------
* Fixed an issue with the deadlock detection where it had false positives
during exception handling.
3.4.0 (2021-06-27)
------------------
* Calling sync_to_async directly from inside itself (which causes a deadlock
when in the default, thread-sensitive mode) now has deadlock detection.
* asyncio usage has been updated to use the new versions of get_event_loop,
ensure_future, wait and gather, avoiding deprecation warnings in Python 3.10.
Python 3.6 installs continue to use the old versions; this is only for 3.7+
* sync_to_async and async_to_sync now have improved type hints that pass
through the underlying function type correctly.
* All Websocket* types are now spelled WebSocket, to match our specs and the
official spelling. The old names will work until release 3.5.0, but will
raise deprecation warnings.
* The typing for WebSocketScope and HTTPScope's `extensions` key has been
fixed.
3.3.4 (2021-04-06)
------------------
* The async_to_sync type error is now a warning due the high false negative
rate when trying to detect coroutine-returning callables in Python.
3.3.3 (2021-04-06)
------------------
* The sync conversion functions now correctly detect functools.partial and other
wrappers around async functions on earlier Python releases.
3.3.2 (2021-04-05)
------------------
* SyncToAsync now takes an optional "executor" argument if you want to supply
your own executor rather than using the built-in one.
* async_to_sync and sync_to_async now check their arguments are functions of
the correct type.
* Raising CancelledError inside a SyncToAsync function no longer stops a future
call from functioning.
* ThreadSensitive now provides context hooks/override options so it can be
made to be sensitive in a unit smaller than threads (e.g. per request)
* Drop Python 3.5 support.
* Add type annotations.
3.3.1 (2020-11-09)
------------------
* Updated StatelessServer to use ASGI v3 single-callable applications.
3.3.0 (2020-10-09)
------------------
* sync_to_async now defaults to thread-sensitive mode being on
* async_to_sync now works inside of forked processes
* WsgiToAsgi now correctly clamps its response body when Content-Length is set
3.2.10 (2020-08-18)
-------------------
* Fixed bugs due to bad WeakRef handling introduced in 3.2.8
3.2.9 (2020-06-16)
------------------
* Fixed regression with exception handling in 3.2.8 related to the contextvars fix.
3.2.8 (2020-06-15)
------------------
* Fixed small memory leak in local.Local
* contextvars are now persisted through AsyncToSync
3.2.7 (2020-03-24)
------------------
* Bug fixed in local.Local where deleted Locals would occasionally inherit
their storage into new Locals due to memory reuse.
3.2.6 (2020-03-23)
------------------
* local.Local now works in all threading situations, no longer requires
periodic garbage collection, and works with libraries that monkeypatch
threading (like gevent)
3.2.5 (2020-03-11)
------------------
* __self__ is now preserved on methods by async_to_sync
3.2.4 (2020-03-10)
------------------
* Pending tasks/async generators are now cancelled when async_to_sync exits
* Contextvars now propagate changes both ways through sync_to_async
* sync_to_async now preserves attributes on functions it wraps
3.2.3 (2019-10-23)
------------------
* Added support and testing for Python 3.8.
3.2.2 (2019-08-29)
------------------
* WsgiToAsgi maps multi-part request bodies into a single WSGI input file
* WsgiToAsgi passes the `root_path` scope as SCRIPT_NAME
* WsgiToAsgi now checks the scope type to handle `lifespan` better
* WsgiToAsgi now passes the server port as a string, like WSGI
* SyncToAsync values are now identified as coroutine functions by asyncio
* SyncToAsync now handles __self__ correctly for methods
3.2.1 (2019-08-05)
------------------
* sys.exc_info() is now propagated across thread boundaries
3.2.0 (2019-07-29)
------------------
* New "thread_sensitive" argument to SyncToAsync allows for pinning of code into
the same thread as other thread_sensitive code.
* Test collection on Python 3.7 fixed
3.1.4 (2019-07-07)
------------------
* Fixed an incompatibility with Python 3.5 introduced in the last release.
3.1.3 (2019-07-05)
------------------
* async_timeout has been removed as a dependency, so there are now no required
dependencies.
* The WSGI adapter now sets ``REMOTE_ADDR`` from the ASGI ``client``.
3.1.2 (2019-04-17)
------------------
* New thread_critical argument to Local to tell it to not inherit contexts
across threads/tasks.
* Local now inherits across any number of sync_to_async to async_to_sync calls
nested inside each other
3.1.1 (2019-04-13)
------------------
* Local now cleans up storage of old threads and tasks to prevent a memory leak.
3.1.0 (2019-04-13)
------------------
* Added ``asgiref.local`` module to provide threading.local drop-in replacement.
3.0.0 (2019-03-20)
------------------
* Updated to match new ASGI 3.0 spec
* Compatibility library added that allows adapting ASGI 2 apps into ASGI 3 apps
losslessly
2.3.2 (2018-05-23)
------------------
* Packaging fix to allow old async_timeout dependencies (2.0 as well as 3.0)
2.3.1 (2018-05-23)
------------------
* WSGI-to-ASGI adapter now works with empty bodies in responses
* Update async-timeout dependency
2.3.0 (2018-04-11)
------------------
* ApplicationCommunicator now has a receive_nothing() test available
2.2.0 (2018-03-06)
------------------
* Cancelled tasks now correctly cascade-cancel their children
* Communicator.wait() no longer re-raises CancelledError from inner coroutines
2.1.6 (2018-02-19)
------------------
* async_to_sync now works inside of threads (but is still not allowed in threads
that have an active event loop)
2.1.5 (2018-02-14)
------------------
* Fixed issues with async_to_sync not setting the event loop correctly
* Stop async_to_sync being called from threads with an active event loop
2.1.4 (2018-02-07)
------------------
* Values are now correctly returned from sync_to_async and async_to_sync
* ASGI_THREADS environment variable now works correctly
2.1.3 (2018-02-04)
------------------
* Add an ApplicationCommunicator.wait() method to allow you to wait for an
application instance to exit before seeing what it did.
2.1.2 (2018-02-03)
------------------
* Allow AsyncToSync to work if called from a non-async-wrapped sync context.
2.1.1 (2018-02-02)
------------------
* Allow AsyncToSync constructor to be called inside SyncToAsync.
2.1.0 (2018-01-19)
------------------
* Add `asgiref.testing` module with ApplicationCommunicator testing helper
2.0.1 (2017-11-28)
------------------
* Bugfix release to have HTTP response content message as the correct
"http.response.content" not the older "http.response.chunk".
2.0.0 (2017-11-28)
------------------
* Complete rewrite for new async-based ASGI mechanisms and removal of
channel layers.
1.1.2 (2017-05-16)
-----------------
* Conformance test suite now allows for retries and tests group_send's behaviour with capacity
* valid_channel_names now has a receive parameter
1.1.1 (2017-04-02)
------------------
* Error with sending to multi-process channels with the same message fixed
1.1.0 (2017-04-01)
------------------
* Process-specific channel behaviour has been changed, and the base layer
and conformance suites updated to match.
1.0.1 (2017-03-19)
------------------
* Improved channel and group name validation
* Test rearrangements and improvements
1.0.0 (2016-04-11)
------------------
* `receive_many` is now `receive`
* In-memory layer deepcopies messages so they cannot be mutated post-send
* Better errors for bad channel/group names
07070100000006000081A400000000000000000000000160DDEA6C00000610000000000000000000000000000000000000002400000000asgiref-3.4.1.0+git.c897542/LICENSECopyright (c) Django Software Foundation and individual contributors.
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of Django nor the names of its contributors may be used
to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
07070100000007000081A400000000000000000000000160DDEA6C00000046000000000000000000000000000000000000002800000000asgiref-3.4.1.0+git.c897542/MANIFEST.ininclude LICENSE
include asgiref/py.typed
recursive-include tests *.py
07070100000008000081A400000000000000000000000160DDEA6C00000086000000000000000000000000000000000000002500000000asgiref-3.4.1.0+git.c897542/Makefile.PHONY: release clean
clean:
rm -r build/ dist/ asgiref.egg-info/
release: clean
python3 -m build
python3 -m twine upload dist/*
07070100000009000081A400000000000000000000000160DDEA6C00001E4C000000000000000000000000000000000000002700000000asgiref-3.4.1.0+git.c897542/README.rstasgiref
=======
.. image:: https://api.travis-ci.org/django/asgiref.svg
:target: https://travis-ci.org/django/asgiref
.. image:: https://img.shields.io/pypi/v/asgiref.svg
:target: https://pypi.python.org/pypi/asgiref
ASGI is a standard for Python asynchronous web apps and servers to communicate
with each other, and positioned as an asynchronous successor to WSGI. You can
read more at https://asgi.readthedocs.io/en/latest/
This package includes ASGI base libraries, such as:
* Sync-to-async and async-to-sync function wrappers, ``asgiref.sync``
* Server base classes, ``asgiref.server``
* A WSGI-to-ASGI adapter, in ``asgiref.wsgi``
Function wrappers
-----------------
These allow you to wrap or decorate async or sync functions to call them from
the other style (so you can call async functions from a synchronous thread,
or vice-versa).
In particular:
* AsyncToSync lets a synchronous subthread stop and wait while the async
function is called on the main thread's event loop, and then control is
returned to the thread when the async function is finished.
* SyncToAsync lets async code call a synchronous function, which is run in
a threadpool and control returned to the async coroutine when the synchronous
function completes.
The idea is to make it easier to call synchronous APIs from async code and
asynchronous APIs from synchronous code so it's easier to transition code from
one style to the other. In the case of Channels, we wrap the (synchronous)
Django view system with SyncToAsync to allow it to run inside the (asynchronous)
ASGI server.
Note that exactly what threads things run in is very specific, and aimed to
keep maximum compatibility with old synchronous code. See
"Synchronous code & Threads" below for a full explanation. By default,
``sync_to_async`` will run all synchronous code in the program in the same
thread for safety reasons; you can disable this for more performance with
``@sync_to_async(thread_sensitive=False)``, but make sure that your code does
not rely on anything bound to threads (like database connections) when you do.
Threadlocal replacement
-----------------------
This is a drop-in replacement for ``threading.local`` that works with both
threads and asyncio Tasks. Even better, it will proxy values through from a
task-local context to a thread-local context when you use ``sync_to_async``
to run things in a threadpool, and vice-versa for ``async_to_sync``.
If you instead want true thread- and task-safety, you can set
``thread_critical`` on the Local object to ensure this instead.
Server base classes
-------------------
Includes a ``StatelessServer`` class which provides all the hard work of
writing a stateless server (as in, does not handle direct incoming sockets
but instead consumes external streams or sockets to work out what is happening).
An example of such a server would be a chatbot server that connects out to
a central chat server and provides a "connection scope" per user chatting to
it. There's only one actual connection, but the server has to separate things
into several scopes for easier writing of the code.
You can see an example of this being used in `frequensgi <https://github.com/andrewgodwin/frequensgi>`_.
WSGI-to-ASGI adapter
--------------------
Allows you to wrap a WSGI application so it appears as a valid ASGI application.
Simply wrap it around your WSGI application like so::
asgi_application = WsgiToAsgi(wsgi_application)
The WSGI application will be run in a synchronous threadpool, and the wrapped
ASGI application will be one that accepts ``http`` class messages.
Please note that not all extended features of WSGI may be supported (such as
file handles for incoming POST bodies).
Dependencies
------------
``asgiref`` requires Python 3.6 or higher.
Contributing
------------
Please refer to the
`main Channels contributing docs <https://github.com/django/channels/blob/master/CONTRIBUTING.rst>`_.
Testing
'''''''
To run tests, make sure you have installed the ``tests`` extra with the package::
cd asgiref/
pip install -e .[tests]
pytest
Building the documentation
''''''''''''''''''''''''''
The documentation uses `Sphinx <http://www.sphinx-doc.org>`_::
cd asgiref/docs/
pip install sphinx
To build the docs, you can use the default tools::
sphinx-build -b html . _build/html # or `make html`, if you've got make set up
cd _build/html
python -m http.server
...or you can use ``sphinx-autobuild`` to run a server and rebuild/reload
your documentation changes automatically::
pip install sphinx-autobuild
sphinx-autobuild . _build/html
Releasing
'''''''''
To release, first add details to CHANGELOG.txt and update the version number in ``asgiref/__init__.py``.
Then, build and push the packages::
python -m build
twine upload dist/*
rm -r build/ dist/
Implementation Details
----------------------
Synchronous code & threads
''''''''''''''''''''''''''
The ``asgiref.sync`` module provides two wrappers that let you go between
asynchronous and synchronous code at will, while taking care of the rough edges
for you.
Unfortunately, the rough edges are numerous, and the code has to work especially
hard to keep things in the same thread as much as possible. Notably, the
restrictions we are working with are:
* All synchronous code called through ``SyncToAsync`` and marked with
``thread_sensitive`` should run in the same thread as each other (and if the
outer layer of the program is synchronous, the main thread)
* If a thread already has a running async loop, ``AsyncToSync`` can't run things
on that loop if it's blocked on synchronous code that is above you in the
call stack.
The first compromise you get to might be that ``thread_sensitive`` code should
just run in the same thread and not spawn in a sub-thread, fulfilling the first
restriction, but that immediately runs you into the second restriction.
The only real solution is to essentially have a variant of ThreadPoolExecutor
that executes any ``thread_sensitive`` code on the outermost synchronous
thread - either the main thread, or a single spawned subthread.
This means you now have two basic states:
* If the outermost layer of your program is synchronous, then all async code
run through ``AsyncToSync`` will run in a per-call event loop in arbitrary
sub-threads, while all ``thread_sensitive`` code will run in the main thread.
* If the outermost layer of your program is asynchronous, then all async code
runs on the main thread's event loop, and all ``thread_sensitive`` synchronous
code will run in a single shared sub-thread.
Crucially, this means that in both cases there is a thread which is a shared
resource that all ``thread_sensitive`` code must run on, and there is a chance
that this thread is currently blocked on its own ``AsyncToSync`` call. Thus,
``AsyncToSync`` needs to act as an executor for thread code while it's blocking.
The ``CurrentThreadExecutor`` class provides this functionality; rather than
simply waiting on a Future, you can call its ``run_until_future`` method and
it will run submitted code until that Future is done. This means that code
inside the call can then run code on your thread.
Maintenance and Security
------------------------
To report security issues, please contact security@djangoproject.com. For GPG
signatures and more security process information, see
https://docs.djangoproject.com/en/dev/internals/security/.
To report bugs or request new features, please open a new GitHub issue.
This repository is part of the Channels project. For the shepherd and maintenance team, please see the
`main Channels readme <https://github.com/django/channels/blob/master/README.rst>`_.
0707010000000A000041ED00000000000000000000000260DDEA6C00000000000000000000000000000000000000000000002400000000asgiref-3.4.1.0+git.c897542/asgiref0707010000000B000081A400000000000000000000000160DDEA6C00000016000000000000000000000000000000000000003000000000asgiref-3.4.1.0+git.c897542/asgiref/__init__.py__version__ = "3.4.1"
0707010000000C000081A400000000000000000000000160DDEA6C00000A7C000000000000000000000000000000000000002F00000000asgiref-3.4.1.0+git.c897542/asgiref/_pep562.py"""
Backport of PEP 562.
https://pypi.org/search/?q=pep562
Licensed under MIT
Copyright (c) 2018 Isaac Muse <isaacmuse@gmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.
"""
import sys
from typing import Any, Callable, List, Optional
class Pep562:
"""
Backport of PEP 562 <https://pypi.org/search/?q=pep562>.
Wraps the module in a class that exposes the mechanics to override `__dir__` and `__getattr__`.
The given module will be searched for overrides of `__dir__` and `__getattr__` and use them when needed.
"""
def __init__(self, name: str) -> None:
"""Acquire `__getattr__` and `__dir__`, but only replace module for versions less than Python 3.7."""
self._module = sys.modules[name]
self._get_attr = getattr(self._module, "__getattr__", None)
self._get_dir: Optional[Callable[..., List[str]]] = getattr(
self._module, "__dir__", None
)
sys.modules[name] = self # type: ignore[assignment]
def __dir__(self) -> List[str]:
"""Return the overridden `dir` if one was provided, else apply `dir` to the module."""
return self._get_dir() if self._get_dir else dir(self._module)
def __getattr__(self, name: str) -> Any:
"""
Attempt to retrieve the attribute from the module, and if missing, use the overridden function if present.
"""
try:
return getattr(self._module, name)
except AttributeError:
if self._get_attr:
return self._get_attr(name)
raise
def pep562(module_name: str) -> None:
"""Helper function to apply PEP 562."""
if sys.version_info < (3, 7):
Pep562(module_name)
0707010000000D000081A400000000000000000000000160DDEA6C000007E6000000000000000000000000000000000000003500000000asgiref-3.4.1.0+git.c897542/asgiref/compatibility.pyimport asyncio
import inspect
import sys
def is_double_callable(application):
"""
Tests to see if an application is a legacy-style (double-callable) application.
"""
# Look for a hint on the object first
if getattr(application, "_asgi_single_callable", False):
return False
if getattr(application, "_asgi_double_callable", False):
return True
# Uninstanted classes are double-callable
if inspect.isclass(application):
return True
# Instanted classes depend on their __call__
if hasattr(application, "__call__"):
# We only check to see if its __call__ is a coroutine function -
# if it's not, it still might be a coroutine function itself.
if asyncio.iscoroutinefunction(application.__call__):
return False
# Non-classes we just check directly
return not asyncio.iscoroutinefunction(application)
def double_to_single_callable(application):
"""
Transforms a double-callable ASGI application into a single-callable one.
"""
async def new_application(scope, receive, send):
instance = application(scope)
return await instance(receive, send)
return new_application
def guarantee_single_callable(application):
"""
Takes either a single- or double-callable application and always returns it
in single-callable style. Use this to add backwards compatibility for ASGI
2.0 applications to your server/test harness/etc.
"""
if is_double_callable(application):
application = double_to_single_callable(application)
return application
if sys.version_info >= (3, 7):
# these were introduced in 3.7
get_running_loop = asyncio.get_running_loop
run_future = asyncio.run
create_task = asyncio.create_task
else:
# marked as deprecated in 3.10, did not exist before 3.7
get_running_loop = asyncio.get_event_loop
run_future = asyncio.ensure_future
# does nothing, this is fine for <3.7
create_task = lambda task: task
0707010000000E000081A400000000000000000000000160DDEA6C00000AF1000000000000000000000000000000000000003F00000000asgiref-3.4.1.0+git.c897542/asgiref/current_thread_executor.pyimport queue
import threading
from concurrent.futures import Executor, Future
class _WorkItem:
"""
Represents an item needing to be run in the executor.
Copied from ThreadPoolExecutor (but it's private, so we're not going to rely on importing it)
"""
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException as exc:
self.future.set_exception(exc)
# Break a reference cycle with the exception 'exc'
self = None
else:
self.future.set_result(result)
class CurrentThreadExecutor(Executor):
"""
An Executor that actually runs code in the thread it is instantiated in.
Passed to other threads running async code, so they can run sync code in
the thread they came from.
"""
def __init__(self):
self._work_thread = threading.current_thread()
self._work_queue = queue.Queue()
self._broken = False
def run_until_future(self, future):
"""
Runs the code in the work queue until a result is available from the future.
Should be run from the thread the executor is initialised in.
"""
# Check we're in the right thread
if threading.current_thread() != self._work_thread:
raise RuntimeError(
"You cannot run CurrentThreadExecutor from a different thread"
)
future.add_done_callback(self._work_queue.put)
# Keep getting and running work items until we get the future we're waiting for
# back via the future's done callback.
try:
while True:
# Get a work item and run it
work_item = self._work_queue.get()
if work_item is future:
return
work_item.run()
del work_item
finally:
self._broken = True
def submit(self, fn, *args, **kwargs):
# Check they're not submitting from the same thread
if threading.current_thread() == self._work_thread:
raise RuntimeError(
"You cannot submit onto CurrentThreadExecutor from its own thread"
)
# Check they're not too late or the executor errored
if self._broken:
raise RuntimeError("CurrentThreadExecutor already quit or is broken")
# Add to work queue
f = Future()
work_item = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(work_item)
# Return the future
return f
0707010000000F000081A400000000000000000000000160DDEA6C0000131C000000000000000000000000000000000000002D00000000asgiref-3.4.1.0+git.c897542/asgiref/local.pyimport random
import string
import sys
import threading
import weakref
class Local:
"""
A drop-in replacement for threading.locals that also works with asyncio
Tasks (via the current_task asyncio method), and passes locals through
sync_to_async and async_to_sync.
Specifically:
- Locals work per-coroutine on any thread not spawned using asgiref
- Locals work per-thread on any thread not spawned using asgiref
- Locals are shared with the parent coroutine when using sync_to_async
- Locals are shared with the parent thread when using async_to_sync
(and if that thread was launched using sync_to_async, with its parent
coroutine as well, with this working for indefinite levels of nesting)
Set thread_critical to True to not allow locals to pass from an async Task
to a thread it spawns. This is needed for code that truly needs
thread-safety, as opposed to things used for helpful context (e.g. sqlite
does not like being called from a different thread to the one it is from).
Thread-critical code will still be differentiated per-Task within a thread
as it is expected it does not like concurrent access.
This doesn't use contextvars as it needs to support 3.6. Once it can support
3.7 only, we can then reimplement the storage more nicely.
"""
CLEANUP_INTERVAL = 60 # seconds
def __init__(self, thread_critical: bool = False) -> None:
self._thread_critical = thread_critical
self._thread_lock = threading.RLock()
self._context_refs: "weakref.WeakSet[object]" = weakref.WeakSet()
# Random suffixes stop accidental reuse between different Locals,
# though we try to force deletion as well.
self._attr_name = "_asgiref_local_impl_{}_{}".format(
id(self),
"".join(random.choice(string.ascii_letters) for i in range(8)),
)
def _get_context_id(self):
"""
Get the ID we should use for looking up variables
"""
# Prevent a circular reference
from .sync import AsyncToSync, SyncToAsync
# First, pull the current task if we can
context_id = SyncToAsync.get_current_task()
context_is_async = True
# OK, let's try for a thread ID
if context_id is None:
context_id = threading.current_thread()
context_is_async = False
# If we're thread-critical, we stop here, as we can't share contexts.
if self._thread_critical:
return context_id
# Now, take those and see if we can resolve them through the launch maps
for i in range(sys.getrecursionlimit()):
try:
if context_is_async:
# Tasks have a source thread in AsyncToSync
context_id = AsyncToSync.launch_map[context_id]
context_is_async = False
else:
# Threads have a source task in SyncToAsync
context_id = SyncToAsync.launch_map[context_id]
context_is_async = True
except KeyError:
break
else:
# Catch infinite loops (they happen if you are screwing around
# with AsyncToSync implementations)
raise RuntimeError("Infinite launch_map loops")
return context_id
def _get_storage(self):
context_obj = self._get_context_id()
if not hasattr(context_obj, self._attr_name):
setattr(context_obj, self._attr_name, {})
self._context_refs.add(context_obj)
return getattr(context_obj, self._attr_name)
def __del__(self):
try:
for context_obj in self._context_refs:
try:
delattr(context_obj, self._attr_name)
except AttributeError:
pass
except TypeError:
# WeakSet.__iter__ can crash when interpreter is shutting down due
# to _IterationGuard being None.
pass
def __getattr__(self, key):
with self._thread_lock:
storage = self._get_storage()
if key in storage:
return storage[key]
else:
raise AttributeError(f"{self!r} object has no attribute {key!r}")
def __setattr__(self, key, value):
if key in ("_context_refs", "_thread_critical", "_thread_lock", "_attr_name"):
return super().__setattr__(key, value)
with self._thread_lock:
storage = self._get_storage()
storage[key] = value
def __delattr__(self, key):
with self._thread_lock:
storage = self._get_storage()
if key in storage:
del storage[key]
else:
raise AttributeError(f"{self!r} object has no attribute {key!r}")
07070100000010000081A400000000000000000000000160DDEA6C00000000000000000000000000000000000000000000002D00000000asgiref-3.4.1.0+git.c897542/asgiref/py.typed07070100000011000081A400000000000000000000000160DDEA6C00001782000000000000000000000000000000000000002E00000000asgiref-3.4.1.0+git.c897542/asgiref/server.pyimport asyncio
import logging
import time
import traceback
from .compatibility import get_running_loop, guarantee_single_callable, run_future
logger = logging.getLogger(__name__)
class StatelessServer:
"""
Base server class that handles basic concepts like application instance
creation/pooling, exception handling, and similar, for stateless protocols
(i.e. ones without actual incoming connections to the process)
Your code should override the handle() method, doing whatever it needs to,
and calling get_or_create_application_instance with a unique `scope_id`
and `scope` for the scope it wants to get.
If an application instance is found with the same `scope_id`, you are
given its input queue, otherwise one is made for you with the scope provided
and you are given that fresh new input queue. Either way, you should do
something like:
input_queue = self.get_or_create_application_instance(
"user-123456",
{"type": "testprotocol", "user_id": "123456", "username": "andrew"},
)
input_queue.put_nowait(message)
If you try and create an application instance and there are already
`max_application` instances, the oldest/least recently used one will be
reclaimed and shut down to make space.
Application coroutines that error will be found periodically (every 100ms
by default) and have their exceptions printed to the console. Override
application_exception() if you want to do more when this happens.
If you override run(), make sure you handle things like launching the
application checker.
"""
application_checker_interval = 0.1
def __init__(self, application, max_applications=1000):
# Parameters
self.application = application
self.max_applications = max_applications
# Initialisation
self.application_instances = {}
### Mainloop and handling
def run(self):
"""
Runs the asyncio event loop with our handler loop.
"""
event_loop = get_running_loop()
asyncio.ensure_future(self.application_checker())
try:
event_loop.run_until_complete(self.handle())
except KeyboardInterrupt:
logger.info("Exiting due to Ctrl-C/interrupt")
async def handle(self):
raise NotImplementedError("You must implement handle()")
async def application_send(self, scope, message):
"""
Receives outbound sends from applications and handles them.
"""
raise NotImplementedError("You must implement application_send()")
### Application instance management
def get_or_create_application_instance(self, scope_id, scope):
"""
Creates an application instance and returns its queue.
"""
if scope_id in self.application_instances:
self.application_instances[scope_id]["last_used"] = time.time()
return self.application_instances[scope_id]["input_queue"]
# See if we need to delete an old one
while len(self.application_instances) > self.max_applications:
self.delete_oldest_application_instance()
# Make an instance of the application
input_queue = asyncio.Queue()
application_instance = guarantee_single_callable(self.application)
# Run it, and stash the future for later checking
future = run_future(
application_instance(
scope=scope,
receive=input_queue.get,
send=lambda message: self.application_send(scope, message),
),
)
self.application_instances[scope_id] = {
"input_queue": input_queue,
"future": future,
"scope": scope,
"last_used": time.time(),
}
return input_queue
def delete_oldest_application_instance(self):
"""
Finds and deletes the oldest application instance
"""
oldest_time = min(
details["last_used"] for details in self.application_instances.values()
)
for scope_id, details in self.application_instances.items():
if details["last_used"] == oldest_time:
self.delete_application_instance(scope_id)
# Return to make sure we only delete one in case two have
# the same oldest time
return
def delete_application_instance(self, scope_id):
"""
Removes an application instance (makes sure its task is stopped,
then removes it from the current set)
"""
details = self.application_instances[scope_id]
del self.application_instances[scope_id]
if not details["future"].done():
details["future"].cancel()
async def application_checker(self):
"""
Goes through the set of current application instance Futures and cleans up
any that are done/prints exceptions for any that errored.
"""
while True:
await asyncio.sleep(self.application_checker_interval)
for scope_id, details in list(self.application_instances.items()):
if details["future"].done():
exception = details["future"].exception()
if exception:
await self.application_exception(exception, details)
try:
del self.application_instances[scope_id]
except KeyError:
# Exception handling might have already got here before us. That's fine.
pass
async def application_exception(self, exception, application_details):
"""
Called whenever an application coroutine has an exception.
"""
logging.error(
"Exception inside application: %s\n%s%s",
exception,
"".join(traceback.format_tb(exception.__traceback__)),
f" {exception}",
)
07070100000012000081A400000000000000000000000160DDEA6C00004F56000000000000000000000000000000000000002C00000000asgiref-3.4.1.0+git.c897542/asgiref/sync.pyimport asyncio.coroutines
import functools
import inspect
import os
import sys
import threading
import warnings
import weakref
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, Optional, overload
from .compatibility import get_running_loop
from .current_thread_executor import CurrentThreadExecutor
from .local import Local
if sys.version_info >= (3, 7):
import contextvars
else:
contextvars = None
def _restore_context(context):
# Check for changes in contextvars, and set them to the current
# context for downstream consumers
for cvar in context:
try:
if cvar.get() != context.get(cvar):
cvar.set(context.get(cvar))
except LookupError:
cvar.set(context.get(cvar))
def _iscoroutinefunction_or_partial(func: Any) -> bool:
# Python < 3.8 does not correctly determine partially wrapped
# coroutine functions are coroutine functions, hence the need for
# this to exist. Code taken from CPython.
if sys.version_info >= (3, 8):
return asyncio.iscoroutinefunction(func)
else:
while inspect.ismethod(func):
func = func.__func__
while isinstance(func, functools.partial):
func = func.func
return asyncio.iscoroutinefunction(func)
class ThreadSensitiveContext:
"""Async context manager to manage context for thread sensitive mode
This context manager controls which thread pool executor is used when in
thread sensitive mode. By default, a single thread pool executor is shared
within a process.
In Python 3.7+, the ThreadSensitiveContext() context manager may be used to
specify a thread pool per context.
In Python 3.6, usage of this context manager has no effect.
This context manager is re-entrant, so only the outer-most call to
ThreadSensitiveContext will set the context.
Usage:
>>> import time
>>> async with ThreadSensitiveContext():
... await sync_to_async(time.sleep, 1)()
"""
def __init__(self):
self.token = None
if contextvars:
async def __aenter__(self):
try:
SyncToAsync.thread_sensitive_context.get()
except LookupError:
self.token = SyncToAsync.thread_sensitive_context.set(self)
return self
async def __aexit__(self, exc, value, tb):
if not self.token:
return
executor = SyncToAsync.context_to_thread_executor.pop(self, None)
if executor:
executor.shutdown()
SyncToAsync.thread_sensitive_context.reset(self.token)
else:
async def __aenter__(self):
return self
async def __aexit__(self, exc, value, tb):
pass
class AsyncToSync:
"""
Utility class which turns an awaitable that only works on the thread with
the event loop into a synchronous callable that works in a subthread.
If the call stack contains an async loop, the code runs there.
Otherwise, the code runs in a new loop in a new thread.
Either way, this thread then pauses and waits to run any thread_sensitive
code called from further down the call stack using SyncToAsync, before
finally exiting once the async task returns.
"""
# Maps launched Tasks to the threads that launched them (for locals impl)
launch_map: "Dict[asyncio.Task[object], threading.Thread]" = {}
# Keeps track of which CurrentThreadExecutor to use. This uses an asgiref
# Local, not a threadlocal, so that tasks can work out what their parent used.
executors = Local()
def __init__(self, awaitable, force_new_loop=False):
if not callable(awaitable) or not _iscoroutinefunction_or_partial(awaitable):
# Python does not have very reliable detection of async functions
# (lots of false negatives) so this is just a warning.
warnings.warn("async_to_sync was passed a non-async-marked callable")
self.awaitable = awaitable
try:
self.__self__ = self.awaitable.__self__
except AttributeError:
pass
if force_new_loop:
# They have asked that we always run in a new sub-loop.
self.main_event_loop = None
else:
try:
self.main_event_loop = get_running_loop()
except RuntimeError:
# There's no event loop in this thread. Look for the threadlocal if
# we're inside SyncToAsync
main_event_loop_pid = getattr(
SyncToAsync.threadlocal, "main_event_loop_pid", None
)
# We make sure the parent loop is from the same process - if
# they've forked, this is not going to be valid any more (#194)
if main_event_loop_pid and main_event_loop_pid == os.getpid():
self.main_event_loop = getattr(
SyncToAsync.threadlocal, "main_event_loop", None
)
else:
self.main_event_loop = None
def __call__(self, *args, **kwargs):
# You can't call AsyncToSync from a thread with a running event loop
try:
event_loop = get_running_loop()
except RuntimeError:
pass
else:
if event_loop.is_running():
raise RuntimeError(
"You cannot use AsyncToSync in the same thread as an async event loop - "
"just await the async function directly."
)
if contextvars is not None:
# Wrapping context in list so it can be reassigned from within
# `main_wrap`.
context = [contextvars.copy_context()]
else:
context = None
# Make a future for the return information
call_result = Future()
# Get the source thread
source_thread = threading.current_thread()
# Make a CurrentThreadExecutor we'll use to idle in this thread - we
# need one for every sync frame, even if there's one above us in the
# same thread.
if hasattr(self.executors, "current"):
old_current_executor = self.executors.current
else:
old_current_executor = None
current_executor = CurrentThreadExecutor()
self.executors.current = current_executor
# Use call_soon_threadsafe to schedule a synchronous callback on the
# main event loop's thread if it's there, otherwise make a new loop
# in this thread.
try:
awaitable = self.main_wrap(
args, kwargs, call_result, source_thread, sys.exc_info(), context
)
if not (self.main_event_loop and self.main_event_loop.is_running()):
# Make our own event loop - in a new thread - and run inside that.
loop = asyncio.new_event_loop()
loop_executor = ThreadPoolExecutor(max_workers=1)
loop_future = loop_executor.submit(
self._run_event_loop, loop, awaitable
)
if current_executor:
# Run the CurrentThreadExecutor until the future is done
current_executor.run_until_future(loop_future)
# Wait for future and/or allow for exception propagation
loop_future.result()
else:
# Call it inside the existing loop
self.main_event_loop.call_soon_threadsafe(
self.main_event_loop.create_task, awaitable
)
if current_executor:
# Run the CurrentThreadExecutor until the future is done
current_executor.run_until_future(call_result)
finally:
# Clean up any executor we were running
if hasattr(self.executors, "current"):
del self.executors.current
if old_current_executor:
self.executors.current = old_current_executor
if contextvars is not None:
_restore_context(context[0])
# Wait for results from the future.
return call_result.result()
def _run_event_loop(self, loop, coro):
"""
Runs the given event loop (designed to be called in a thread).
"""
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(coro)
finally:
try:
# mimic asyncio.run() behavior
# cancel unexhausted async generators
if sys.version_info >= (3, 7, 0):
tasks = asyncio.all_tasks(loop)
else:
tasks = asyncio.Task.all_tasks(loop)
for task in tasks:
task.cancel()
async def gather():
await asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(gather())
for task in tasks:
if task.cancelled():
continue
if task.exception() is not None:
loop.call_exception_handler(
{
"message": "unhandled exception during loop shutdown",
"exception": task.exception(),
"task": task,
}
)
if hasattr(loop, "shutdown_asyncgens"):
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
loop.close()
asyncio.set_event_loop(self.main_event_loop)
def __get__(self, parent, objtype):
"""
Include self for methods
"""
func = functools.partial(self.__call__, parent)
return functools.update_wrapper(func, self.awaitable)
async def main_wrap(
self, args, kwargs, call_result, source_thread, exc_info, context
):
"""
Wraps the awaitable with something that puts the result into the
result/exception future.
"""
if context is not None:
_restore_context(context[0])
current_task = SyncToAsync.get_current_task()
self.launch_map[current_task] = source_thread
try:
# If we have an exception, run the function inside the except block
# after raising it so exc_info is correctly populated.
if exc_info[1]:
try:
raise exc_info[1]
except BaseException:
result = await self.awaitable(*args, **kwargs)
else:
result = await self.awaitable(*args, **kwargs)
except BaseException as e:
call_result.set_exception(e)
else:
call_result.set_result(result)
finally:
del self.launch_map[current_task]
if context is not None:
context[0] = contextvars.copy_context()
class SyncToAsync:
"""
Utility class which turns a synchronous callable into an awaitable that
runs in a threadpool. It also sets a threadlocal inside the thread so
calls to AsyncToSync can escape it.
If thread_sensitive is passed, the code will run in the same thread as any
outer code. This is needed for underlying Python code that is not
threadsafe (for example, code which handles SQLite database connections).
If the outermost program is async (i.e. SyncToAsync is outermost), then
this will be a dedicated single sub-thread that all sync code runs in,
one after the other. If the outermost program is sync (i.e. AsyncToSync is
outermost), this will just be the main thread. This is achieved by idling
with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent,
rather than just blocking.
If executor is passed in, that will be used instead of the loop's default executor.
In order to pass in an executor, thread_sensitive must be set to False, otherwise
a TypeError will be raised.
"""
# If they've set ASGI_THREADS, update the default asyncio executor for now
if "ASGI_THREADS" in os.environ:
loop = get_running_loop()
loop.set_default_executor(
ThreadPoolExecutor(max_workers=int(os.environ["ASGI_THREADS"]))
)
# Maps launched threads to the coroutines that spawned them
launch_map: "Dict[threading.Thread, asyncio.Task[object]]" = {}
# Storage for main event loop references
threadlocal = threading.local()
# Single-thread executor for thread-sensitive code
single_thread_executor = ThreadPoolExecutor(max_workers=1)
# Maintain a contextvar for the current execution context. Optionally used
# for thread sensitive mode.
if sys.version_info >= (3, 7):
thread_sensitive_context: "contextvars.ContextVar[str]" = (
contextvars.ContextVar("thread_sensitive_context")
)
else:
thread_sensitive_context: None = None
# Contextvar that is used to detect if the single thread executor
# would be awaited on while already being used in the same context
if sys.version_info >= (3, 7):
deadlock_context: "contextvars.ContextVar[bool]" = contextvars.ContextVar(
"deadlock_context"
)
else:
deadlock_context: None = None
# Maintaining a weak reference to the context ensures that thread pools are
# erased once the context goes out of scope. This terminates the thread pool.
context_to_thread_executor: "weakref.WeakKeyDictionary[object, ThreadPoolExecutor]" = (
weakref.WeakKeyDictionary()
)
def __init__(
self,
func: Callable[..., Any],
thread_sensitive: bool = True,
executor: Optional["ThreadPoolExecutor"] = None,
) -> None:
if not callable(func) or _iscoroutinefunction_or_partial(func):
raise TypeError("sync_to_async can only be applied to sync functions.")
self.func = func
functools.update_wrapper(self, func)
self._thread_sensitive = thread_sensitive
self._is_coroutine = asyncio.coroutines._is_coroutine # type: ignore
if thread_sensitive and executor is not None:
raise TypeError("executor must not be set when thread_sensitive is True")
self._executor = executor
try:
self.__self__ = func.__self__ # type: ignore
except AttributeError:
pass
async def __call__(self, *args, **kwargs):
loop = get_running_loop()
# Work out what thread to run the code in
if self._thread_sensitive:
if hasattr(AsyncToSync.executors, "current"):
# If we have a parent sync thread above somewhere, use that
executor = AsyncToSync.executors.current
elif self.thread_sensitive_context and self.thread_sensitive_context.get(
None
):
# If we have a way of retrieving the current context, attempt
# to use a per-context thread pool executor
thread_sensitive_context = self.thread_sensitive_context.get()
if thread_sensitive_context in self.context_to_thread_executor:
# Re-use thread executor in current context
executor = self.context_to_thread_executor[thread_sensitive_context]
else:
# Create new thread executor in current context
executor = ThreadPoolExecutor(max_workers=1)
self.context_to_thread_executor[thread_sensitive_context] = executor
elif self.deadlock_context and self.deadlock_context.get(False):
raise RuntimeError(
"Single thread executor already being used, would deadlock"
)
else:
# Otherwise, we run it in a fixed single thread
executor = self.single_thread_executor
if self.deadlock_context:
self.deadlock_context.set(True)
else:
# Use the passed in executor, or the loop's default if it is None
executor = self._executor
if contextvars is not None:
context = contextvars.copy_context()
child = functools.partial(self.func, *args, **kwargs)
func = context.run
args = (child,)
kwargs = {}
else:
func = self.func
try:
# Run the code in the right thread
future = loop.run_in_executor(
executor,
functools.partial(
self.thread_handler,
loop,
self.get_current_task(),
sys.exc_info(),
func,
*args,
**kwargs,
),
)
ret = await asyncio.wait_for(future, timeout=None)
finally:
if contextvars is not None:
_restore_context(context)
if self.deadlock_context:
self.deadlock_context.set(False)
return ret
def __get__(self, parent, objtype):
"""
Include self for methods
"""
return functools.partial(self.__call__, parent)
def thread_handler(self, loop, source_task, exc_info, func, *args, **kwargs):
"""
Wraps the sync application with exception handling.
"""
# Set the threadlocal for AsyncToSync
self.threadlocal.main_event_loop = loop
self.threadlocal.main_event_loop_pid = os.getpid()
# Set the task mapping (used for the locals module)
current_thread = threading.current_thread()
if AsyncToSync.launch_map.get(source_task) == current_thread:
# Our parent task was launched from this same thread, so don't make
# a launch map entry - let it shortcut over us! (and stop infinite loops)
parent_set = False
else:
self.launch_map[current_thread] = source_task
parent_set = True
# Run the function
try:
# If we have an exception, run the function inside the except block
# after raising it so exc_info is correctly populated.
if exc_info[1]:
try:
raise exc_info[1]
except BaseException:
return func(*args, **kwargs)
else:
return func(*args, **kwargs)
finally:
# Only delete the launch_map parent if we set it, otherwise it is
# from someone else.
if parent_set:
del self.launch_map[current_thread]
@staticmethod
def get_current_task():
"""
Cross-version implementation of asyncio.current_task()
Returns None if there is no task.
"""
try:
if hasattr(asyncio, "current_task"):
# Python 3.7 and up
return asyncio.current_task()
else:
# Python 3.6
return asyncio.Task.current_task()
except RuntimeError:
return None
# Lowercase aliases (and decorator friendliness)
async_to_sync = AsyncToSync
@overload
def sync_to_async(
func: None = None,
thread_sensitive: bool = True,
executor: Optional["ThreadPoolExecutor"] = None,
) -> Callable[[Callable[..., Any]], SyncToAsync]:
...
@overload
def sync_to_async(
func: Callable[..., Any],
thread_sensitive: bool = True,
executor: Optional["ThreadPoolExecutor"] = None,
) -> SyncToAsync:
...
def sync_to_async(
func=None,
thread_sensitive=True,
executor=None,
):
if func is None:
return lambda f: SyncToAsync(
f,
thread_sensitive=thread_sensitive,
executor=executor,
)
return SyncToAsync(
func,
thread_sensitive=thread_sensitive,
executor=executor,
)
07070100000013000081A400000000000000000000000160DDEA6C00000C2F000000000000000000000000000000000000002F00000000asgiref-3.4.1.0+git.c897542/asgiref/testing.pyimport asyncio
import time
from .compatibility import guarantee_single_callable
from .timeout import timeout as async_timeout
class ApplicationCommunicator:
"""
Runs an ASGI application in a test mode, allowing sending of
messages to it and retrieval of messages it sends.
"""
def __init__(self, application, scope):
self.application = guarantee_single_callable(application)
self.scope = scope
self.input_queue = asyncio.Queue()
self.output_queue = asyncio.Queue()
self.future = asyncio.ensure_future(
self.application(scope, self.input_queue.get, self.output_queue.put)
)
async def wait(self, timeout=1):
"""
Waits for the application to stop itself and returns any exceptions.
"""
try:
async with async_timeout(timeout):
try:
await self.future
self.future.result()
except asyncio.CancelledError:
pass
finally:
if not self.future.done():
self.future.cancel()
try:
await self.future
except asyncio.CancelledError:
pass
def stop(self, exceptions=True):
if not self.future.done():
self.future.cancel()
elif exceptions:
# Give a chance to raise any exceptions
self.future.result()
def __del__(self):
# Clean up on deletion
try:
self.stop(exceptions=False)
except RuntimeError:
# Event loop already stopped
pass
async def send_input(self, message):
"""
Sends a single message to the application
"""
# Give it the message
await self.input_queue.put(message)
async def receive_output(self, timeout=1):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if self.future.done():
self.future.result()
# Wait and receive the message
try:
async with async_timeout(timeout):
return await self.output_queue.get()
except asyncio.TimeoutError as e:
# See if we have another error to raise inside
if self.future.done():
self.future.result()
else:
self.future.cancel()
try:
await self.future
except asyncio.CancelledError:
pass
raise e
async def receive_nothing(self, timeout=0.1, interval=0.01):
"""
Checks that there is no message to receive in the given time.
"""
# `interval` has precedence over `timeout`
start = time.monotonic()
while time.monotonic() - start < timeout:
if not self.output_queue.empty():
return False
await asyncio.sleep(interval)
return self.output_queue.empty()
07070100000014000081A400000000000000000000000160DDEA6C00000F31000000000000000000000000000000000000002F00000000asgiref-3.4.1.0+git.c897542/asgiref/timeout.py# This code is originally sourced from the aio-libs project "async_timeout",
# under the Apache 2.0 license. You may see the original project at
# https://github.com/aio-libs/async-timeout
# It is vendored here to reduce chain-dependencies on this library, and
# modified slightly to remove some features we don't use.
import asyncio
import sys
from types import TracebackType
from typing import Any, Optional, Type
class timeout:
"""timeout context manager.
Useful in cases when you want to apply timeout logic around block
of code or in cases when asyncio.wait_for is not suitable. For example:
>>> with timeout(0.001):
... async with aiohttp.get('https://github.com') as r:
... await r.text()
timeout - value in seconds or None to disable timeout logic
loop - asyncio compatible event loop
"""
def __init__(
self,
timeout: Optional[float],
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> None:
self._timeout = timeout
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
self._task = None # type: Optional[asyncio.Task[Any]]
self._cancelled = False
self._cancel_handler = None # type: Optional[asyncio.Handle]
self._cancel_at = None # type: Optional[float]
def __enter__(self) -> "timeout":
return self._do_enter()
def __exit__(
self,
exc_type: Type[BaseException],
exc_val: BaseException,
exc_tb: TracebackType,
) -> Optional[bool]:
self._do_exit(exc_type)
return None
async def __aenter__(self) -> "timeout":
return self._do_enter()
async def __aexit__(
self,
exc_type: Type[BaseException],
exc_val: BaseException,
exc_tb: TracebackType,
) -> None:
self._do_exit(exc_type)
@property
def expired(self) -> bool:
return self._cancelled
@property
def remaining(self) -> Optional[float]:
if self._cancel_at is not None:
return max(self._cancel_at - self._loop.time(), 0.0)
else:
return None
def _do_enter(self) -> "timeout":
# Support Tornado 5- without timeout
# Details: https://github.com/python/asyncio/issues/392
if self._timeout is None:
return self
self._task = current_task(self._loop)
if self._task is None:
raise RuntimeError(
"Timeout context manager should be used " "inside a task"
)
if self._timeout <= 0:
self._loop.call_soon(self._cancel_task)
return self
self._cancel_at = self._loop.time() + self._timeout
self._cancel_handler = self._loop.call_at(self._cancel_at, self._cancel_task)
return self
def _do_exit(self, exc_type: Type[BaseException]) -> None:
if exc_type is asyncio.CancelledError and self._cancelled:
self._cancel_handler = None
self._task = None
raise asyncio.TimeoutError
if self._timeout is not None and self._cancel_handler is not None:
self._cancel_handler.cancel()
self._cancel_handler = None
self._task = None
return None
def _cancel_task(self) -> None:
if self._task is not None:
self._task.cancel()
self._cancelled = True
def current_task(loop: asyncio.AbstractEventLoop) -> "Optional[asyncio.Task[Any]]":
if sys.version_info >= (3, 7):
task = asyncio.current_task(loop=loop)
else:
task = asyncio.Task.current_task(loop=loop)
if task is None:
# this should be removed, tokio must use register_task and family API
fn = getattr(loop, "current_task", None)
if fn is not None:
task = fn()
return task
07070100000015000081A400000000000000000000000160DDEA6C00001A42000000000000000000000000000000000000002E00000000asgiref-3.4.1.0+git.c897542/asgiref/typing.pyimport sys
import warnings
from typing import (
Any,
Awaitable,
Callable,
Dict,
Iterable,
List,
Optional,
Tuple,
Type,
Union,
)
from asgiref._pep562 import pep562
if sys.version_info >= (3, 8):
from typing import Literal, Protocol, TypedDict
else:
from typing_extensions import Literal, Protocol, TypedDict
__all__ = (
"ASGIVersions",
"HTTPScope",
"WebSocketScope",
"LifespanScope",
"WWWScope",
"Scope",
"HTTPRequestEvent",
"HTTPResponseStartEvent",
"HTTPResponseBodyEvent",
"HTTPServerPushEvent",
"HTTPDisconnectEvent",
"WebSocketConnectEvent",
"WebSocketAcceptEvent",
"WebSocketReceiveEvent",
"WebSocketSendEvent",
"WebSocketResponseStartEvent",
"WebSocketResponseBodyEvent",
"WebSocketDisconnectEvent",
"WebSocketCloseEvent",
"LifespanStartupEvent",
"LifespanShutdownEvent",
"LifespanStartupCompleteEvent",
"LifespanStartupFailedEvent",
"LifespanShutdownCompleteEvent",
"LifespanShutdownFailedEvent",
"ASGIReceiveEvent",
"ASGISendEvent",
"ASGIReceiveCallable",
"ASGISendCallable",
"ASGI2Protocol",
"ASGI2Application",
"ASGI3Application",
"ASGIApplication",
)
class ASGIVersions(TypedDict):
spec_version: str
version: Union[Literal["2.0"], Literal["3.0"]]
class HTTPScope(TypedDict):
type: Literal["http"]
asgi: ASGIVersions
http_version: str
method: str
scheme: str
path: str
raw_path: bytes
query_string: bytes
root_path: str
headers: Iterable[Tuple[bytes, bytes]]
client: Optional[Tuple[str, int]]
server: Optional[Tuple[str, Optional[int]]]
extensions: Optional[Dict[str, Dict[object, object]]]
class WebSocketScope(TypedDict):
type: Literal["websocket"]
asgi: ASGIVersions
http_version: str
scheme: str
path: str
raw_path: bytes
query_string: bytes
root_path: str
headers: Iterable[Tuple[bytes, bytes]]
client: Optional[Tuple[str, int]]
server: Optional[Tuple[str, Optional[int]]]
subprotocols: Iterable[str]
extensions: Optional[Dict[str, Dict[object, object]]]
class LifespanScope(TypedDict):
type: Literal["lifespan"]
asgi: ASGIVersions
WWWScope = Union[HTTPScope, WebSocketScope]
Scope = Union[HTTPScope, WebSocketScope, LifespanScope]
class HTTPRequestEvent(TypedDict):
type: Literal["http.request"]
body: bytes
more_body: bool
class HTTPResponseStartEvent(TypedDict):
type: Literal["http.response.start"]
status: int
headers: Iterable[Tuple[bytes, bytes]]
class HTTPResponseBodyEvent(TypedDict):
type: Literal["http.response.body"]
body: bytes
more_body: bool
class HTTPServerPushEvent(TypedDict):
type: Literal["http.response.push"]
path: str
headers: Iterable[Tuple[bytes, bytes]]
class HTTPDisconnectEvent(TypedDict):
type: Literal["http.disconnect"]
class WebSocketConnectEvent(TypedDict):
type: Literal["websocket.connect"]
class WebSocketAcceptEvent(TypedDict):
type: Literal["websocket.accept"]
subprotocol: Optional[str]
headers: Iterable[Tuple[bytes, bytes]]
class WebSocketReceiveEvent(TypedDict):
type: Literal["websocket.receive"]
bytes: Optional[bytes]
text: Optional[str]
class WebSocketSendEvent(TypedDict):
type: Literal["websocket.send"]
bytes: Optional[bytes]
text: Optional[str]
class WebSocketResponseStartEvent(TypedDict):
type: Literal["websocket.http.response.start"]
status: int
headers: Iterable[Tuple[bytes, bytes]]
class WebSocketResponseBodyEvent(TypedDict):
type: Literal["websocket.http.response.body"]
body: bytes
more_body: bool
class WebSocketDisconnectEvent(TypedDict):
type: Literal["websocket.disconnect"]
code: int
class WebSocketCloseEvent(TypedDict):
type: Literal["websocket.close"]
code: int
reason: Optional[str]
class LifespanStartupEvent(TypedDict):
type: Literal["lifespan.startup"]
class LifespanShutdownEvent(TypedDict):
type: Literal["lifespan.shutdown"]
class LifespanStartupCompleteEvent(TypedDict):
type: Literal["lifespan.startup.complete"]
class LifespanStartupFailedEvent(TypedDict):
type: Literal["lifespan.startup.failed"]
message: str
class LifespanShutdownCompleteEvent(TypedDict):
type: Literal["lifespan.shutdown.complete"]
class LifespanShutdownFailedEvent(TypedDict):
type: Literal["lifespan.shutdown.failed"]
message: str
ASGIReceiveEvent = Union[
HTTPRequestEvent,
HTTPDisconnectEvent,
WebSocketConnectEvent,
WebSocketReceiveEvent,
WebSocketDisconnectEvent,
LifespanStartupEvent,
LifespanShutdownEvent,
]
ASGISendEvent = Union[
HTTPResponseStartEvent,
HTTPResponseBodyEvent,
HTTPServerPushEvent,
HTTPDisconnectEvent,
WebSocketAcceptEvent,
WebSocketSendEvent,
WebSocketResponseStartEvent,
WebSocketResponseBodyEvent,
WebSocketCloseEvent,
LifespanStartupCompleteEvent,
LifespanStartupFailedEvent,
LifespanShutdownCompleteEvent,
LifespanShutdownFailedEvent,
]
ASGIReceiveCallable = Callable[[], Awaitable[ASGIReceiveEvent]]
ASGISendCallable = Callable[[ASGISendEvent], Awaitable[None]]
class ASGI2Protocol(Protocol):
def __init__(self, scope: Scope) -> None:
...
async def __call__(
self, receive: ASGIReceiveCallable, send: ASGISendCallable
) -> None:
...
ASGI2Application = Type[ASGI2Protocol]
ASGI3Application = Callable[
[
Scope,
ASGIReceiveCallable,
ASGISendCallable,
],
Awaitable[None],
]
ASGIApplication = Union[ASGI2Application, ASGI3Application]
__deprecated__ = {
"WebsocketConnectEvent": WebSocketConnectEvent,
"WebsocketAcceptEvent": WebSocketAcceptEvent,
"WebsocketReceiveEvent": WebSocketReceiveEvent,
"WebsocketSendEvent": WebSocketSendEvent,
"WebsocketResponseStartEvent": WebSocketResponseStartEvent,
"WebsocketResponseBodyEvent": WebSocketResponseBodyEvent,
"WebsocketDisconnectEvent": WebSocketDisconnectEvent,
"WebsocketCloseEvent": WebSocketCloseEvent,
}
def __getattr__(name: str) -> Any:
deprecated = __deprecated__.get(name)
if deprecated:
stacklevel = 3 if sys.version_info >= (3, 7) else 4
warnings.warn(
f"'{name}' is deprecated. Use '{deprecated.__name__}' instead.",
category=DeprecationWarning,
stacklevel=stacklevel,
)
return deprecated
raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
def __dir__() -> List[str]:
return sorted(list(__all__) + list(__deprecated__.keys()))
pep562(__name__)
07070100000016000081A400000000000000000000000160DDEA6C000019AF000000000000000000000000000000000000002C00000000asgiref-3.4.1.0+git.c897542/asgiref/wsgi.pyfrom io import BytesIO
from tempfile import SpooledTemporaryFile
from asgiref.sync import AsyncToSync, sync_to_async
class WsgiToAsgi:
"""
Wraps a WSGI application to make it into an ASGI application.
"""
def __init__(self, wsgi_application):
self.wsgi_application = wsgi_application
async def __call__(self, scope, receive, send):
"""
ASGI application instantiation point.
We return a new WsgiToAsgiInstance here with the WSGI app
and the scope, ready to respond when it is __call__ed.
"""
await WsgiToAsgiInstance(self.wsgi_application)(scope, receive, send)
class WsgiToAsgiInstance:
"""
Per-socket instance of a wrapped WSGI application
"""
def __init__(self, wsgi_application):
self.wsgi_application = wsgi_application
self.response_started = False
self.response_content_length = None
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
raise ValueError("WSGI wrapper received a non-HTTP scope")
self.scope = scope
with SpooledTemporaryFile(max_size=65536) as body:
# Alright, wait for the http.request messages
while True:
message = await receive()
if message["type"] != "http.request":
raise ValueError("WSGI wrapper received a non-HTTP-request message")
body.write(message.get("body", b""))
if not message.get("more_body"):
break
body.seek(0)
# Wrap send so it can be called from the subthread
self.sync_send = AsyncToSync(send)
# Call the WSGI app
await self.run_wsgi_app(body)
def build_environ(self, scope, body):
"""
Builds a scope and request body into a WSGI environ object.
"""
environ = {
"REQUEST_METHOD": scope["method"],
"SCRIPT_NAME": scope.get("root_path", "").encode("utf8").decode("latin1"),
"PATH_INFO": scope["path"].encode("utf8").decode("latin1"),
"QUERY_STRING": scope["query_string"].decode("ascii"),
"SERVER_PROTOCOL": "HTTP/%s" % scope["http_version"],
"wsgi.version": (1, 0),
"wsgi.url_scheme": scope.get("scheme", "http"),
"wsgi.input": body,
"wsgi.errors": BytesIO(),
"wsgi.multithread": True,
"wsgi.multiprocess": True,
"wsgi.run_once": False,
}
# Get server name and port - required in WSGI, not in ASGI
if "server" in scope:
environ["SERVER_NAME"] = scope["server"][0]
environ["SERVER_PORT"] = str(scope["server"][1])
else:
environ["SERVER_NAME"] = "localhost"
environ["SERVER_PORT"] = "80"
if "client" in scope:
environ["REMOTE_ADDR"] = scope["client"][0]
# Go through headers and make them into environ entries
for name, value in self.scope.get("headers", []):
name = name.decode("latin1")
if name == "content-length":
corrected_name = "CONTENT_LENGTH"
elif name == "content-type":
corrected_name = "CONTENT_TYPE"
else:
corrected_name = "HTTP_%s" % name.upper().replace("-", "_")
# HTTPbis say only ASCII chars are allowed in headers, but we latin1 just in case
value = value.decode("latin1")
if corrected_name in environ:
value = environ[corrected_name] + "," + value
environ[corrected_name] = value
return environ
def start_response(self, status, response_headers, exc_info=None):
"""
WSGI start_response callable.
"""
# Don't allow re-calling once response has begun
if self.response_started:
raise exc_info[1].with_traceback(exc_info[2])
# Don't allow re-calling without exc_info
if hasattr(self, "response_start") and exc_info is None:
raise ValueError(
"You cannot call start_response a second time without exc_info"
)
# Extract status code
status_code, _ = status.split(" ", 1)
status_code = int(status_code)
# Extract headers
headers = [
(name.lower().encode("ascii"), value.encode("ascii"))
for name, value in response_headers
]
# Extract content-length
self.response_content_length = None
for name, value in response_headers:
if name.lower() == "content-length":
self.response_content_length = int(value)
# Build and send response start message.
self.response_start = {
"type": "http.response.start",
"status": status_code,
"headers": headers,
}
@sync_to_async
def run_wsgi_app(self, body):
"""
Called in a subthread to run the WSGI app. We encapsulate like
this so that the start_response callable is called in the same thread.
"""
# Translate the scope and incoming request body into a WSGI environ
environ = self.build_environ(self.scope, body)
# Run the WSGI app
bytes_sent = 0
for output in self.wsgi_application(environ, self.start_response):
# If this is the first response, include the response headers
if not self.response_started:
self.response_started = True
self.sync_send(self.response_start)
# If the application supplies a Content-Length header
if self.response_content_length is not None:
# The server should not transmit more bytes to the client than the header allows
bytes_allowed = self.response_content_length - bytes_sent
if len(output) > bytes_allowed:
output = output[:bytes_allowed]
self.sync_send(
{"type": "http.response.body", "body": output, "more_body": True}
)
bytes_sent += len(output)
# The server should stop iterating over the response when enough data has been sent
if bytes_sent == self.response_content_length:
break
# Close connection
if not self.response_started:
self.response_started = True
self.sync_send(self.response_start)
self.sync_send({"type": "http.response.body"})
07070100000017000041ED00000000000000000000000260DDEA6C00000000000000000000000000000000000000000000002100000000asgiref-3.4.1.0+git.c897542/docs07070100000018000081A400000000000000000000000160DDEA6C00000259000000000000000000000000000000000000002A00000000asgiref-3.4.1.0+git.c897542/docs/Makefile# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
SPHINXPROJ = ASGI
SOURCEDIR = .
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)07070100000019000081A400000000000000000000000160DDEA6C00001411000000000000000000000000000000000000002900000000asgiref-3.4.1.0+git.c897542/docs/conf.py#!/usr/bin/env python3
from typing import Dict, List
#
# ASGI documentation build configuration file, created by
# sphinx-quickstart on Thu May 17 21:22:10 2018.
#
# This file is execfile()d with the current directory set to its
# containing dir.
#
# Note that not all possible configuration values are present in this
# autogenerated file.
#
# All configuration values have a default; values that are commented out
# serve to show the default.
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
# import os
# import sys
# sys.path.insert(0, os.path.abspath('.'))
# -- General configuration ------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#
# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions: List[str] = []
# Add any paths that contain templates here, relative to this directory.
templates_path: List[str] = []
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
#
# source_suffix = ['.rst', '.md']
source_suffix = ".rst"
# The master toctree document.
master_doc = "index"
# General information about the project.
project = "ASGI"
copyright = "2018, ASGI Team"
author = "ASGI Team"
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = "3.0"
# The full version, including alpha/beta/rc tags.
release = "3.0"
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = None
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This patterns also effect to html_static_path and html_extra_path
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = "sphinx"
# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = False
# -- Options for HTML output ----------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = "default"
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#
# html_theme_options = {}
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ["_static"]
# Custom sidebar templates, must be a dictionary that maps document names
# to template names.
#
# This is required for the alabaster theme
# refs: http://alabaster.readthedocs.io/en/latest/installation.html#sidebars
# html_sidebars = {
# '**': [
# 'about.html',
# 'navigation.html',
# 'relations.html',
# 'searchbox.html',
# 'donate.html',
# ]
# }
# -- Options for HTMLHelp output ------------------------------------------
# Output file base name for HTML help builder.
htmlhelp_basename = "ASGIdoc"
# -- Options for LaTeX output ---------------------------------------------
latex_elements: Dict[str, str] = {
# The paper size ('letterpaper' or 'a4paper').
#
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#
# 'preamble': '',
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, "ASGI.tex", "ASGI Documentation", "ASGI Team", "manual"),
]
# -- Options for manual page output ---------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [(master_doc, "asgi", "ASGI Documentation", [author], 1)]
# -- Options for Texinfo output -------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(
master_doc,
"ASGI",
"ASGI Documentation",
author,
"ASGI",
"One line description of project.",
"Miscellaneous",
),
]
0707010000001A000081A400000000000000000000000160DDEA6C000013C9000000000000000000000000000000000000003000000000asgiref-3.4.1.0+git.c897542/docs/extensions.rstExtensions
==========
The ASGI specification provides for server-specific extensions to be
used outside of the core ASGI specification. This document specifies
some common extensions.
Websocket Denial Response
-------------------------
Websocket connections start with the client sending a HTTP request
containing the appropriate upgrade headers. On receipt of this request
a server can choose to either upgrade the connection or respond with an
HTTP response (denying the upgrade). The core ASGI specification does
not allow for any control over the denial response, instead specifying
that the HTTP status code ``403`` should be returned, whereas this
extension allows an ASGI framework to control the
denial response. Rather than being a core part of
ASGI, this is an extension for what is considered a niche feature as most
clients do not utilise the denial response.
ASGI Servers that implement this extension will provide
``websocket.http.response`` in the extensions part of the scope::
"scope": {
...
"extensions": {
"websocket.http.response": {},
},
}
This will allow the ASGI Framework to send HTTP response messages
after the ``websocket.connect`` message. These messages cannot be
followed by any other websocket messages as the server should send a
HTTP response and then close the connection.
The messages themselves should be ``websocket.http.response.start``
and ``websocket.http.response.body`` with a structure that matches the
``http.response.start`` and ``http.response.body`` messages defined in
the HTTP part of the core ASGI specification.
HTTP/2 Server Push
------------------
HTTP/2 allows for a server to push a resource to a client by sending a
push promise. ASGI servers that implement this extension will provide
``http.response.push`` in the extensions part of the scope::
"scope": {
...
"extensions": {
"http.response.push": {},
},
}
An ASGI framework can initiate a server push by sending a message with
the following keys. This message can be sent at any time after the
*Response Start* message but before the final *Response Body* message.
Keys:
* ``type`` (*Unicode string*): ``"http.response.push"``
* ``path`` (*Unicode string*): HTTP path from URL, with percent-encoded
sequences and UTF-8 byte sequences decoded into characters.
* ``headers`` (*Iterable[[byte string, byte string]]*): An iterable of
``[name, value]`` two-item iterables, where ``name`` is the header name, and
``value`` is the header value. Header names must be lowercased. Pseudo
headers (present in HTTP/2 and HTTP/3) must not be present.
The ASGI server should then attempt to send a server push (or push
promise) to the client. If the client supports server push, the server
should create a new connection to a new instance of the application
and treat it as if the client had made a request.
The ASGI server should set the pseudo ``:authority`` header value to
be the same value as the request that triggered the push promise.
Zero Copy Send
--------------
Zero Copy Send allows you to send the contents of a file descriptor to the
HTTP client with zero copy (where the underlying OS directly handles the data
transfer from a source file or socket without loading it into Python and
writing it out again).
ASGI servers that implement this extension will provide
``http.response.zerocopysend`` in the extensions part of the scope::
"scope": {
...
"extensions": {
"http.response.zerocopysend": {},
},
}
The ASGI framework can initiate a zero-copy send by sending a message with
the following keys. This message can be sent at any time after the
*Response Start* message but before the final *Response Body* message,
and can be mixed with ``http.response.body``. It can also be called
multiple times in one response. Except for the characteristics of
zero-copy, it should behave the same as ordinary ``http.response.body``.
Keys:
* ``type`` (*Unicode string*): ``"http.response.zerocopysend"``
* ``file`` (*file descriptor object*): An opened file descriptor object
with an underlying OS file descriptor that can be used to call
``os.sendfile``. (e.g. not BytesIO)
* ``offset`` (*int*): Optional. If this value exists, it will specify
the offset at which sendfile starts to read data from ``file``.
Otherwise, it will be read from the current position of ``file``.
* ``count`` (*int*): Optional. ``count`` is the number of bytes to
copy between the file descriptors. If omitted, the file will be read until
its end.
* ``more_body`` (*bool*): Signifies if there is additional content
to come (as part of a Response Body message). If ``False``, response
will be taken as complete and closed, and any further messages on
the channel will be ignored. Optional; if missing defaults to
``False``.
After calling this extension to respond, the ASGI application itself should
actively close the used file descriptor - ASGI servers are not responsible for
closing descriptors.
0707010000001B000081A400000000000000000000000160DDEA6C00000935000000000000000000000000000000000000003500000000asgiref-3.4.1.0+git.c897542/docs/implementations.rst===============
Implementations
===============
Complete or upcoming implementations of ASGI - servers, frameworks, and other
useful pieces.
Servers
=======
Daphne
------
*Stable* / http://github.com/django/daphne
The current ASGI reference server, written in Twisted and maintained as part
of the Django Channels project. Supports HTTP/1, HTTP/2, and WebSockets.
Uvicorn
-------
*Stable* / https://www.uvicorn.org/
A fast ASGI server based on uvloop and httptools.
Supports HTTP/1 and WebSockets.
Hypercorn
---------
*Beta* / https://pgjones.gitlab.io/hypercorn/index.html
An ASGI server based on the sans-io hyper, h11, h2, and wsproto libraries.
Supports HTTP/1, HTTP/2, and WebSockets.
Application Frameworks
======================
Django/Channels
---------------
*Stable* / http://channels.readthedocs.io
Channels is the Django project to add asynchronous support to Django and is the
original driving force behind the ASGI project. Supports HTTP and WebSockets
with Django integration, and any protocol with ASGI-native code.
FastAPI
-------
*Beta* / https://github.com/tiangolo/fastapi
FastAPI is an ASGI web framework (made with Starlette) for building web APIs based on
standard Python type annotations and standards like OpenAPI, JSON Schema, and OAuth2.
Supports HTTP and WebSockets.
Quart
-----
*Beta* / https://github.com/pgjones/quart
Quart is a Python ASGI web microframework. It is intended to provide the easiest
way to use asyncio functionality in a web context, especially with existing Flask apps.
Supports HTTP.
Starlette
---------
*Beta* / https://github.com/encode/starlette
Starlette is a minimalist ASGI library for writing against basic but powerful
``Request`` and ``Response`` classes. Supports HTTP.
rpc.py
------
*Beta* / https://github.com/abersheeran/rpc.py
An easy-to-use and powerful RPC framework. RPC server base on WSGI & ASGI, client base
on ``httpx``. Supports synchronous functions, asynchronous functions, synchronous
generator functions, and asynchronous generator functions. Optional use of Type hint
for type conversion. Optional OpenAPI document generation.
Tools
=====
a2wsgi
------
*Stable* / https://github.com/abersheeran/a2wsgi
Convert WSGI application to ASGI application or ASGI application to WSGI application.
Pure Python. Only depend on the standard library.
0707010000001C000081A400000000000000000000000160DDEA6C00000392000000000000000000000000000000000000002B00000000asgiref-3.4.1.0+git.c897542/docs/index.rst
ASGI Documentation
==================
ASGI (*Asynchronous Server Gateway Interface*) is a spiritual successor to
WSGI, intended to provide a standard interface between async-capable Python
web servers, frameworks, and applications.
Where WSGI provided a standard for synchronous Python apps, ASGI provides one
for both asynchronous and synchronous apps, with a WSGI backwards-compatibility
implementation and multiple servers and application frameworks.
You can read more in the :doc:`introduction <introduction>` to ASGI, look
through the :doc:`specifications <specs/index>`, and see what
:doc:`implementations <implementations>` there already are or that are upcoming.
Contribution and discussion about ASGI is welcome, and mostly happens on
the `asgiref GitHub repository <https://github.com/django/asgiref>`_.
.. toctree::
:maxdepth: 1
introduction
specs/index
extensions
implementations
0707010000001D000081A400000000000000000000000160DDEA6C00000BCA000000000000000000000000000000000000003200000000asgiref-3.4.1.0+git.c897542/docs/introduction.rstIntroduction
============
ASGI is a spiritual successor to
`WSGI <https://www.python.org/dev/peps/pep-3333/>`_, the long-standing Python
standard for compatibility between web servers, frameworks, and applications.
WSGI succeeded in allowing much more freedom and innovation in the Python
web space, and ASGI's goal is to continue this onward into the land of
asynchronous Python.
What's wrong with WSGI?
-----------------------
You may ask "why not just upgrade WSGI"? This has been asked many times over
the years, and the problem usually ends up being that WSGI's single-callable
interface just isn't suitable for more involved Web protocols like WebSocket.
WSGI applications are a single, synchronous callable that takes a request and
returns a response; this doesn't allow for long-lived connections, like you
get with long-poll HTTP or WebSocket connections.
Even if we made this callable asynchronous, it still only has a single path
to provide a request, so protocols that have multiple incoming events (like
receiving WebSocket frames) can't trigger this.
How does ASGI work?
-------------------
ASGI is structured as a single, asynchronous callable. It takes a ``scope``,
which is a ``dict`` containing details about the specific connection,
``send``, an asynchronous callable, that lets the application send event messages
to the client, and ``receive``, an asynchronous callable which lets the application
receive event messages from the client.
This not only allows multiple incoming events and outgoing events for each
application, but also allows for a background coroutine so the application can
do other things (such as listening for events on an external trigger, like a
Redis queue).
In its simplest form, an application can be written as an asynchronous function,
like this::
async def application(scope, receive, send):
event = await receive()
...
await send({"type": "websocket.send", ...})
Every *event* that you send or receive is a Python ``dict``, with a predefined
format. It's these event formats that form the basis of the standard, and allow
applications to be swappable between servers.
These *events* each have a defined ``type`` key, which can be used to infer
the event's structure. Here's an example event that you might receive from
``receive`` with the body from a HTTP request::
{
"type": "http.request",
"body": b"Hello World",
"more_body": False,
}
And here's an example of an event you might pass to ``send`` to send an
outgoing WebSocket message::
{
"type": "websocket.send",
"text": "Hello world!",
}
WSGI compatibility
------------------
ASGI is also designed to be a superset of WSGI, and there's a defined way
of translating between the two, allowing WSGI applications to be run inside
ASGI servers through a translation wrapper (provided in the ``asgiref``
library). A threadpool can be used to run the synchronous WSGI applications
away from the async event loop.
0707010000001E000041ED00000000000000000000000260DDEA6C00000000000000000000000000000000000000000000002700000000asgiref-3.4.1.0+git.c897542/docs/specs0707010000001F000081A400000000000000000000000160DDEA6C0000016F000000000000000000000000000000000000003100000000asgiref-3.4.1.0+git.c897542/docs/specs/index.rstSpecifications
==============
These are the specifications for ASGI. The root specification outlines how
applications are structured and called, and the protocol specifications outline
the events that can be sent and received for each protocol.
.. toctree::
:maxdepth: 2
ASGI Specification <main>
HTTP and WebSocket protocol <www>
Lifespan <lifespan>
07070100000020000081A400000000000000000000000160DDEA6C00000026000000000000000000000000000000000000003400000000asgiref-3.4.1.0+git.c897542/docs/specs/lifespan.rst.. include:: ../../specs/lifespan.rst
07070100000021000081A400000000000000000000000160DDEA6C00000022000000000000000000000000000000000000003000000000asgiref-3.4.1.0+git.c897542/docs/specs/main.rst.. include:: ../../specs/asgi.rst
07070100000022000081A400000000000000000000000160DDEA6C00000021000000000000000000000000000000000000002F00000000asgiref-3.4.1.0+git.c897542/docs/specs/www.rst.. include:: ../../specs/www.rst
07070100000023000081A400000000000000000000000160DDEA6C00000A7A000000000000000000000000000000000000002600000000asgiref-3.4.1.0+git.c897542/setup.cfg[metadata]
name = asgiref
version = attr: asgiref.__version__
url = https://github.com/django/asgiref/
author = Django Software Foundation
author_email = foundation@djangoproject.com
description = ASGI specs, helper code, and adapters
long_description = file: README.rst
license = BSD
classifiers =
Development Status :: 5 - Production/Stable
Environment :: Web Environment
Intended Audience :: Developers
License :: OSI Approved :: BSD License
Operating System :: OS Independent
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3 :: Only
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Topic :: Internet :: WWW/HTTP
project_urls =
Documentation = https://asgi.readthedocs.io/
Further Documentation = https://docs.djangoproject.com/en/stable/topics/async/#async-adapter-functions
Changelog = https://github.com/django/asgiref/blob/master/CHANGELOG.txt
[options]
python_requires = >=3.6
packages = find:
include_package_data = true
install_requires =
typing_extensions; python_version < "3.8"
zip_safe = false
[options.extras_require]
tests =
pytest
pytest-asyncio
mypy>=0.800
[tool:pytest]
testpaths = tests
[flake8]
exclude = venv/*,tox/*,specs/*
ignore = E123,E128,E266,E402,W503,E731,W601
max-line-length = 119
[isort]
profile = black
multi_line_output = 3
[mypy]
warn_unused_ignores = True
strict = True
[mypy-asgiref.current_thread_executor]
disallow_untyped_defs = False
check_untyped_defs = False
[mypy-asgiref.local]
disallow_untyped_defs = False
check_untyped_defs = False
[mypy-asgiref.sync]
disallow_untyped_defs = False
check_untyped_defs = False
[mypy-asgiref.compatibility]
disallow_untyped_defs = False
check_untyped_defs = False
[mypy-asgiref.wsgi]
disallow_untyped_defs = False
check_untyped_defs = False
[mypy-asgiref.testing]
disallow_untyped_defs = False
check_untyped_defs = False
[mypy-asgiref.server]
disallow_untyped_defs = False
check_untyped_defs = False
[mypy-test_server]
disallow_untyped_defs = False
check_untyped_defs = False
[mypy-test_wsgi]
disallow_untyped_defs = False
check_untyped_defs = False
[mypy-test_testing]
disallow_untyped_defs = False
check_untyped_defs = False
[mypy-test_sync_contextvars]
disallow_untyped_defs = False
check_untyped_defs = False
[mypy-test_sync]
disallow_untyped_defs = False
check_untyped_defs = False
[mypy-test_local]
disallow_untyped_defs = False
check_untyped_defs = False
[mypy-test_compatibility]
disallow_untyped_defs = False
check_untyped_defs = False
07070100000024000081A400000000000000000000000160DDEA6C0000003E000000000000000000000000000000000000002500000000asgiref-3.4.1.0+git.c897542/setup.pyfrom setuptools import setup # type: ignore[import]
setup()
07070100000025000041ED00000000000000000000000260DDEA6C00000000000000000000000000000000000000000000002200000000asgiref-3.4.1.0+git.c897542/specs07070100000026000081A400000000000000000000000160DDEA6C00003CF1000000000000000000000000000000000000002B00000000asgiref-3.4.1.0+git.c897542/specs/asgi.rst==========================================================
ASGI (Asynchronous Server Gateway Interface) Specification
==========================================================
**Version**: 3.0 (2019-03-20)
Abstract
========
This document proposes a standard interface between network protocol
servers (particularly web servers) and Python applications, intended
to allow handling of multiple common protocol styles (including HTTP, HTTP/2,
and WebSocket).
This base specification is intended to fix in place the set of APIs by which
these servers interact and run application code;
each supported protocol (such as HTTP) has a sub-specification that outlines
how to encode and decode that protocol into messages.
Rationale
=========
The WSGI specification has worked well since it was introduced, and
allowed for great flexibility in Python framework and web server choice.
However, its design is irrevocably tied to the HTTP-style
request/response cycle, and more and more protocols that do not follow this
pattern are becoming a standard part of web programming (most notably,
WebSocket).
ASGI attempts to preserve a simple application interface, while providing an
abstraction that allows for data to be sent and received at any time, and from
different application threads or processes.
It also takes the principle of turning protocols into Python-compatible,
asynchronous-friendly sets of messages and generalises it into two parts;
a standardised interface for communication around which to build servers (this
document), and a set of standard message formats for each protocol.
Its primary goal is to provide a way to write HTTP/2 and WebSocket code
alongside normal HTTP handling code, however; part of this design means
ensuring there is an easy path to use both existing WSGI servers and
applications, as a large majority of Python web usage relies on WSGI and
providing an easy path forward is critical to adoption. Details on that
interoperability are covered in the ASGI-HTTP spec.
Overview
========
ASGI consists of two different components:
- A *protocol server*, which terminates sockets and translates them into
connections and per-connection event messages.
- An *application*, which lives inside a *protocol server*, is called once
per connection, and handles event messages as they happen, emitting its own
event messages back when necessary.
Like WSGI, the server hosts the application inside it, and dispatches incoming
requests to it in a standardized format. Unlike WSGI, however, applications
are asynchronous callables rather than simple callables, and they communicate with
the server by receiving and sending asynchronous event messages rather than receiving
a single input stream and returning a single iterable. ASGI applications must run as
``async`` / ``await`` compatible coroutines (i.e. ``asyncio``-compatible) (on the main thread;
they are free to use threading or other processes if they need synchronous
code).
Unlike WSGI, there are two separate parts to an ASGI connection:
- A *connection scope*, which represents a protocol connection to a user and
survives until the connection closes.
- *Events*, which are messages sent to the application as things happen on the
connection, and messages sent back by the application to be received by the server,
including data to be transmitted to the client.
Applications are called and awaited with a connection ``scope`` and two awaitable
callables to ``receive`` event messages and ``send`` event messages back. All this
happening in an asynchronous event loop.
Each call of the application callable maps to a single incoming "socket" or
connection, and is expected to last the lifetime of that connection plus a little
longer if there is cleanup to do. Some protocols may not use traditional sockets; ASGI
specifications for those protocols are expected to define what the scope lifetime is
and when it gets shut down.
Specification Details
=====================
Connection Scope
----------------
Every connection by a user to an ASGI application results in a call of the
application callable to handle that connection entirely. How long this lives,
and the information that describes each specific connection, is called the
*connection scope*.
Closely related, the first argument passed to an application callable is a
``scope`` dictionary with all the information describing that specific connection.
For example, under HTTP the connection scope lasts just one request, but the ``scope``
passed contains most of the request data (apart from the HTTP request body, as this
is streamed in via events).
Under WebSocket, though, the connection scope lasts for as long as the socket
is connected. And the ``scope`` passed contains information like the WebSocket's path, but
details like incoming messages come through as events instead.
Some protocols may give you a ``scope`` with very limited information up
front because they encapsulate something like a handshake. Each protocol
definition must contain information about how long its connection scope lasts,
and what information you will get in the ``scope`` parameter.
Depending on the protocol spec, applications may have to wait for an initial
opening message before communicating with the client.
Events
------
ASGI decomposes protocols into a series of *events* that an application must
*receive* and react to, and *events* the application might *send* in response.
For HTTP, this is as simple as *receiving* two events in order - ``http.request``
and ``http.disconnect``, and *sending* the corresponding event messages back. For
something like a WebSocket, it could be more like *receiving* ``websocket.connect``,
*sending* a ``websocket.send``, *receiving* a ``websocket.receive``, and finally
*receiving* a ``websocket.disconnect``.
Each event is a ``dict`` with a top-level ``type`` key that contains a
Unicode string of the message type. Users are free to invent their own message
types and send them between application instances for high-level events - for
example, a chat application might send chat messages with a user type of
``mychat.message``. It is expected that applications should be able to handle
a mixed set of events, some sourced from the incoming client connection and
some from other parts of the application.
Because these messages could be sent over a network, they need to be
serializable, and so they are only allowed to contain the following types:
* Byte strings
* Unicode strings
* Integers (within the signed 64-bit range)
* Floating point numbers (within the IEEE 754 double precision range; no
``Nan`` or infinities)
* Lists (tuples should be encoded as lists)
* Dicts (keys must be Unicode strings)
* Booleans
* ``None``
Applications
------------
.. note::
The application format changed in 3.0 to use a single callable, rather than
the prior two-callable format. Two-callable is documented below in
"Legacy Applications"; servers can easily implement support for it using
the ``asgiref.compatibility`` library, and should try to support it.
ASGI applications should be a single async callable::
coroutine application(scope, receive, send)
* ``scope``: The connection scope information, a dictionary that contains at least a
``type`` key specifying the protocol that is incoming
* ``receive``: an awaitable callable that will yield a new event dictionary
when one is available
* ``send``: an awaitable callable taking a single event dictionary as a
positional argument that will return once the send has been
completed or the connection has been closed
The application is called once per "connection". The definition of a connection
and its lifespan are dictated by the protocol specification in question. For
example, with HTTP it is one request, whereas for a WebSocket it is a single
WebSocket connection.
Both the ``scope`` and the format of the event messages you send and receive
are defined by one of the application protocols. ``scope`` must be a
``dict``. The key ``scope["type"]`` will always be present, and can
be used to work out which protocol is incoming. The key
``scope["asgi"]`` will also be present as a dictionary containing a
``scope["asgi"]["version"]`` key that corresponds to the ASGI version
the server implements. If missing, the version should default to ``"2.0"``.
There may also be a spec-specific version present as
``scope["asgi"]["spec_version"]``. This allows the individual protocol
specifications to make enhancements without bumping the overall ASGI version.
The protocol-specific sub-specifications cover these scope and event message formats.
They are equivalent to the specification for keys in the ``environ`` dict for
WSGI.
Legacy Applications
-------------------
Legacy (v2.0) ASGI applications are defined as a callable::
application(scope)
Which returns another, awaitable callable::
coroutine application_instance(receive, send)
The meanings of ``scope``, ``receive`` and ``send`` are the same as in the
newer single-callable application, but note that the first callable is
*synchronous*.
The first callable is called when the connection is started, and then the
second callable is called and awaited immediately afterwards.
This style was retired in version 3.0 as the two-callable layout was deemed
unnecessary. It's now legacy, but there are applications out there written in
this style, and so it's important to support them.
There is a compatibility suite available in the ``asgiref.compatibility``
module which allows you to both detect legacy applications and convert them
to the new single-protocol style seamlessly. Servers are encouraged to support
both types as of ASGI 3.0 and gradually drop support by default over time.
Protocol Specifications
-----------------------
These describe the standardized scope and message formats for various
protocols.
The one common key across all scopes and messages is ``type``, a way to
indicate what type of scope or event message is being received.
In scopes, the ``type`` key must be a Unicode string, like ``"http"`` or
``"websocket"``, as defined in the relevant protocol specification.
In messages, the ``type`` should be namespaced as ``protocol.message_type``,
where the ``protocol`` matches the scope type, and ``message_type`` is
defined by the protocol spec. Examples of a message ``type`` value include
``http.request`` and ``websocket.send``.
.. note::
Applications should actively reject any protocol that they do not understand
with an `Exception` (of any type). Failure to do this may result in the
server thinking you support a protocol you don't, which can be confusing when
using with the Lifespan protocol, as the server will wait to start until you
tell it.
Current protocol specifications:
* :doc:`HTTP and WebSocket <www>`
* :doc:`Lifespan <lifespan>`
Middleware
----------
It is possible to have ASGI "middleware" - code that plays the role of both
server and application, taking in a ``scope`` and the ``send``/``receive`` awaitable callables,
potentially modifying them, and then calling an inner application.
When middleware is modifying the ``scope``, it should make a copy of the ``scope``
object before mutating it and passing it to the inner application, as changes
may leak upstream otherwise. In particular, you should not assume that the copy
of the ``scope`` you pass down to the application is the one that it ends up using,
as there may be other middleware in the way; thus, do not keep a reference to
it and try to mutate it outside of the initial ASGI app call. Your one and only
chance to add to it is before you hand control to the child application.
Error Handling
--------------
If a server receives an invalid event dictionary - for example, having an
unknown ``type``, missing keys an event type should have, or with wrong Python
types for objects (e.g. Unicode strings for HTTP headers) - it should raise an
exception out of the ``send`` awaitable back to the application.
If an application receives an invalid event dictionary from ``receive``, it
should raise an exception.
In both cases, the presence of additional keys in the event dictionary should
not raise an exception. This allows non-breaking upgrades to protocol
specifications over time.
Servers are free to surface errors that bubble up out of application instances
they are running however they wish - log to console, send to syslog, or other
options - but they must terminate the application instance and its associated
connection if this happens.
Note that messages received by a server after the connection has been
closed are not considered errors. In this case the ``send`` awaitable
callable should act as a no-op.
Extra Coroutines
----------------
Frameworks or applications may want to run extra coroutines in addition to the
coroutine launched for each application instance. Since there is no way to
parent these to the instance's coroutine in Python 3.7, applications should
ensure that all coroutines launched as part of running an application are terminated
either before or at the same time as the application's coroutine.
Any coroutines that continue to run outside of this window have no guarantees
about their lifetime and may be killed at any time.
Extensions
----------
There are times when protocol servers may want to provide server-specific
extensions outside of a core ASGI protocol specification, or when a change
to a specification is being trialled before being rolled in.
For this use case, we define a common pattern for ``extensions`` - named
additions to a protocol specification that are optional but that, if provided
by the server and understood by the application, can be used to get more
functionality.
This is achieved via an ``extensions`` entry in the ``scope`` dictionary, which
is itself a ``dict``. Extensions have a Unicode string name that
is agreed upon between servers and applications.
If the server supports an extension, it should place an entry into the
``extensions`` dictionary under the extension's name, and the value of that
entry should itself be a ``dict``. Servers can provide any extra scope
information that is part of the extension inside this value or, if the
extension is only to indicate that the server accepts additional events via
the ``send`` callable, it may just be an empty ``dict``.
As an example, imagine a HTTP protocol server wishes to provide an extension
that allows a new event to be sent back to the server that tries to flush the
network send buffer all the way through the OS level. It provides an empty
entry in the ``extensions`` dictionary to signal that it can handle the event::
scope = {
"type": "http",
"method": "GET",
...
"extensions": {
"fullflush": {},
},
}
If an application sees this it then knows it can send the custom event
(say, of type ``http.fullflush``) via the ``send`` callable.
Strings and Unicode
-------------------
In this document, and all sub-specifications, *byte string* refers to
the ``bytes`` type in Python 3. *Unicode string* refers to the ``str`` type
in Python 3.
This document will never specify just *string* - all strings are one of the
two exact types.
All ``dict`` keys mentioned (including those for *scopes* and *events*) are
Unicode strings.
Version History
===============
* 3.0 (2019-03-04): Changed to single-callable application style
* 2.0 (2017-11-28): Initial non-channel-layer based ASGI spec
Copyright
=========
This document has been placed in the public domain.
07070100000027000081A400000000000000000000000160DDEA6C00001017000000000000000000000000000000000000002F00000000asgiref-3.4.1.0+git.c897542/specs/lifespan.rst=================
Lifespan Protocol
=================
**Version**: 2.0 (2019-03-20)
The Lifespan ASGI sub-specification outlines how to communicate
lifespan events such as startup and shutdown within ASGI. This refers to the
lifespan of the main event loop. In a multi-process environment there will be
lifespan events in each process.
The lifespan messages allow for an application to initialise and
shutdown in the context of a running event loop. An example of this
would be creating a connection pool and subsequently closing the
connection pool to release the connections.
A possible implementation of this protocol is given below::
async def app(scope, receive, send):
if scope['type'] == 'lifespan':
while True:
message = await receive()
if message['type'] == 'lifespan.startup':
... # Do some startup here!
await send({'type': 'lifespan.startup.complete'})
elif message['type'] == 'lifespan.shutdown':
... # Do some shutdown here!
await send({'type': 'lifespan.shutdown.complete'})
return
else:
pass # Handle other types
Scope
'''''
The lifespan scope exists for the duration of the event loop.
The scope information passed in ``scope`` contains basic metadata:
* ``type`` (*Unicode string*) -- ``"lifespan"``.
* ``asgi["version"]`` (*Unicode string*) -- The version of the ASGI spec.
* ``asgi["spec_version"]`` (*Unicode string*) -- The version of this spec being
used. Optional; if missing defaults to ``"1.0"``.
If an exception is raised when calling the application callable with a
``lifespan.startup`` message or a ``scope`` with type ``lifespan``,
the server must continue but not send any lifespan events.
This allows for compatibility with applications that do not support the
lifespan protocol. If you want to log an error that occurs during lifespan
startup and prevent the server from starting, then send back
``lifespan.startup.failed`` instead.
Startup - ``receive`` event
'''''''''''''''''''''''''''
Sent to the application when the server is ready to startup and receive connections,
but before it has started to do so.
Keys:
* ``type`` (*Unicode string*) -- ``"lifespan.startup"``.
Startup Complete - ``send`` event
'''''''''''''''''''''''''''''''''
Sent by the application when it has completed its startup. A server
must wait for this message before it starts processing connections.
Keys:
* ``type`` (*Unicode string*) -- ``"lifespan.startup.complete"``.
Startup Failed - ``send`` event
'''''''''''''''''''''''''''''''
Sent by the application when it has failed to complete its startup. If a server
sees this it should log/print the message provided and then exit.
Keys:
* ``type`` (*Unicode string*) -- ``"lifespan.startup.failed"``.
* ``message`` (*Unicode string*) -- Optional; if missing defaults to ``""``.
Shutdown - ``receive`` event
''''''''''''''''''''''''''''
Sent to the application when the server has stopped accepting connections and closed
all active connections.
Keys:
* ``type`` (*Unicode string*) -- ``"lifespan.shutdown"``.
Shutdown Complete - ``send`` event
''''''''''''''''''''''''''''''''''
Sent by the application when it has completed its cleanup. A server
must wait for this message before terminating.
Keys:
* ``type`` (*Unicode string*) -- ``"lifespan.shutdown.complete"``.
Shutdown Failed - ``send`` event
''''''''''''''''''''''''''''''''
Sent by the application when it has failed to complete its cleanup. If a server
sees this it should log/print the message provided and then terminate.
Keys:
* ``type`` (*Unicode string*) -- ``"lifespan.shutdown.failed"``.
* ``message`` (*Unicode string*) -- Optional; if missing defaults to ``""``.
Version History
===============
* 2.0 (2019-03-04): Added startup.failed and shutdown.failed,
clarified exception handling during startup phase.
* 1.0 (2018-09-06): Updated ASGI spec with a lifespan protocol.
Copyright
=========
This document has been placed in the public domain.
07070100000028000081A400000000000000000000000160DDEA6C00004D2B000000000000000000000000000000000000002A00000000asgiref-3.4.1.0+git.c897542/specs/www.rst====================================
HTTP & WebSocket ASGI Message Format
====================================
**Version**: 2.3 (2021-02-02)
The HTTP+WebSocket ASGI sub-specification outlines how to transport HTTP/1.1,
HTTP/2 and WebSocket connections within ASGI.
It is deliberately intended and designed to be a superset of the WSGI format
and specifies how to translate between the two for the set of requests that
are able to be handled by WSGI.
Spec Versions
-------------
This spec has had three versions:
* ``2.0``: The first version of the spec, released with ASGI 2.0
* ``2.1``: Added the ``headers`` key to the WebSocket Accept response
* ``2.2``: Allow ``None`` in the second item of ``server`` scope value.
* ``2.3``: Added the ``reason`` key to the WebSocket close event.
Spec versions let you understand what the server you are using understands. If
a server tells you it only supports version ``2.0`` of this spec, then
sending ``headers`` with a WebSocket Accept message is an error, for example.
They are separate from the HTTP version or the ASGI version.
HTTP
----
The HTTP format covers HTTP/1.0, HTTP/1.1 and HTTP/2, as the changes in
HTTP/2 are largely on the transport level. A protocol server should give
different scopes to different requests on the same HTTP/2 connection, and
correctly multiplex the responses back to the same stream in which they came.
The HTTP version is available as a string in the scope.
Multiple header fields with the same name are complex in HTTP. RFC 7230
states that for any header field that can appear multiple times, it is exactly
equivalent to sending that header field only once with all the values joined by
commas.
However, RFC 7230 and RFC 6265 make it clear that this rule does not apply to
the various headers used by HTTP cookies (``Cookie`` and ``Set-Cookie``). The
``Cookie`` header must only be sent once by a user-agent, but the
``Set-Cookie`` header may appear repeatedly and cannot be joined by commas.
The ASGI design decision is to transport both request and response headers as
lists of 2-element ``[name, value]`` lists and preserve headers exactly as they
were provided.
The HTTP protocol should be signified to ASGI applications with a ``type``
value of ``http``.
HTTP Connection Scope
'''''''''''''''''''''
HTTP connections have a single-request *connection scope* - that is, your
application will be called at the start of the request, and will last until
the end of that specific request, even if the underlying socket is still open
and serving multiple requests.
If you hold a response open for long-polling or similar, the *connection scope*
will persist until the response closes from either the client or server side.
The *connection scope* information passed in ``scope`` contains:
* ``type`` (*Unicode string*) -- ``"http"``.
* ``asgi["version"]`` (*Unicode string*) -- Version of the ASGI spec.
* ``asgi["spec_version"]`` (*Unicode string*) -- Version of the ASGI
HTTP spec this server understands; one of ``"2.0"`` or
``"2.1"``. Optional; if missing assume ``2.0``.
* ``http_version`` (*Unicode string*) -- One of ``"1.0"``, ``"1.1"`` or ``"2"``.
* ``method`` (*Unicode string*) -- The HTTP method name, uppercased.
* ``scheme`` (*Unicode string*) -- URL scheme portion (likely ``"http"`` or
``"https"``). Optional (but must not be empty); default is ``"http"``.
* ``path`` (*Unicode string*) -- HTTP request target excluding any query
string, with percent-encoded sequences and UTF-8 byte sequences
decoded into characters.
* ``raw_path`` (*byte string*) -- The original HTTP path component
unmodified from the bytes that were received by the web server. Some
web server implementations may be unable to provide this. Optional;
if missing defaults to ``None``.
* ``query_string`` (*byte string*) -- URL portion after the ``?``,
percent-encoded.
* ``root_path`` (*Unicode string*) -- The root path this application
is mounted at; same as ``SCRIPT_NAME`` in WSGI. Optional; if missing
defaults to ``""``.
* ``headers`` (*Iterable[[byte string, byte string]]*) -- An iterable of
``[name, value]`` two-item iterables, where ``name`` is the header name, and
``value`` is the header value. Order of header values must be preserved from
the original HTTP request; order of header names is not important. Duplicates
are possible and must be preserved in the message as received. Header names
should be lowercased, but it is not required; servers should preserve header case
on a best-effort basis. Pseudo headers (present in HTTP/2 and HTTP/3) must be
removed; if ``:authority`` is present its value must be added to the start of
the iterable with ``host`` as the header name or replace any existing host
header already present.
* ``client`` (*Iterable[Unicode string, int]*) -- A two-item iterable
of ``[host, port]``, where ``host`` is the remote host's IPv4 or
IPv6 address, and ``port`` is the remote port as an
integer. Optional; if missing defaults to ``None``.
* ``server`` (*Iterable[Unicode string, Optional[int]]*) -- Either a
two-item iterable of ``[host, port]``, where ``host`` is the
listening address for this server, and ``port`` is the integer
listening port, or ``[path, None]`` where ``path`` is that of the
unix socket. Optional; if missing defaults to ``None``.
Servers are responsible for handling inbound and outbound chunked transfer
encodings. A request with a ``chunked`` encoded body should be automatically
de-chunked by the server and presented to the application as plain body bytes;
a response that is given to the server with no ``Content-Length`` may be chunked
as the server sees fit.
Request - ``receive`` event
'''''''''''''''''''''''''''
Sent to the application to indicate an incoming request. Most of the request
information is in the connection ``scope``; the body message serves as a way to
stream large incoming HTTP bodies in chunks, and as a trigger to actually run
request code (as you should not trigger on a connection opening alone).
Note that if the request is being sent using ``Transfer-Encoding: chunked``,
the server is responsible for handling this encoding. The ``http.request``
messages should contain just the decoded contents of each chunk.
Keys:
* ``type`` (*Unicode string*) -- ``"http.request"``.
* ``body`` (*byte string*) -- Body of the request. Optional; if
missing defaults to ``b""``. If ``more_body`` is set, treat as start
of body and concatenate on further chunks.
* ``more_body`` (*bool*) -- Signifies if there is additional content
to come (as part of a Request message). If ``True``, the consuming
application should wait until it gets a chunk with this set to
``False``. If ``False``, the request is complete and should be
processed. Optional; if missing defaults to ``False``.
Response Start - ``send`` event
'''''''''''''''''''''''''''''''
Sent by the application to start sending a response to the client. Needs to be
followed by at least one response content message. The protocol server must not
start sending the response to the client until it has received at least one
*Response Body* event.
You may send a ``Transfer-Encoding`` header in this message, but the server
must ignore it. Servers handle ``Transfer-Encoding`` themselves, and may opt
to use ``Transfer-Encoding: chunked`` if the application presents a response
that has no ``Content-Length`` set.
Note that this is not the same as ``Content-Encoding``, which the application
still controls, and which is the appropriate place to set ``gzip`` or other
compression flags.
Keys:
* ``type`` (*Unicode string*) -- ``"http.response.start"``.
* ``status`` (*int*) -- HTTP status code.
* ``headers`` (*Iterable[[byte string, byte string]]*) -- An iterable
of ``[name, value]`` two-item iterables, where ``name`` is the
header name, and ``value`` is the header value. Order must be
preserved in the HTTP response. Header names must be
lowercased. Optional; if missing defaults to an empty list. Pseudo
headers (present in HTTP/2 and HTTP/3) must not be present.
Response Body - ``send`` event
''''''''''''''''''''''''''''''
Continues sending a response to the client. Protocol servers must
flush any data passed to them into the send buffer before returning from a
send call. If ``more_body`` is set to ``False`` this will
close the connection.
Keys:
* ``type`` (*Unicode string*) -- ``"http.response.body"``.
* ``body`` (*byte string*) -- HTTP body content. Concatenated onto any
previous ``body`` values sent in this connection scope. Optional; if
missing defaults to ``b""``.
* ``more_body`` (*bool*) -- Signifies if there is additional content
to come (as part of a Response Body message). If ``False``, response
will be taken as complete and closed, and any further messages on
the channel will be ignored. Optional; if missing defaults to
``False``.
Disconnect - ``receive`` event
''''''''''''''''''''''''''''''
Sent to the application when a HTTP connection is closed or if ``receive``
is called after a response has been sent. This is mainly useful for
long-polling, where you may want to trigger cleanup code if the
connection closes early.
Keys:
* ``type`` (*Unicode string*) -- ``"http.disconnect"``.
WebSocket
---------
WebSockets share some HTTP details - they have a path and headers - but also
have more state. Again, most of that state is in the ``scope``, which will live
as long as the socket does.
WebSocket protocol servers should handle PING/PONG messages themselves, and
send PING messages as necessary to ensure the connection is alive.
WebSocket protocol servers should handle message fragmentation themselves,
and deliver complete messages to the application.
The WebSocket protocol should be signified to ASGI applications with
a ``type`` value of ``websocket``.
Websocket Connection Scope
''''''''''''''''''''''''''
WebSocket connections' scope lives as long as the socket itself - if the
application dies the socket should be closed, and vice-versa.
The *connection scope* information passed in ``scope`` contains initial connection
metadata (mostly from the HTTP request line and headers):
* ``type`` (*Unicode string*) -- ``"websocket"``.
* ``asgi["version"]`` (*Unicode string*) -- The version of the ASGI spec.
* ``asgi["spec_version"]`` (*Unicode string*) -- Version of the ASGI
HTTP spec this server understands; one of ``"2.0"`` or
``"2.1"``. Optional; if missing assume ``"2.0"``.
* ``http_version`` (*Unicode string*) -- One of ``"1.1"`` or
``"2"``. Optional; if missing default is ``"1.1"``.
* ``scheme`` (*Unicode string*) -- URL scheme portion (likely ``"ws"`` or
``"wss"``). Optional (but must not be empty); default is ``"ws"``.
* ``path`` (*Unicode string*) -- HTTP request target excluding any query
string, with percent-encoded sequences and UTF-8 byte sequences
decoded into characters.
* ``raw_path`` (*byte string*) -- The original HTTP path component
unmodified from the bytes that were received by the web server. Some
web server implementations may be unable to provide this. Optional;
if missing defaults to ``None``.
* ``query_string`` (*byte string*) -- URL portion after the
``?``. Optional; if missing or ``None`` default is empty string.
* ``root_path`` (*byte string*) -- The root path this application is
mounted at; same as ``SCRIPT_NAME`` in WSGI. Optional; if missing
defaults to empty string.
* ``headers`` (*Iterable[[byte string, byte string]]*) -- An iterable of
``[name, value]`` two-item iterables, where ``name`` is the header name and
``value`` is the header value. Order should be preserved from the original
HTTP request; duplicates are possible and must be preserved in the message
as received. Header names should be lowercased, but it is not required;
servers should preserve header case on a best-effort basis.
Pseudo headers (present in HTTP/2 and HTTP/3) must be removed;
if ``:authority`` is present its value must be added to the
start of the iterable with ``host`` as the header name
or replace any existing host header already present.
* ``client`` (*Iterable[Unicode string, int]*) -- A two-item iterable
of ``[host, port]``, where ``host`` is the remote host's IPv4 or
IPv6 address, and ``port`` is the remote port. Optional; if missing
defaults to ``None``.
* ``server`` (*Iterable[Unicode string, Optional[int]]*) -- Either a
two-item iterable of ``[host, port]``, where ``host`` is the
listening address for this server, and ``port`` is the integer
listening port, or ``[path, None]`` where ``path`` is that of the
unix socket. Optional; if missing defaults to ``None``.
* ``subprotocols`` (*Iterable[Unicode string]*) -- Subprotocols the
client advertised. Optional; if missing defaults to empty list.
Connect - ``receive`` event
'''''''''''''''''''''''''''
Sent to the application when the client initially opens a connection and is about
to finish the WebSocket handshake.
This message must be responded to with either an *Accept* message
or a *Close* message before the socket will pass ``websocket.receive``
messages. The protocol server must send this message
during the handshake phase of the WebSocket and not complete the handshake
until it gets a reply, returning HTTP status code ``403`` if the connection is
denied.
Keys:
* ``type`` (*Unicode string*) -- ``"websocket.connect"``.
Accept - ``send`` event
'''''''''''''''''''''''
Sent by the application when it wishes to accept an incoming connection.
* ``type`` (*Unicode string*) -- ``"websocket.accept"``.
* ``subprotocol`` (*Unicode string*) -- The subprotocol the server
wishes to accept. Optional; if missing defaults to ``None``.
* ``headers`` (*Iterable[[byte string, byte string]]*) -- An iterable
of ``[name, value]`` two-item iterables, where ``name`` is the
header name, and ``value`` is the header value. Order must be
preserved in the HTTP response. Header names must be
lowercased. Must not include a header named
``sec-websocket-protocol``; use the ``subprotocol`` key
instead. Optional; if missing defaults to an empty list. *Added in
spec version 2.1*. Pseudo headers (present in HTTP/2 and HTTP/3)
must not be present.
Receive - ``receive`` event
'''''''''''''''''''''''''''
Sent to the application when a data message is received from the client.
Keys:
* ``type`` (*Unicode string*) -- ``"websocket.receive"``.
* ``bytes`` (*byte string*) -- The message content, if it was binary
mode, or ``None``. Optional; if missing, it is equivalent to
``None``.
* ``text`` (*Unicode string*) -- The message content, if it was text
mode, or ``None``. Optional; if missing, it is equivalent to
``None``.
Exactly one of ``bytes`` or ``text`` must be non-``None``. One or both
keys may be present, however.
Send - ``send`` event
'''''''''''''''''''''
Sent by the application to send a data message to the client.
Keys:
* ``type`` (*Unicode string*) -- ``"websocket.send"``.
* ``bytes`` (*byte string*) -- Binary message content, or ``None``.
Optional; if missing, it is equivalent to ``None``.
* ``text`` (*Unicode string*) -- Text message content, or ``None``.
Optional; if missing, it is equivalent to ``None``.
Exactly one of ``bytes`` or ``text`` must be non-``None``. One or both
keys may be present, however.
.. _disconnect-receive-event-ws:
Disconnect - ``receive`` event
''''''''''''''''''''''''''''''
Sent to the application when either connection to the client is lost, either from
the client closing the connection, the server closing the connection, or loss of the
socket.
Keys:
* ``type`` (*Unicode string*) -- ``"websocket.disconnect"``
* ``code`` (*int*) -- The WebSocket close code, as per the WebSocket spec.
Close - ``send`` event
''''''''''''''''''''''
Sent by the application to tell the server to close the connection.
If this is sent before the socket is accepted, the server
must close the connection with a HTTP 403 error code
(Forbidden), and not complete the WebSocket handshake; this may present on some
browsers as a different WebSocket error code (such as 1006, Abnormal Closure).
If this is sent after the socket is accepted, the server must close the socket
with the close code passed in the message (or 1000 if none is specified).
* ``type`` (*Unicode string*) -- ``"websocket.close"``.
* ``code`` (*int*) -- The WebSocket close code, as per the WebSocket
spec. Optional; if missing defaults to ``1000``.
* ``reason`` (*Unicode string*) -- A reason given for the closure, can
be any string. Optional; if missing or ``None`` default is empty
string.
WSGI Compatibility
------------------
Part of the design of the HTTP portion of this spec is to make sure it
aligns well with the WSGI specification, to ensure easy adaptability
between both specifications and the ability to keep using WSGI
applications with ASGI servers.
WSGI applications, being synchronous, must be run in a threadpool in order
to be served, but otherwise their runtime maps onto the HTTP connection scope's
lifetime.
There is an almost direct mapping for the various special keys in
WSGI's ``environ`` variable to the ``http`` scope:
* ``REQUEST_METHOD`` is the ``method``
* ``SCRIPT_NAME`` is ``root_path``
* ``PATH_INFO`` can be derived from ``path`` and ``root_path``
* ``QUERY_STRING`` is ``query_string``
* ``CONTENT_TYPE`` can be extracted from ``headers``
* ``CONTENT_LENGTH`` can be extracted from ``headers``
* ``SERVER_NAME`` and ``SERVER_PORT`` are in ``server``
* ``REMOTE_HOST``/``REMOTE_ADDR`` and ``REMOTE_PORT`` are in ``client``
* ``SERVER_PROTOCOL`` is encoded in ``http_version``
* ``wsgi.url_scheme`` is ``scheme``
* ``wsgi.input`` is a ``StringIO`` based around the ``http.request`` messages
* ``wsgi.errors`` is directed by the wrapper as needed
The ``start_response`` callable maps similarly to ``http.response.start``:
* The ``status`` argument becomes ``status``, with the reason phrase dropped.
* ``response_headers`` maps to ``headers``
Yielding content from the WSGI application maps to sending
``http.response.body`` messages.
WSGI encoding differences
-------------------------
The WSGI specification (as defined in PEP 3333) specifies that all strings
sent to or from the server must be of the ``str`` type but only contain
codepoints in the ISO-8859-1 ("latin-1") range. This was due to it originally
being designed for Python 2 and its different set of string types.
The ASGI HTTP and WebSocket specifications instead specify each entry of the
``scope`` dict as either a byte string or a Unicode string. HTTP, being an
older protocol, is sometimes imperfect at specifying encoding, so some
decisions of what is Unicode versus bytes may not be obvious.
* ``path``: URLs can have both percent-encoded and UTF-8 encoded sections.
Because decoding these is often done by the underlying server (or sometimes
even proxies in the path), this is a Unicode string, fully decoded from both
UTF-8 encoding and percent encodings.
* ``headers``: These are byte strings of the exact byte sequences sent by the
client/to be sent by the server. While modern HTTP standards say that headers
should be ASCII, older ones did not and allowed a wider range of characters.
Frameworks/applications should decode headers as they deem appropriate.
* ``query_string``: Unlike the ``path``, this is not as subject to server
interference and so is presented as its raw byte string version,
percent-encoded.
* ``root_path``: Unicode string to match ``path``.
Version History
---------------
* 2.0 (2017-11-28): Initial non-channel-layer based ASGI spec
Copyright
---------
This document has been placed in the public domain.
07070100000029000041ED00000000000000000000000260DDEA6C00000000000000000000000000000000000000000000002200000000asgiref-3.4.1.0+git.c897542/tests0707010000002A000081A400000000000000000000000160DDEA6C00000919000000000000000000000000000000000000003800000000asgiref-3.4.1.0+git.c897542/tests/test_compatibility.pyimport pytest
from asgiref.compatibility import double_to_single_callable, is_double_callable
from asgiref.testing import ApplicationCommunicator
def double_application_function(scope):
"""
A nested function based double-callable application.
"""
async def inner(receive, send):
message = await receive()
await send({"scope": scope["value"], "message": message["value"]})
return inner
class DoubleApplicationClass:
"""
A classic class-based double-callable application.
"""
def __init__(self, scope):
pass
async def __call__(self, receive, send):
pass
class DoubleApplicationClassNestedFunction:
"""
A function closure inside a class!
"""
def __init__(self):
pass
def __call__(self, scope):
async def inner(receive, send):
pass
return inner
async def single_application_function(scope, receive, send):
"""
A single-function single-callable application
"""
pass
class SingleApplicationClass:
"""
A single-callable class (where you'd pass the class instance in,
e.g. middleware)
"""
def __init__(self):
pass
async def __call__(self, scope, receive, send):
pass
def test_is_double_callable():
"""
Tests that the signature matcher works as expected.
"""
assert is_double_callable(double_application_function) is True
assert is_double_callable(DoubleApplicationClass) is True
assert is_double_callable(DoubleApplicationClassNestedFunction()) is True
assert is_double_callable(single_application_function) is False
assert is_double_callable(SingleApplicationClass()) is False
def test_double_to_single_signature():
"""
Test that the new object passes a signature test.
"""
assert (
is_double_callable(double_to_single_callable(double_application_function))
is False
)
@pytest.mark.asyncio
async def test_double_to_single_communicator():
"""
Test that the new application works
"""
new_app = double_to_single_callable(double_application_function)
instance = ApplicationCommunicator(new_app, {"value": "woohoo"})
await instance.send_input({"value": 42})
assert await instance.receive_output() == {"scope": "woohoo", "message": 42}
0707010000002B000081A400000000000000000000000160DDEA6C0000029E000000000000000000000000000000000000003B00000000asgiref-3.4.1.0+git.c897542/tests/test_deprecated_types.pyimport importlib
import pytest
from asgiref import typing
@pytest.mark.parametrize("deprecated_type", typing.__deprecated__.keys())
def test_deprecated_types(deprecated_type: str) -> None:
with pytest.warns(DeprecationWarning) as record:
getattr(importlib.import_module("asgiref.typing"), deprecated_type)
assert len(record) == 1
assert deprecated_type in str(record.list[0])
@pytest.mark.parametrize("available_type", typing.__all__)
def test_available_types(available_type: str) -> None:
with pytest.warns(None) as record:
getattr(importlib.import_module("asgiref.typing"), available_type)
assert len(record) == 0
0707010000002C000081A400000000000000000000000160DDEA6C00001F5F000000000000000000000000000000000000003000000000asgiref-3.4.1.0+git.c897542/tests/test_local.pyimport gc
import threading
import pytest
from asgiref.local import Local
from asgiref.sync import async_to_sync, sync_to_async
@pytest.mark.asyncio
async def test_local_task():
"""
Tests that local works just inside a normal task context
"""
test_local = Local()
# Unassigned should be an error
with pytest.raises(AttributeError):
test_local.foo
# Assign and check it persists
test_local.foo = 1
assert test_local.foo == 1
# Delete and check it errors again
del test_local.foo
with pytest.raises(AttributeError):
test_local.foo
def test_local_thread():
"""
Tests that local works just inside a normal thread context
"""
test_local = Local()
# Unassigned should be an error
with pytest.raises(AttributeError):
test_local.foo
# Assign and check it persists
test_local.foo = 2
assert test_local.foo == 2
# Delete and check it errors again
del test_local.foo
with pytest.raises(AttributeError):
test_local.foo
def test_local_thread_nested():
"""
Tests that local does not leak across threads
"""
test_local = Local()
# Unassigned should be an error
with pytest.raises(AttributeError):
test_local.foo
# Assign and check it does not persist inside the thread
class TestThread(threading.Thread):
# Failure reason
failed = "unknown"
def run(self):
# Make sure the attribute is not there
try:
test_local.foo
self.failed = "leak inside"
return
except AttributeError:
pass
# Check the value is good
self.failed = "set inside"
test_local.foo = 123
assert test_local.foo == 123
# Binary signal that these tests passed to the outer thread
self.failed = ""
test_local.foo = 8
thread = TestThread()
thread.start()
thread.join()
assert thread.failed == ""
# Check it didn't leak back out
assert test_local.foo == 8
def test_local_cycle():
"""
Tests that Local can handle cleanup up a cycle to itself
(Borrowed and modified from the CPython threadlocal tests)
"""
locals = None
matched = 0
e1 = threading.Event()
e2 = threading.Event()
def f():
nonlocal matched
# Involve Local in a cycle
cycle = [Local()]
cycle.append(cycle)
cycle[0].foo = "bar"
# GC the cycle
del cycle
gc.collect()
# Trigger the local creation outside
e1.set()
e2.wait()
# New Locals should be empty
matched = len(
[local for local in locals if getattr(local, "foo", None) == "bar"]
)
t = threading.Thread(target=f)
t.start()
e1.wait()
# Creates locals outside of the inner thread
locals = [Local() for i in range(100)]
e2.set()
t.join()
assert matched == 0
@pytest.mark.asyncio
async def test_local_task_to_sync():
"""
Tests that local carries through sync_to_async
"""
# Set up the local
test_local = Local()
test_local.foo = 3
# Look at it in a sync context
def sync_function():
assert test_local.foo == 3
test_local.foo = "phew, done"
await sync_to_async(sync_function)()
# Check the value passed out again
assert test_local.foo == "phew, done"
def test_local_thread_to_async():
"""
Tests that local carries through async_to_sync
"""
# Set up the local
test_local = Local()
test_local.foo = 12
# Look at it in an async context
async def async_function():
assert test_local.foo == 12
test_local.foo = "inside"
async_to_sync(async_function)()
# Check the value passed out again
assert test_local.foo == "inside"
@pytest.mark.asyncio
async def test_local_task_to_sync_to_task():
"""
Tests that local carries through sync_to_async and then back through
async_to_sync
"""
# Set up the local
test_local = Local()
test_local.foo = 756
# Look at it in an async context inside a sync context
def sync_function():
async def async_function():
assert test_local.foo == 756
test_local.foo = "dragons"
async_to_sync(async_function)()
await sync_to_async(sync_function)()
# Check the value passed out again
assert test_local.foo == "dragons"
@pytest.mark.asyncio
async def test_local_many_layers():
"""
Tests that local carries through a lot of layers of sync/async
"""
# Set up the local
test_local = Local()
test_local.foo = 8374
# Make sure we go between each world at least twice
def sync_function():
async def async_function():
def sync_function_2():
async def async_function_2():
assert test_local.foo == 8374
test_local.foo = "miracles"
async_to_sync(async_function_2)()
await sync_to_async(sync_function_2)()
async_to_sync(async_function)()
await sync_to_async(sync_function)()
# Check the value passed out again
assert test_local.foo == "miracles"
@pytest.mark.asyncio
async def test_local_critical_no_task_to_thread():
"""
Tests that local doesn't go through sync_to_async when the local is set
as thread-critical.
"""
# Set up the local
test_local = Local(thread_critical=True)
test_local.foo = 86
# Look at it in a sync context
def sync_function():
with pytest.raises(AttributeError):
test_local.foo
test_local.foo = "secret"
assert test_local.foo == "secret"
await sync_to_async(sync_function)()
# Check the value outside is not touched
assert test_local.foo == 86
def test_local_critical_no_thread_to_task():
"""
Tests that local does not go through async_to_sync when the local is set
as thread-critical
"""
# Set up the local
test_local = Local(thread_critical=True)
test_local.foo = 89
# Look at it in an async context
async def async_function():
with pytest.raises(AttributeError):
test_local.foo
test_local.foo = "numbers"
assert test_local.foo == "numbers"
async_to_sync(async_function)()
# Check the value outside
assert test_local.foo == 89
@pytest.mark.asyncio
async def test_local_threads_and_tasks():
"""
Tests that local and threads don't interfere with each other.
"""
test_local = Local()
test_local.counter = 0
def sync_function(expected):
assert test_local.counter == expected
test_local.counter += 1
async def async_function(expected):
assert test_local.counter == expected
test_local.counter += 1
await sync_to_async(sync_function)(0)
assert test_local.counter == 1
await async_function(1)
assert test_local.counter == 2
class TestThread(threading.Thread):
def run(self):
with pytest.raises(AttributeError):
test_local.counter
test_local.counter = -1
await sync_to_async(sync_function)(2)
assert test_local.counter == 3
threads = [TestThread() for _ in range(5)]
for thread in threads:
thread.start()
threads[0].join()
await sync_to_async(sync_function)(3)
assert test_local.counter == 4
await async_function(4)
assert test_local.counter == 5
for thread in threads[1:]:
thread.join()
await sync_to_async(sync_function)(5)
assert test_local.counter == 6
def test_local_del_swallows_type_error(monkeypatch):
test_local = Local()
blow_up_calls = 0
def blow_up(self):
nonlocal blow_up_calls
blow_up_calls += 1
raise TypeError()
monkeypatch.setattr("weakref.WeakSet.__iter__", blow_up)
test_local.__del__()
assert blow_up_calls == 1
0707010000002D000081A400000000000000000000000160DDEA6C0000012C000000000000000000000000000000000000003100000000asgiref-3.4.1.0+git.c897542/tests/test_server.pyfrom asgiref.server import StatelessServer
def test_stateless_server():
"""StatelessServer can be instantiated with an ASGI 3 application."""
async def app(scope, receive, send):
pass
server = StatelessServer(app)
server.get_or_create_application_instance("scope_id", {})
0707010000002E000081A400000000000000000000000160DDEA6C000044A8000000000000000000000000000000000000002F00000000asgiref-3.4.1.0+git.c897542/tests/test_sync.pyimport asyncio
import functools
import multiprocessing
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from unittest import TestCase
import pytest
from asgiref.compatibility import create_task, get_running_loop
from asgiref.sync import ThreadSensitiveContext, async_to_sync, sync_to_async
@pytest.mark.asyncio
async def test_sync_to_async():
"""
Tests we can call sync functions from an async thread
(even if the number of thread workers is less than the number of calls)
"""
# Define sync function
def sync_function():
time.sleep(1)
return 42
# Ensure outermost detection works
# Wrap it
async_function = sync_to_async(sync_function)
# Check it works right
start = time.monotonic()
result = await async_function()
end = time.monotonic()
assert result == 42
assert end - start >= 1
# Set workers to 1, call it twice and make sure that works right
loop = get_running_loop()
old_executor = loop._default_executor or ThreadPoolExecutor()
loop.set_default_executor(ThreadPoolExecutor(max_workers=1))
try:
start = time.monotonic()
await asyncio.wait(
[
create_task(async_function()),
create_task(async_function()),
]
)
end = time.monotonic()
# It should take at least 2 seconds as there's only one worker.
assert end - start >= 2
finally:
loop.set_default_executor(old_executor)
def test_sync_to_async_fail_non_function():
"""
async_to_sync raises a TypeError when called with a non-function.
"""
with pytest.raises(TypeError) as excinfo:
sync_to_async(1)
assert excinfo.value.args == (
"sync_to_async can only be applied to sync functions.",
)
@pytest.mark.asyncio
async def test_sync_to_async_fail_async():
"""
sync_to_async raises a TypeError when applied to a sync function.
"""
with pytest.raises(TypeError) as excinfo:
@sync_to_async
async def test_function():
pass
assert excinfo.value.args == (
"sync_to_async can only be applied to sync functions.",
)
@pytest.mark.asyncio
async def test_async_to_sync_fail_partial():
"""
sync_to_async raises a TypeError when applied to a sync partial.
"""
with pytest.raises(TypeError) as excinfo:
async def test_function(*args):
pass
partial_function = functools.partial(test_function, 42)
sync_to_async(partial_function)
assert excinfo.value.args == (
"sync_to_async can only be applied to sync functions.",
)
@pytest.mark.asyncio
async def test_sync_to_async_decorator():
"""
Tests sync_to_async as a decorator
"""
# Define sync function
@sync_to_async
def test_function():
time.sleep(1)
return 43
# Check it works right
result = await test_function()
assert result == 43
@pytest.mark.asyncio
async def test_nested_sync_to_async_retains_wrapped_function_attributes():
"""
Tests that attributes of functions wrapped by sync_to_async are retained
"""
def enclosing_decorator(attr_value):
@wraps(attr_value)
def wrapper(f):
f.attr_name = attr_value
return f
return wrapper
@enclosing_decorator("test_name_attribute")
@sync_to_async
def test_function():
pass
assert test_function.attr_name == "test_name_attribute"
assert test_function.__name__ == "test_function"
@pytest.mark.asyncio
async def test_sync_to_async_method_decorator():
"""
Tests sync_to_async as a method decorator
"""
# Define sync function
class TestClass:
@sync_to_async
def test_method(self):
time.sleep(1)
return 44
# Check it works right
instance = TestClass()
result = await instance.test_method()
assert result == 44
@pytest.mark.asyncio
async def test_sync_to_async_method_self_attribute():
"""
Tests sync_to_async on a method copies __self__
"""
# Define sync function
class TestClass:
def test_method(self):
time.sleep(0.1)
return 45
# Check it works right
instance = TestClass()
method = sync_to_async(instance.test_method)
result = await method()
assert result == 45
# Check __self__ has been copied
assert method.__self__ == instance
@pytest.mark.asyncio
async def test_async_to_sync_to_async():
"""
Tests we can call async functions from a sync thread created by async_to_sync
(even if the number of thread workers is less than the number of calls)
"""
result = {}
# Define async function
async def inner_async_function():
result["worked"] = True
result["thread"] = threading.current_thread()
return 65
# Define sync function
def sync_function():
return async_to_sync(inner_async_function)()
# Wrap it
async_function = sync_to_async(sync_function)
# Check it works right
number = await async_function()
assert number == 65
assert result["worked"]
# Make sure that it didn't needlessly make a new async loop
assert result["thread"] == threading.current_thread()
def test_async_to_sync_fail_non_function():
"""
async_to_sync raises a TypeError when applied to a non-function.
"""
with pytest.warns(UserWarning) as warnings:
async_to_sync(1)
assert warnings[0].message.args == (
"async_to_sync was passed a non-async-marked callable",
)
def test_async_to_sync_fail_sync():
"""
async_to_sync raises a TypeError when applied to a sync function.
"""
with pytest.warns(UserWarning) as warnings:
@async_to_sync
def test_function(self):
pass
assert warnings[0].message.args == (
"async_to_sync was passed a non-async-marked callable",
)
def test_async_to_sync():
"""
Tests we can call async_to_sync outside of an outer event loop.
"""
result = {}
# Define async function
async def inner_async_function():
await asyncio.sleep(0)
result["worked"] = True
return 84
# Run it
sync_function = async_to_sync(inner_async_function)
number = sync_function()
assert number == 84
assert result["worked"]
def test_async_to_sync_decorator():
"""
Tests we can call async_to_sync as a function decorator
"""
result = {}
# Define async function
@async_to_sync
async def test_function():
await asyncio.sleep(0)
result["worked"] = True
return 85
# Run it
number = test_function()
assert number == 85
assert result["worked"]
def test_async_to_sync_method_decorator():
"""
Tests we can call async_to_sync as a function decorator
"""
result = {}
# Define async function
class TestClass:
@async_to_sync
async def test_function(self):
await asyncio.sleep(0)
result["worked"] = True
return 86
# Run it
instance = TestClass()
number = instance.test_function()
assert number == 86
assert result["worked"]
@pytest.mark.asyncio
async def test_async_to_sync_in_async():
"""
Makes sure async_to_sync bails if you try to call it from an async loop
"""
# Define async function
async def inner_async_function():
return 84
# Run it
sync_function = async_to_sync(inner_async_function)
with pytest.raises(RuntimeError):
sync_function()
def test_async_to_sync_in_thread():
"""
Tests we can call async_to_sync inside a thread
"""
result = {}
# Define async function
@async_to_sync
async def test_function():
await asyncio.sleep(0)
result["worked"] = True
# Make a thread and run it
thread = threading.Thread(target=test_function)
thread.start()
thread.join()
assert result["worked"]
def test_async_to_sync_in_except():
"""
Tests we can call async_to_sync inside an except block without it
re-propagating the exception.
"""
# Define async function
@async_to_sync
async def test_function():
return 42
# Run inside except
try:
raise ValueError("Boom")
except ValueError:
assert test_function() == 42
def test_async_to_sync_partial():
"""
Tests we can call async_to_sync on an async partial.
"""
result = {}
# Define async function
async def inner_async_function(*args):
await asyncio.sleep(0)
result["worked"] = True
return [*args]
partial_function = functools.partial(inner_async_function, 42)
# Run it
sync_function = async_to_sync(partial_function)
out = sync_function(84)
assert out == [42, 84]
assert result["worked"]
def test_async_to_async_method_self_attribute():
"""
Tests async_to_async on a method copies __self__.
"""
# Define async function.
class TestClass:
async def test_function(self):
await asyncio.sleep(0)
return 45
# Check it works right.
instance = TestClass()
sync_function = async_to_sync(instance.test_function)
number = sync_function()
assert number == 45
# Check __self__ has been copied.
assert sync_function.__self__ is instance
def test_thread_sensitive_outside_sync():
"""
Tests that thread_sensitive SyncToAsync where the outside is sync code runs
in the main thread.
"""
result = {}
# Middle async function
@async_to_sync
async def middle():
await inner()
# Inner sync function
@sync_to_async
def inner():
result["thread"] = threading.current_thread()
# Run it
middle()
assert result["thread"] == threading.current_thread()
@pytest.mark.asyncio
async def test_thread_sensitive_outside_async():
"""
Tests that thread_sensitive SyncToAsync where the outside is async code runs
in a single, separate thread.
"""
result_1 = {}
result_2 = {}
# Outer sync function
@sync_to_async
def outer(result):
middle(result)
# Middle async function
@async_to_sync
async def middle(result):
await inner(result)
# Inner sync function
@sync_to_async
def inner(result):
result["thread"] = threading.current_thread()
# Run it (in supposed parallel!)
await asyncio.wait([create_task(outer(result_1)), create_task(inner(result_2))])
# They should not have run in the main thread, but in the same thread
assert result_1["thread"] != threading.current_thread()
assert result_1["thread"] == result_2["thread"]
@pytest.mark.asyncio
async def test_thread_sensitive_with_context_matches():
result_1 = {}
result_2 = {}
def store_thread(result):
result["thread"] = threading.current_thread()
store_thread_async = sync_to_async(store_thread)
async def fn():
async with ThreadSensitiveContext():
# Run it (in supposed parallel!)
await asyncio.wait(
[
create_task(store_thread_async(result_1)),
create_task(store_thread_async(result_2)),
]
)
await fn()
# They should not have run in the main thread, and on the same threads
assert result_1["thread"] != threading.current_thread()
assert result_1["thread"] == result_2["thread"]
@pytest.mark.asyncio
async def test_thread_sensitive_nested_context():
result_1 = {}
result_2 = {}
@sync_to_async
def store_thread(result):
result["thread"] = threading.current_thread()
async with ThreadSensitiveContext():
await store_thread(result_1)
async with ThreadSensitiveContext():
await store_thread(result_2)
# They should not have run in the main thread, and on the same threads
assert result_1["thread"] != threading.current_thread()
assert result_1["thread"] == result_2["thread"]
@pytest.mark.asyncio
async def test_thread_sensitive_context_without_sync_work():
async with ThreadSensitiveContext():
pass
def test_thread_sensitive_double_nested_sync():
"""
Tests that thread_sensitive SyncToAsync nests inside itself where the
outside is sync.
"""
result = {}
# Async level 1
@async_to_sync
async def level1():
await level2()
# Sync level 2
@sync_to_async
def level2():
level3()
# Async level 3
@async_to_sync
async def level3():
await level4()
# Sync level 2
@sync_to_async
def level4():
result["thread"] = threading.current_thread()
# Run it
level1()
assert result["thread"] == threading.current_thread()
@pytest.mark.asyncio
async def test_thread_sensitive_double_nested_async():
"""
Tests that thread_sensitive SyncToAsync nests inside itself where the
outside is async.
"""
result = {}
# Sync level 1
@sync_to_async
def level1():
level2()
# Async level 2
@async_to_sync
async def level2():
await level3()
# Sync level 3
@sync_to_async
def level3():
level4()
# Async level 4
@async_to_sync
async def level4():
result["thread"] = threading.current_thread()
# Run it
await level1()
assert result["thread"] == threading.current_thread()
def test_thread_sensitive_disabled():
"""
Tests that we can disable thread sensitivity and make things run in
separate threads.
"""
result = {}
# Middle async function
@async_to_sync
async def middle():
await inner()
# Inner sync function
@sync_to_async(thread_sensitive=False)
def inner():
result["thread"] = threading.current_thread()
# Run it
middle()
assert result["thread"] != threading.current_thread()
class ASGITest(TestCase):
"""
Tests collection of async cases inside classes
"""
@async_to_sync
async def test_wrapped_case_is_collected(self):
self.assertTrue(True)
def test_sync_to_async_detected_as_coroutinefunction():
"""
Tests that SyncToAsync functions are detected as coroutines.
"""
def sync_func():
return
assert not asyncio.iscoroutinefunction(sync_to_async)
assert asyncio.iscoroutinefunction(sync_to_async(sync_func))
@pytest.mark.asyncio
async def test_multiprocessing():
"""
Tests that a forked process can use async_to_sync without it looking for
the event loop from the parent process.
"""
test_queue = multiprocessing.Queue()
async def async_process():
test_queue.put(42)
def sync_process():
"""Runs async_process synchronously"""
async_to_sync(async_process)()
def fork_first():
"""Forks process before running sync_process"""
fork = multiprocessing.Process(target=sync_process)
fork.start()
fork.join(3)
# Force cleanup in failed test case
if fork.is_alive():
fork.terminate()
return test_queue.get(True, 1)
assert await sync_to_async(fork_first)() == 42
@pytest.mark.asyncio
async def test_sync_to_async_uses_executor():
"""
Tests that SyncToAsync uses the passed in executor correctly.
"""
class CustomExecutor:
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=1)
self.times_submit_called = 0
def submit(self, callable_, *args, **kwargs):
self.times_submit_called += 1
return self.executor.submit(callable_, *args, **kwargs)
expected_result = "expected_result"
def sync_func():
return expected_result
custom_executor = CustomExecutor()
async_function = sync_to_async(
sync_func, thread_sensitive=False, executor=custom_executor
)
actual_result = await async_function()
assert actual_result == expected_result
assert custom_executor.times_submit_called == 1
pytest.raises(
TypeError,
sync_to_async,
sync_func,
thread_sensitive=True,
executor=custom_executor,
)
@pytest.mark.skipif(sys.version_info < (3, 7), reason="Issue persists with 3.6")
def test_sync_to_async_deadlock_raises():
def db_write():
pass
async def io_task():
await sync_to_async(db_write)()
async def do_io_tasks():
t = asyncio.create_task(io_task())
await t
# await asyncio.gather(io_task()) # Also deadlocks
# await io_task() # Works
def view():
async_to_sync(do_io_tasks)()
async def server_entry():
await sync_to_async(view)()
with pytest.raises(RuntimeError):
asyncio.run(server_entry())
@pytest.mark.skipif(sys.version_info < (3, 7), reason="Issue persists with 3.6")
def test_sync_to_async_deadlock_ignored_with_exception():
"""
Ensures that throwing an exception from inside a deadlock-protected block
still resets the deadlock detector's status.
"""
def view():
raise ValueError()
async def server_entry():
try:
await sync_to_async(view)()
except ValueError:
pass
try:
await sync_to_async(view)()
except ValueError:
pass
asyncio.run(server_entry())
0707010000002F000081A400000000000000000000000160DDEA6C00000859000000000000000000000000000000000000003B00000000asgiref-3.4.1.0+git.c897542/tests/test_sync_contextvars.pyimport asyncio
import threading
import time
import pytest
from asgiref.compatibility import create_task
from asgiref.sync import ThreadSensitiveContext, async_to_sync, sync_to_async
contextvars = pytest.importorskip("contextvars")
foo = contextvars.ContextVar("foo")
@pytest.mark.asyncio
async def test_thread_sensitive_with_context_different():
result_1 = {}
result_2 = {}
@sync_to_async
def store_thread(result):
result["thread"] = threading.current_thread()
async def fn(result):
async with ThreadSensitiveContext():
await store_thread(result)
# Run it (in true parallel!)
await asyncio.wait([create_task(fn(result_1)), create_task(fn(result_2))])
# They should not have run in the main thread, and on different threads
assert result_1["thread"] != threading.current_thread()
assert result_1["thread"] != result_2["thread"]
@pytest.mark.asyncio
async def test_sync_to_async_contextvars():
"""
Tests to make sure that contextvars from the calling context are
present in the called context, and that any changes in the called context
are then propagated back to the calling context.
"""
# Define sync function
def sync_function():
time.sleep(1)
assert foo.get() == "bar"
foo.set("baz")
return 42
# Ensure outermost detection works
# Wrap it
foo.set("bar")
async_function = sync_to_async(sync_function)
assert await async_function() == 42
assert foo.get() == "baz"
def test_async_to_sync_contextvars():
"""
Tests to make sure that contextvars from the calling context are
present in the called context, and that any changes in the called context
are then propagated back to the calling context.
"""
# Define sync function
async def async_function():
await asyncio.sleep(1)
assert foo.get() == "bar"
foo.set("baz")
return 42
# Ensure outermost detection works
# Wrap it
foo.set("bar")
sync_function = async_to_sync(async_function)
assert sync_function() == 42
assert foo.get() == "baz"
07070100000030000081A400000000000000000000000160DDEA6C00000574000000000000000000000000000000000000003200000000asgiref-3.4.1.0+git.c897542/tests/test_testing.pyimport pytest
from asgiref.testing import ApplicationCommunicator
from asgiref.wsgi import WsgiToAsgi
@pytest.mark.asyncio
async def test_receive_nothing():
"""
Tests ApplicationCommunicator.receive_nothing to return the correct value.
"""
# Get an ApplicationCommunicator instance
def wsgi_application(environ, start_response):
start_response("200 OK", [])
yield b"content"
application = WsgiToAsgi(wsgi_application)
instance = ApplicationCommunicator(
application,
{
"type": "http",
"http_version": "1.0",
"method": "GET",
"path": "/foo/",
"query_string": b"bar=baz",
"headers": [],
},
)
# No event
assert await instance.receive_nothing() is True
# Produce 3 events to receive
await instance.send_input({"type": "http.request"})
# Start event of the response
assert await instance.receive_nothing() is False
await instance.receive_output()
# First body event of the response announcing further body event
assert await instance.receive_nothing() is False
await instance.receive_output()
# Last body event of the response
assert await instance.receive_nothing() is False
await instance.receive_output()
# Response received completely
assert await instance.receive_nothing(0.01) is True
07070100000031000081A400000000000000000000000160DDEA6C0000220E000000000000000000000000000000000000002F00000000asgiref-3.4.1.0+git.c897542/tests/test_wsgi.pyimport sys
import pytest
from asgiref.testing import ApplicationCommunicator
from asgiref.wsgi import WsgiToAsgi
@pytest.mark.asyncio
async def test_basic_wsgi():
"""
Makes sure the WSGI wrapper has basic functionality.
"""
# Define WSGI app
def wsgi_application(environ, start_response):
assert environ["HTTP_TEST_HEADER"] == "test value 1,test value 2"
start_response("200 OK", [["X-Colour", "Blue"]])
yield b"first chunk "
yield b"second chunk"
# Wrap it
application = WsgiToAsgi(wsgi_application)
# Launch it as a test application
instance = ApplicationCommunicator(
application,
{
"type": "http",
"http_version": "1.0",
"method": "GET",
"path": "/foo/",
"query_string": b"bar=baz",
"headers": [
[b"test-header", b"test value 1"],
[b"test-header", b"test value 2"],
],
},
)
await instance.send_input({"type": "http.request"})
# Check they send stuff
assert (await instance.receive_output(1)) == {
"type": "http.response.start",
"status": 200,
"headers": [(b"x-colour", b"Blue")],
}
assert (await instance.receive_output(1)) == {
"type": "http.response.body",
"body": b"first chunk ",
"more_body": True,
}
assert (await instance.receive_output(1)) == {
"type": "http.response.body",
"body": b"second chunk",
"more_body": True,
}
assert (await instance.receive_output(1)) == {"type": "http.response.body"}
@pytest.mark.asyncio
async def test_wsgi_path_encoding():
"""
Makes sure the WSGI wrapper has basic functionality.
"""
# Define WSGI app
def wsgi_application(environ, start_response):
assert environ["SCRIPT_NAME"] == "/中国".encode().decode("latin-1")
assert environ["PATH_INFO"] == "/中文".encode().decode("latin-1")
start_response("200 OK", [])
yield b""
# Wrap it
application = WsgiToAsgi(wsgi_application)
# Launch it as a test application
instance = ApplicationCommunicator(
application,
{
"type": "http",
"http_version": "1.0",
"method": "GET",
"path": "/中文",
"root_path": "/中国",
"query_string": b"bar=baz",
"headers": [],
},
)
await instance.send_input({"type": "http.request"})
# Check they send stuff
assert (await instance.receive_output(1)) == {
"type": "http.response.start",
"status": 200,
"headers": [],
}
assert (await instance.receive_output(1)) == {
"type": "http.response.body",
"body": b"",
"more_body": True,
}
assert (await instance.receive_output(1)) == {"type": "http.response.body"}
@pytest.mark.asyncio
async def test_wsgi_empty_body():
"""
Makes sure WsgiToAsgi handles an empty body response correctly
"""
def wsgi_application(environ, start_response):
start_response("200 OK", [])
return []
application = WsgiToAsgi(wsgi_application)
instance = ApplicationCommunicator(
application,
{
"type": "http",
"http_version": "1.0",
"method": "GET",
"path": "/",
"query_string": b"",
"headers": [],
},
)
await instance.send_input({"type": "http.request"})
# response.start should always be send
assert (await instance.receive_output(1)) == {
"type": "http.response.start",
"status": 200,
"headers": [],
}
assert (await instance.receive_output(1)) == {"type": "http.response.body"}
@pytest.mark.asyncio
async def test_wsgi_clamped_body():
"""
Makes sure WsgiToAsgi clamps a body response longer than Content-Length
"""
def wsgi_application(environ, start_response):
start_response("200 OK", [("Content-Length", "8")])
return [b"0123", b"45", b"6789"]
application = WsgiToAsgi(wsgi_application)
instance = ApplicationCommunicator(
application,
{
"type": "http",
"http_version": "1.0",
"method": "GET",
"path": "/",
"query_string": b"",
"headers": [],
},
)
await instance.send_input({"type": "http.request"})
assert (await instance.receive_output(1)) == {
"type": "http.response.start",
"status": 200,
"headers": [(b"content-length", b"8")],
}
assert (await instance.receive_output(1)) == {
"type": "http.response.body",
"body": b"0123",
"more_body": True,
}
assert (await instance.receive_output(1)) == {
"type": "http.response.body",
"body": b"45",
"more_body": True,
}
assert (await instance.receive_output(1)) == {
"type": "http.response.body",
"body": b"67",
"more_body": True,
}
assert (await instance.receive_output(1)) == {"type": "http.response.body"}
@pytest.mark.asyncio
async def test_wsgi_stops_iterating_after_content_length_bytes():
"""
Makes sure WsgiToAsgi does not iterate after than Content-Length bytes
"""
def wsgi_application(environ, start_response):
start_response("200 OK", [("Content-Length", "4")])
yield b"0123"
pytest.fail("WsgiToAsgi should not iterate after Content-Length bytes")
yield b"4567"
application = WsgiToAsgi(wsgi_application)
instance = ApplicationCommunicator(
application,
{
"type": "http",
"http_version": "1.0",
"method": "GET",
"path": "/",
"query_string": b"",
"headers": [],
},
)
await instance.send_input({"type": "http.request"})
assert (await instance.receive_output(1)) == {
"type": "http.response.start",
"status": 200,
"headers": [(b"content-length", b"4")],
}
assert (await instance.receive_output(1)) == {
"type": "http.response.body",
"body": b"0123",
"more_body": True,
}
assert (await instance.receive_output(1)) == {"type": "http.response.body"}
@pytest.mark.asyncio
async def test_wsgi_multiple_start_response():
"""
Makes sure WsgiToAsgi only keep Content-Length from the last call to start_response
"""
def wsgi_application(environ, start_response):
start_response("200 OK", [("Content-Length", "5")])
try:
raise ValueError("Application Error")
except ValueError:
start_response("500 Server Error", [], sys.exc_info())
return [b"Some long error message"]
application = WsgiToAsgi(wsgi_application)
instance = ApplicationCommunicator(
application,
{
"type": "http",
"http_version": "1.0",
"method": "GET",
"path": "/",
"query_string": b"",
"headers": [],
},
)
await instance.send_input({"type": "http.request"})
assert (await instance.receive_output(1)) == {
"type": "http.response.start",
"status": 500,
"headers": [],
}
assert (await instance.receive_output(1)) == {
"type": "http.response.body",
"body": b"Some long error message",
"more_body": True,
}
assert (await instance.receive_output(1)) == {"type": "http.response.body"}
@pytest.mark.asyncio
async def test_wsgi_multi_body():
"""
Verify that multiple http.request events with body parts are all delivered
to the WSGI application.
"""
def wsgi_application(environ, start_response):
infp = environ["wsgi.input"]
body = infp.read(12)
assert body == b"Hello World!"
start_response("200 OK", [])
return []
application = WsgiToAsgi(wsgi_application)
instance = ApplicationCommunicator(
application,
{
"type": "http",
"http_version": "1.0",
"method": "POST",
"path": "/",
"query_string": b"",
"headers": [[b"content-length", b"12"]],
},
)
await instance.send_input(
{"type": "http.request", "body": b"Hello ", "more_body": True}
)
await instance.send_input({"type": "http.request", "body": b"World!"})
assert (await instance.receive_output(1)) == {
"type": "http.response.start",
"status": 200,
"headers": [],
}
assert (await instance.receive_output(1)) == {"type": "http.response.body"}
07070100000032000081A400000000000000000000000160DDEA6C0000012E000000000000000000000000000000000000002400000000asgiref-3.4.1.0+git.c897542/tox.ini[tox]
envlist =
py{36,37,38,39,310}-{test,mypy}
qa
[testenv]
usedevelop = true
extras = tests
commands =
test: pytest -v {posargs}
mypy: mypy . {posargs}
[testenv:qa]
skip_install = true
deps =
pre-commit
commands =
pre-commit {posargs:run --all-files --show-diff-on-failure}
07070100000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000B00000000TRAILER!!!367 blocks