Source code
Revision control
Copy as Markdown
Other Tools
from typing import Optional, Tuple
from urllib.parse import urlsplit, parse_qsl
return_stream_id = 0;
summary : bytes = [];
def session_established(session):
# When a WebTransport session is established, a bidirectional stream is
# created by the server, which is used to echo back stream data from the
# client.
path: Optional[bytes] = None
for key, value in session.request_headers:
if key == b':path':
path = value
assert path is not None
qs = dict(parse_qsl(urlsplit(path).query))
token = qs[b'token']
if token is None:
raise Exception('token is missing, path = {}'.format(path))
session.dict_for_handlers['token'] = token
global summary;
# need an initial value to replace
session.stash.put(key=token, value=summary)
def stream_data_received(session,
stream_id: int,
data: bytes,
stream_ended: bool):
# we want to record the order that data arrives, and feed that ordering back to
# the sender. Instead of echoing all the data, we'll send back
# just the first byte of each message. This requires the sender to send buffers
# filled with only a single byte value.
# The test can then examine the stream of data received by the server to
# determine if orderings are correct.
# note that the size in bytes received here can vary wildly
# Send back the data on the control stream
global summary
summary += data[0:1]
token = session.dict_for_handlers['token']
old_data = session.stash.take(key=token) or {}
session.stash.put(key=token, value=summary)
def stream_reset(session, stream_id: int, error_code: int) -> None:
global summary;
token = session.dict_for_handlers['token']
session.stash.put(key=token, value=summary)
summary = []
# do something different to include datagrams...
def datagram_received(session, data: bytes):
session.send_datagram(data)