Source code
Revision control
Copy as Markdown
Other Tools
# -*- coding: utf-8 -*-
"""
test_invalid_headers.py
~~~~~~~~~~~~~~~~~~~~~~~
This module contains tests that use invalid header blocks, and validates that
they fail appropriately.
"""
import itertools
import pytest
import h2.config
import h2.connection
import h2.errors
import h2.events
import h2.exceptions
import h2.settings
import h2.utilities
import hyperframe.frame
from hypothesis import given
from hypothesis.strategies import binary, lists, tuples
HEADERS_STRATEGY = lists(tuples(binary(min_size=1), binary()))
class TestInvalidFrameSequences(object):
"""
Invalid header sequences cause ProtocolErrors to be thrown when received.
"""
base_request_headers = [
(':authority', 'example.com'),
(':path', '/'),
(':scheme', 'https'),
(':method', 'GET'),
('user-agent', 'someua/0.0.1'),
]
invalid_header_blocks = [
base_request_headers + [('Uppercase', 'name')],
base_request_headers + [(':late', 'pseudo-header')],
[(':path', 'duplicate-pseudo-header')] + base_request_headers,
base_request_headers + [('connection', 'close')],
base_request_headers + [('proxy-connection', 'close')],
base_request_headers + [('keep-alive', 'close')],
base_request_headers + [('transfer-encoding', 'gzip')],
base_request_headers + [('upgrade', 'super-protocol/1.1')],
base_request_headers + [('te', 'chunked')],
base_request_headers + [('host', 'notexample.com')],
base_request_headers + [(' name', 'name with leading space')],
base_request_headers + [('name ', 'name with trailing space')],
base_request_headers + [('name', ' value with leading space')],
base_request_headers + [('name', 'value with trailing space ')],
[header for header in base_request_headers
if header[0] != ':authority'],
[(':protocol', 'websocket')] + base_request_headers,
]
server_config = h2.config.H2Configuration(
client_side=False, header_encoding='utf-8'
)
@pytest.mark.parametrize('headers', invalid_header_blocks)
def test_headers_event(self, frame_factory, headers):
"""
Test invalid headers are rejected with PROTOCOL_ERROR.
"""
c = h2.connection.H2Connection(config=self.server_config)
c.receive_data(frame_factory.preamble())
c.clear_outbound_data_buffer()
f = frame_factory.build_headers_frame(headers)
data = f.serialize()
with pytest.raises(h2.exceptions.ProtocolError):
c.receive_data(data)
expected_frame = frame_factory.build_goaway_frame(
last_stream_id=1, error_code=h2.errors.ErrorCodes.PROTOCOL_ERROR
)
assert c.data_to_send() == expected_frame.serialize()
@pytest.mark.parametrize('headers', invalid_header_blocks)
def test_push_promise_event(self, frame_factory, headers):
"""
If a PUSH_PROMISE header frame is received with an invalid header block
it is rejected with a PROTOCOL_ERROR.
"""
c = h2.connection.H2Connection()
c.initiate_connection()
c.send_headers(
stream_id=1, headers=self.base_request_headers, end_stream=True
)
c.clear_outbound_data_buffer()
f = frame_factory.build_push_promise_frame(
stream_id=1,
promised_stream_id=2,
headers=headers
)
data = f.serialize()
with pytest.raises(h2.exceptions.ProtocolError):
c.receive_data(data)
expected_frame = frame_factory.build_goaway_frame(
last_stream_id=0, error_code=h2.errors.ErrorCodes.PROTOCOL_ERROR
)
assert c.data_to_send() == expected_frame.serialize()
@pytest.mark.parametrize('headers', invalid_header_blocks)
def test_push_promise_skipping_validation(self, frame_factory, headers):
"""
If we have ``validate_inbound_headers`` disabled, then invalid header
blocks in push promise frames are allowed to pass.
"""
config = h2.config.H2Configuration(
client_side=True,
validate_inbound_headers=False,
header_encoding='utf-8'
)
c = h2.connection.H2Connection(config=config)
c.initiate_connection()
c.send_headers(
stream_id=1, headers=self.base_request_headers, end_stream=True
)
c.clear_outbound_data_buffer()
f = frame_factory.build_push_promise_frame(
stream_id=1,
promised_stream_id=2,
headers=headers
)
data = f.serialize()
events = c.receive_data(data)
assert len(events) == 1
pp_event = events[0]
assert pp_event.headers == headers
@pytest.mark.parametrize('headers', invalid_header_blocks)
def test_headers_event_skipping_validation(self, frame_factory, headers):
"""
If we have ``validate_inbound_headers`` disabled, then all of these
invalid header blocks are allowed to pass.
"""
config = h2.config.H2Configuration(
client_side=False,
validate_inbound_headers=False,
header_encoding='utf-8'
)
c = h2.connection.H2Connection(config=config)
c.receive_data(frame_factory.preamble())
f = frame_factory.build_headers_frame(headers)
data = f.serialize()
events = c.receive_data(data)
assert len(events) == 1
request_event = events[0]
assert request_event.headers == headers
def test_te_trailers_is_valid(self, frame_factory):
"""
`te: trailers` is allowed by the filter.
"""
headers = (
self.base_request_headers + [('te', 'trailers')]
)
c = h2.connection.H2Connection(config=self.server_config)
c.receive_data(frame_factory.preamble())
f = frame_factory.build_headers_frame(headers)
data = f.serialize()
events = c.receive_data(data)
assert len(events) == 1
request_event = events[0]
assert request_event.headers == headers
def test_pseudo_headers_rejected_in_trailer(self, frame_factory):
"""
Ensure we reject pseudo headers included in trailers
"""
trailers = [(':path', '/'), ('extra', 'value')]
c = h2.connection.H2Connection(config=self.server_config)
c.receive_data(frame_factory.preamble())
c.clear_outbound_data_buffer()
header_frame = frame_factory.build_headers_frame(
self.base_request_headers
)
trailer_frame = frame_factory.build_headers_frame(
trailers, flags=["END_STREAM"]
)
head = header_frame.serialize()
trailer = trailer_frame.serialize()
c.receive_data(head)
# Raise exception if pseudo header in trailer
with pytest.raises(h2.exceptions.ProtocolError) as e:
c.receive_data(trailer)
assert "pseudo-header in trailer" in str(e.value)
# Test appropriate response frame is generated
expected_frame = frame_factory.build_goaway_frame(
last_stream_id=1, error_code=h2.errors.ErrorCodes.PROTOCOL_ERROR
)
assert c.data_to_send() == expected_frame.serialize()
class TestSendingInvalidFrameSequences(object):
"""
Trying to send invalid header sequences cause ProtocolErrors to
be thrown.
"""
base_request_headers = [
(':authority', 'example.com'),
(':path', '/'),
(':scheme', 'https'),
(':method', 'GET'),
('user-agent', 'someua/0.0.1'),
]
invalid_header_blocks = [
base_request_headers + [(':late', 'pseudo-header')],
[(':path', 'duplicate-pseudo-header')] + base_request_headers,
base_request_headers + [('te', 'chunked')],
base_request_headers + [('host', 'notexample.com')],
[header for header in base_request_headers
if header[0] != ':authority'],
]
strippable_header_blocks = [
base_request_headers + [('connection', 'close')],
base_request_headers + [('proxy-connection', 'close')],
base_request_headers + [('keep-alive', 'close')],
base_request_headers + [('transfer-encoding', 'gzip')],
base_request_headers + [('upgrade', 'super-protocol/1.1')]
]
all_header_blocks = invalid_header_blocks + strippable_header_blocks
server_config = h2.config.H2Configuration(client_side=False)
@pytest.mark.parametrize('headers', invalid_header_blocks)
def test_headers_event(self, frame_factory, headers):
"""
Test sending invalid headers raise a ProtocolError.
"""
c = h2.connection.H2Connection()
c.initiate_connection()
# Clear the data, then try to send headers.
c.clear_outbound_data_buffer()
with pytest.raises(h2.exceptions.ProtocolError):
c.send_headers(1, headers)
@pytest.mark.parametrize('headers', invalid_header_blocks)
def test_send_push_promise(self, frame_factory, headers):
"""
Sending invalid headers in a push promise raises a ProtocolError.
"""
c = h2.connection.H2Connection(config=self.server_config)
c.initiate_connection()
c.receive_data(frame_factory.preamble())
header_frame = frame_factory.build_headers_frame(
self.base_request_headers
)
c.receive_data(header_frame.serialize())
# Clear the data, then try to send a push promise.
c.clear_outbound_data_buffer()
with pytest.raises(h2.exceptions.ProtocolError):
c.push_stream(
stream_id=1, promised_stream_id=2, request_headers=headers
)
@pytest.mark.parametrize('headers', all_header_blocks)
def test_headers_event_skipping_validation(self, frame_factory, headers):
"""
If we have ``validate_outbound_headers`` disabled, then all of these
invalid header blocks are allowed to pass.
"""
config = h2.config.H2Configuration(
validate_outbound_headers=False
)
c = h2.connection.H2Connection(config=config)
c.initiate_connection()
# Clear the data, then send headers.
c.clear_outbound_data_buffer()
c.send_headers(1, headers)
# Ensure headers are still normalized.
norm_headers = h2.utilities.normalize_outbound_headers(headers, None)
f = frame_factory.build_headers_frame(norm_headers)
assert c.data_to_send() == f.serialize()
@pytest.mark.parametrize('headers', all_header_blocks)
def test_push_promise_skipping_validation(self, frame_factory, headers):
"""
If we have ``validate_outbound_headers`` disabled, then all of these
invalid header blocks are allowed to pass.
"""
config = h2.config.H2Configuration(
client_side=False,
validate_outbound_headers=False,
)
c = h2.connection.H2Connection(config=config)
c.initiate_connection()
c.receive_data(frame_factory.preamble())
header_frame = frame_factory.build_headers_frame(
self.base_request_headers
)
c.receive_data(header_frame.serialize())
# Create push promise frame with normalized headers.
frame_factory.refresh_encoder()
norm_headers = h2.utilities.normalize_outbound_headers(headers, None)
pp_frame = frame_factory.build_push_promise_frame(
stream_id=1, promised_stream_id=2, headers=norm_headers
)
# Clear the data, then send a push promise.
c.clear_outbound_data_buffer()
c.push_stream(
stream_id=1, promised_stream_id=2, request_headers=headers
)
assert c.data_to_send() == pp_frame.serialize()
@pytest.mark.parametrize('headers', all_header_blocks)
def test_headers_event_skip_normalization(self, frame_factory, headers):
"""
If we have ``normalize_outbound_headers`` disabled, then all of these
invalid header blocks are sent through unmodified.
"""
config = h2.config.H2Configuration(
validate_outbound_headers=False,
normalize_outbound_headers=False
)
c = h2.connection.H2Connection(config=config)
c.initiate_connection()
f = frame_factory.build_headers_frame(
headers,
stream_id=1,
)
# Clear the data, then send headers.
c.clear_outbound_data_buffer()
c.send_headers(1, headers)
assert c.data_to_send() == f.serialize()
@pytest.mark.parametrize('headers', all_header_blocks)
def test_push_promise_skip_normalization(self, frame_factory, headers):
"""
If we have ``normalize_outbound_headers`` disabled, then all of these
invalid header blocks are allowed to pass unmodified.
"""
config = h2.config.H2Configuration(
client_side=False,
validate_outbound_headers=False,
normalize_outbound_headers=False,
)
c = h2.connection.H2Connection(config=config)
c.initiate_connection()
c.receive_data(frame_factory.preamble())
header_frame = frame_factory.build_headers_frame(
self.base_request_headers
)
c.receive_data(header_frame.serialize())
frame_factory.refresh_encoder()
pp_frame = frame_factory.build_push_promise_frame(
stream_id=1, promised_stream_id=2, headers=headers
)
# Clear the data, then send a push promise.
c.clear_outbound_data_buffer()
c.push_stream(
stream_id=1, promised_stream_id=2, request_headers=headers
)
assert c.data_to_send() == pp_frame.serialize()
@pytest.mark.parametrize('headers', strippable_header_blocks)
def test_strippable_headers(self, frame_factory, headers):
"""
Test connection related headers are removed before sending.
"""
c = h2.connection.H2Connection()
c.initiate_connection()
# Clear the data, then try to send headers.
c.clear_outbound_data_buffer()
c.send_headers(1, headers)
f = frame_factory.build_headers_frame(self.base_request_headers)
assert c.data_to_send() == f.serialize()
class TestFilter(object):
"""
Test the filter function directly.
These tests exists to confirm the behaviour of the filter function in a
wide range of scenarios. Many of these scenarios may not be legal for
HTTP/2 and so may never hit the function, but it's worth validating that it
behaves as expected anyway.
"""
validation_functions = [
h2.utilities.validate_headers,
h2.utilities.validate_outbound_headers
]
hdr_validation_combos = [
h2.utilities.HeaderValidationFlags(
is_client, is_trailer, is_response_header, is_push_promise
)
for is_client, is_trailer, is_response_header, is_push_promise in (
itertools.product([True, False], repeat=4)
)
]
hdr_validation_response_headers = [
flags for flags in hdr_validation_combos
if flags.is_response_header
]
hdr_validation_request_headers_no_trailer = [
flags for flags in hdr_validation_combos
if not (flags.is_trailer or flags.is_response_header)
]
invalid_request_header_blocks_bytes = (
# First, missing :method
(
(b':authority', b'google.com'),
(b':path', b'/'),
(b':scheme', b'https'),
),
# Next, missing :path
(
(b':authority', b'google.com'),
(b':method', b'GET'),
(b':scheme', b'https'),
),
# Next, missing :scheme
(
(b':authority', b'google.com'),
(b':method', b'GET'),
(b':path', b'/'),
),
# Finally, path present but empty.
(
(b':authority', b'google.com'),
(b':method', b'GET'),
(b':scheme', b'https'),
(b':path', b''),
),
)
invalid_request_header_blocks_unicode = (
# First, missing :method
(
(':authority', 'google.com'),
(':path', '/'),
(':scheme', 'https'),
),
# Next, missing :path
(
(':authority', 'google.com'),
(':method', 'GET'),
(':scheme', 'https'),
),
# Next, missing :scheme
(
(':authority', 'google.com'),
(':method', 'GET'),
(':path', '/'),
),
# Finally, path present but empty.
(
(':authority', 'google.com'),
(':method', 'GET'),
(':scheme', 'https'),
(':path', ''),
),
)
# All headers that are forbidden from either request or response blocks.
forbidden_request_headers_bytes = (b':status',)
forbidden_request_headers_unicode = (':status',)
forbidden_response_headers_bytes = (
b':path', b':scheme', b':authority', b':method'
)
forbidden_response_headers_unicode = (
':path', ':scheme', ':authority', ':method'
)
@pytest.mark.parametrize('validation_function', validation_functions)
@pytest.mark.parametrize('hdr_validation_flags', hdr_validation_combos)
@given(headers=HEADERS_STRATEGY)
def test_range_of_acceptable_outputs(self,
headers,
validation_function,
hdr_validation_flags):
"""
The header validation functions either return the data unchanged
or throw a ProtocolError.
"""
try:
assert headers == list(validation_function(
headers, hdr_validation_flags))
except h2.exceptions.ProtocolError:
assert True
@pytest.mark.parametrize('hdr_validation_flags', hdr_validation_combos)
def test_invalid_pseudo_headers(self, hdr_validation_flags):
headers = [(b':custom', b'value')]
with pytest.raises(h2.exceptions.ProtocolError):
list(h2.utilities.validate_headers(headers, hdr_validation_flags))
@pytest.mark.parametrize('validation_function', validation_functions)
@pytest.mark.parametrize(
'hdr_validation_flags', hdr_validation_request_headers_no_trailer
)
def test_matching_authority_host_headers(self,
validation_function,
hdr_validation_flags):
"""
If a header block has :authority and Host headers and they match,
the headers should pass through unchanged.
"""
headers = [
(b':authority', b'example.com'),
(b':path', b'/'),
(b':scheme', b'https'),
(b':method', b'GET'),
(b'host', b'example.com'),
]
assert headers == list(h2.utilities.validate_headers(
headers, hdr_validation_flags
))
@pytest.mark.parametrize(
'hdr_validation_flags', hdr_validation_response_headers
)
def test_response_header_without_status(self, hdr_validation_flags):
headers = [(b'content-length', b'42')]
with pytest.raises(h2.exceptions.ProtocolError):
list(h2.utilities.validate_headers(headers, hdr_validation_flags))
@pytest.mark.parametrize(
'hdr_validation_flags', hdr_validation_request_headers_no_trailer
)
@pytest.mark.parametrize(
'header_block',
(
invalid_request_header_blocks_bytes +
invalid_request_header_blocks_unicode
)
)
def test_outbound_req_header_missing_pseudo_headers(self,
hdr_validation_flags,
header_block):
with pytest.raises(h2.exceptions.ProtocolError):
list(
h2.utilities.validate_outbound_headers(
header_block, hdr_validation_flags
)
)
@pytest.mark.parametrize(
'hdr_validation_flags', hdr_validation_request_headers_no_trailer
)
@pytest.mark.parametrize(
'header_block', invalid_request_header_blocks_bytes
)
def test_inbound_req_header_missing_pseudo_headers(self,
hdr_validation_flags,
header_block):
with pytest.raises(h2.exceptions.ProtocolError):
list(
h2.utilities.validate_headers(
header_block, hdr_validation_flags
)
)
@pytest.mark.parametrize(
'hdr_validation_flags', hdr_validation_request_headers_no_trailer
)
@pytest.mark.parametrize(
'invalid_header',
forbidden_request_headers_bytes + forbidden_request_headers_unicode
)
def test_outbound_req_header_extra_pseudo_headers(self,
hdr_validation_flags,
invalid_header):
"""
Outbound request header blocks containing the forbidden request headers
fail validation.
"""
headers = [
(b':path', b'/'),
(b':scheme', b'https'),
(b':authority', b'google.com'),
(b':method', b'GET'),
]
headers.append((invalid_header, b'some value'))
with pytest.raises(h2.exceptions.ProtocolError):
list(
h2.utilities.validate_outbound_headers(
headers, hdr_validation_flags
)
)
@pytest.mark.parametrize(
'hdr_validation_flags', hdr_validation_request_headers_no_trailer
)
@pytest.mark.parametrize(
'invalid_header',
forbidden_request_headers_bytes
)
def test_inbound_req_header_extra_pseudo_headers(self,
hdr_validation_flags,
invalid_header):
"""
Inbound request header blocks containing the forbidden request headers
fail validation.
"""
headers = [
(b':path', b'/'),
(b':scheme', b'https'),
(b':authority', b'google.com'),
(b':method', b'GET'),
]
headers.append((invalid_header, b'some value'))
with pytest.raises(h2.exceptions.ProtocolError):
list(h2.utilities.validate_headers(headers, hdr_validation_flags))
@pytest.mark.parametrize(
'hdr_validation_flags', hdr_validation_response_headers
)
@pytest.mark.parametrize(
'invalid_header',
forbidden_response_headers_bytes + forbidden_response_headers_unicode
)
def test_outbound_resp_header_extra_pseudo_headers(self,
hdr_validation_flags,
invalid_header):
"""
Outbound response header blocks containing the forbidden response
headers fail validation.
"""
headers = [(b':status', b'200')]
headers.append((invalid_header, b'some value'))
with pytest.raises(h2.exceptions.ProtocolError):
list(
h2.utilities.validate_outbound_headers(
headers, hdr_validation_flags
)
)
@pytest.mark.parametrize(
'hdr_validation_flags', hdr_validation_response_headers
)
@pytest.mark.parametrize(
'invalid_header',
forbidden_response_headers_bytes
)
def test_inbound_resp_header_extra_pseudo_headers(self,
hdr_validation_flags,
invalid_header):
"""
Inbound response header blocks containing the forbidden response
headers fail validation.
"""
headers = [(b':status', b'200')]
headers.append((invalid_header, b'some value'))
with pytest.raises(h2.exceptions.ProtocolError):
list(h2.utilities.validate_headers(headers, hdr_validation_flags))
@pytest.mark.parametrize('hdr_validation_flags', hdr_validation_combos)
def test_inbound_header_name_length(self, hdr_validation_flags):
with pytest.raises(h2.exceptions.ProtocolError):
list(h2.utilities.validate_headers([(b'', b'foobar')], hdr_validation_flags))
def test_inbound_header_name_length_full_frame_decode(self, frame_factory):
f = frame_factory.build_headers_frame([])
f.data = b"\x00\x00\x05\x00\x00\x00\x00\x04"
data = f.serialize()
c = h2.connection.H2Connection(config=h2.config.H2Configuration(client_side=False))
c.initiate_connection()
c.receive_data(frame_factory.preamble())
c.clear_outbound_data_buffer()
with pytest.raises(h2.exceptions.ProtocolError, match="Received header name with zero length."):
c.receive_data(data)
class TestOversizedHeaders(object):
"""
Tests that oversized header blocks are correctly rejected. This replicates
the "HPACK Bomb" attack, and confirms that we're resistant against it.
"""
request_header_block = [
(b':method', b'GET'),
(b':authority', b'example.com'),
(b':scheme', b'https'),
(b':path', b'/'),
]
response_header_block = [
(b':status', b'200'),
]
# The first header block contains a single header that fills the header
# table. To do that, we'll give it a single-character header name and a
# 4063 byte header value. This will make it exactly the size of the header
# table. It must come last, so that it evicts all other headers.
# This block must be appended to either a request or response block.
first_header_block = [
(b'a', b'a' * 4063),
]
# The second header "block" is actually a custom HEADERS frame body that
# simply repeatedly refers to the first entry for 16kB. Each byte has the
# high bit set (0x80), and then uses the remaining 7 bits to encode the
# number 62 (0x3e), leading to a repeat of the byte 0xbe.
second_header_block = b'\xbe' * 2**14
server_config = h2.config.H2Configuration(client_side=False)
def test_hpack_bomb_request(self, frame_factory):
"""
A HPACK bomb request causes the connection to be torn down with the
error code ENHANCE_YOUR_CALM.
"""
c = h2.connection.H2Connection(config=self.server_config)
c.receive_data(frame_factory.preamble())
c.clear_outbound_data_buffer()
f = frame_factory.build_headers_frame(
self.request_header_block + self.first_header_block
)
data = f.serialize()
c.receive_data(data)
# Build the attack payload.
attack_frame = hyperframe.frame.HeadersFrame(stream_id=3)
attack_frame.data = self.second_header_block
attack_frame.flags.add('END_HEADERS')
data = attack_frame.serialize()
with pytest.raises(h2.exceptions.DenialOfServiceError):
c.receive_data(data)
expected_frame = frame_factory.build_goaway_frame(
last_stream_id=1, error_code=h2.errors.ErrorCodes.ENHANCE_YOUR_CALM
)
assert c.data_to_send() == expected_frame.serialize()
def test_hpack_bomb_response(self, frame_factory):
"""
A HPACK bomb response causes the connection to be torn down with the
error code ENHANCE_YOUR_CALM.
"""
c = h2.connection.H2Connection()
c.initiate_connection()
c.send_headers(
stream_id=1, headers=self.request_header_block
)
c.send_headers(
stream_id=3, headers=self.request_header_block
)
c.clear_outbound_data_buffer()
f = frame_factory.build_headers_frame(
self.response_header_block + self.first_header_block
)
data = f.serialize()
c.receive_data(data)
# Build the attack payload.
attack_frame = hyperframe.frame.HeadersFrame(stream_id=3)
attack_frame.data = self.second_header_block
attack_frame.flags.add('END_HEADERS')
data = attack_frame.serialize()
with pytest.raises(h2.exceptions.DenialOfServiceError):
c.receive_data(data)
expected_frame = frame_factory.build_goaway_frame(
last_stream_id=0, error_code=h2.errors.ErrorCodes.ENHANCE_YOUR_CALM
)
assert c.data_to_send() == expected_frame.serialize()
def test_hpack_bomb_push(self, frame_factory):
"""
A HPACK bomb push causes the connection to be torn down with the
error code ENHANCE_YOUR_CALM.
"""
c = h2.connection.H2Connection()
c.initiate_connection()
c.send_headers(
stream_id=1, headers=self.request_header_block
)
c.clear_outbound_data_buffer()
f = frame_factory.build_headers_frame(
self.response_header_block + self.first_header_block
)
data = f.serialize()
c.receive_data(data)
# Build the attack payload. We need to shrink it by four bytes because
# the promised_stream_id consumes four bytes of body.
attack_frame = hyperframe.frame.PushPromiseFrame(stream_id=3)
attack_frame.promised_stream_id = 2
attack_frame.data = self.second_header_block[:-4]
attack_frame.flags.add('END_HEADERS')
data = attack_frame.serialize()
with pytest.raises(h2.exceptions.DenialOfServiceError):
c.receive_data(data)
expected_frame = frame_factory.build_goaway_frame(
last_stream_id=0, error_code=h2.errors.ErrorCodes.ENHANCE_YOUR_CALM
)
assert c.data_to_send() == expected_frame.serialize()
def test_reject_headers_when_list_size_shrunk(self, frame_factory):
"""
When we've shrunk the header list size, we reject new header blocks
that violate the new size.
"""
c = h2.connection.H2Connection(config=self.server_config)
c.receive_data(frame_factory.preamble())
c.clear_outbound_data_buffer()
# Receive the first request, which causes no problem.
f = frame_factory.build_headers_frame(
stream_id=1,
headers=self.request_header_block
)
data = f.serialize()
c.receive_data(data)
# Now, send a settings change. It's un-ACKed at this time. A new
# request arrives, also without incident.
c.update_settings({h2.settings.SettingCodes.MAX_HEADER_LIST_SIZE: 50})
c.clear_outbound_data_buffer()
f = frame_factory.build_headers_frame(
stream_id=3,
headers=self.request_header_block
)
data = f.serialize()
c.receive_data(data)
# We get a SETTINGS ACK.
f = frame_factory.build_settings_frame({}, ack=True)
data = f.serialize()
c.receive_data(data)
# Now a third request comes in. This explodes.
f = frame_factory.build_headers_frame(
stream_id=5,
headers=self.request_header_block
)
data = f.serialize()
with pytest.raises(h2.exceptions.DenialOfServiceError):
c.receive_data(data)
expected_frame = frame_factory.build_goaway_frame(
last_stream_id=3, error_code=h2.errors.ErrorCodes.ENHANCE_YOUR_CALM
)
assert c.data_to_send() == expected_frame.serialize()
def test_reject_headers_when_table_size_shrunk(self, frame_factory):
"""
When we've shrunk the header table size, we reject header blocks that
do not respect the change.
"""
c = h2.connection.H2Connection(config=self.server_config)
c.receive_data(frame_factory.preamble())
c.clear_outbound_data_buffer()
# Receive the first request, which causes no problem.
f = frame_factory.build_headers_frame(
stream_id=1,
headers=self.request_header_block
)
data = f.serialize()
c.receive_data(data)
# Now, send a settings change. It's un-ACKed at this time. A new
# request arrives, also without incident.
c.update_settings({h2.settings.SettingCodes.HEADER_TABLE_SIZE: 128})
c.clear_outbound_data_buffer()
f = frame_factory.build_headers_frame(
stream_id=3,
headers=self.request_header_block
)
data = f.serialize()
c.receive_data(data)
# We get a SETTINGS ACK.
f = frame_factory.build_settings_frame({}, ack=True)
data = f.serialize()
c.receive_data(data)
# Now a third request comes in. This explodes, as it does not contain
# a dynamic table size update.
f = frame_factory.build_headers_frame(
stream_id=5,
headers=self.request_header_block
)
data = f.serialize()
with pytest.raises(h2.exceptions.ProtocolError):
c.receive_data(data)
expected_frame = frame_factory.build_goaway_frame(
last_stream_id=3, error_code=h2.errors.ErrorCodes.PROTOCOL_ERROR
)
assert c.data_to_send() == expected_frame.serialize()
def test_reject_headers_exceeding_table_size(self, frame_factory):
"""
When the remote peer sends a dynamic table size update that exceeds our
setting, we reject it.
"""
c = h2.connection.H2Connection(config=self.server_config)
c.receive_data(frame_factory.preamble())
c.clear_outbound_data_buffer()
# Receive the first request, which causes no problem.
f = frame_factory.build_headers_frame(
stream_id=1,
headers=self.request_header_block
)
data = f.serialize()
c.receive_data(data)
# Now a second request comes in that sets the table size too high.
# This explodes.
frame_factory.change_table_size(c.local_settings.header_table_size + 1)
f = frame_factory.build_headers_frame(
stream_id=5,
headers=self.request_header_block
)
data = f.serialize()
with pytest.raises(h2.exceptions.ProtocolError):
c.receive_data(data)
expected_frame = frame_factory.build_goaway_frame(
last_stream_id=1, error_code=h2.errors.ErrorCodes.PROTOCOL_ERROR
)
assert c.data_to_send() == expected_frame.serialize()