Source code
Revision control
Copy as Markdown
Other Tools
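// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.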
use std::{
cell::RefCell,
collections::VecDeque,
fmt::{Debug, Display},
mem,
rc::Rc,
slice::SliceIndex,
};
use neqo_common::{qerror, qinfo, qtrace, Header};
use neqo_transport::{Connection, StreamId};
use crate::{
client_events::{Http3ClientEvent, Http3ClientEvents},
connection::Http3Connection,
frames::HFrame,
CloseType, Error, Http3StreamInfo, HttpRecvStreamEvents, RecvStreamEvents, Res,
};
/// `PushState` describes what is currently known about a single push (one push ID):
/// `Init`: there is no push stream nor a push promise. This state is only used to keep track of
/// opened and closed push streams.
/// `PushPromise`: the push has only ever received a push promise frame.
/// `OnlyPushStream`: there is only a push stream. All push stream events, i.e. `PushHeaderReady`
/// and `PushDataReadable` will be delayed until a push promise is received
/// (they are kept in `events`).
/// `Active`: there is a push stream and at least one push promise frame.
/// `Closed`: the push stream has been closed or reset already.
#[derive(Debug, PartialEq, Clone)]
enum PushState {
Init,
PushPromise {
// Headers of the first push promise; `new_push_promise` rejects later promises for the same
// push whose headers differ.
headers: Vec<Header>,
},
OnlyPushStream {
// Transport stream carrying the push.
stream_id: StreamId,
// Stream events held back until a push promise arrives.
events: Vec<Http3ClientEvent>,
},
Active {
// Transport stream carrying the push.
stream_id: StreamId,
// Headers of the push promise, kept to compare against duplicate promises.
headers: Vec<Header>,
},
Closed,
}
/// `ActivePushStreams` holds information about push streams.
///
/// `first_push_id` holds a `push_id` of the first element in `push_streams` if it is present.
/// `push_id` smaller than `first_push_id` have been already closed. `push_id` >= `first_push_id`
/// are in `push_streams` or they are not yet opened.
#[derive(Debug)]
struct ActivePushStreams {
// State of push `first_push_id + i` is stored at index `i`.
push_streams: VecDeque<PushState>,
// Push ID corresponding to the front of `push_streams`; everything below it is closed.
first_push_id: u64,
}
impl ActivePushStreams {
    /// Creates an empty tracker whose window starts at push ID 0.
    pub const fn new() -> Self {
        Self {
            first_push_id: 0,
            push_streams: VecDeque::new(),
        }
    }

    /// Looks up the state for `push_id`, creating `Init` entries up to and including it
    /// when the ID has not been seen yet.
    ///
    /// Returns `None` if the push has been closed already, i.e. the ID lies below the
    /// window start or its entry is marked `Closed`.
    pub fn get_mut(
        &mut self,
        push_id: u64,
    ) -> Option<&mut <usize as SliceIndex<[PushState]>>::Output> {
        // IDs below the window start were closed and dropped from the queue.
        let offset = push_id.checked_sub(self.first_push_id)?;
        let idx = usize::try_from(offset).unwrap();
        if self.push_streams.len() <= idx {
            // Grow the window so that every ID up to `push_id` has an `Init` slot.
            self.push_streams.resize(idx + 1, PushState::Init);
        }
        // `Closed` is an internal marker; callers observe it as `None`.
        self.push_streams
            .get_mut(idx)
            .filter(|state| !matches!(**state, PushState::Closed))
    }

    /// Same lookup as `get_mut`: returns `None` if the push has been closed already.
    pub fn get(&mut self, push_id: u64) -> Option<&mut PushState> {
        self.get_mut(push_id)
    }

    /// Marks `push_id` as closed and hands back the state it was in.
    ///
    /// Returns `None` when the push was closed before. Closed entries at the front of the
    /// queue are removed and `first_push_id` advances past them.
    pub fn close(&mut self, push_id: u64) -> Option<PushState> {
        let slot = self.get_mut(push_id)?;
        let previous = mem::replace(slot, PushState::Closed);
        while matches!(self.push_streams.front(), Some(PushState::Closed)) {
            self.push_streams.pop_front();
            self.first_push_id += 1;
        }
        Some(previous)
    }

    /// Number of pushes that are done: all IDs below the window start plus the entries
    /// inside the window that are marked `Closed`.
    #[must_use]
    pub fn number_done(&self) -> u64 {
        let closed_in_window = self
            .push_streams
            .iter()
            .filter(|state| matches!(state, PushState::Closed))
            .count();
        self.first_push_id + u64::try_from(closed_in_window).unwrap()
    }

    /// Forgets every tracked push and moves the window start back to 0.
    pub fn clear(&mut self) {
        self.push_streams.clear();
        self.first_push_id = 0;
    }
}
/// `PushController` keeps information about push stream states.
///
/// A `PushStream` calls `add_new_push_stream` that may change the push state from `Init` to
/// `OnlyPushStream` or from `PushPromise` to `Active`. If a stream has already been closed
/// `add_new_push_stream` returns false (the `PushStream` will close the transport stream).
/// A `PushStream` calls `push_stream_reset` if the transport stream has been canceled.
/// When a push stream is done it calls `close`.
///
/// The `PushController` handles:
/// `PUSH_PROMISE` frame: frames may change the push state from `Init` to `PushPromise` and from
/// `OnlyPushStream` to `Active`. Frames for closed streams are ignored.
/// `CANCEL_PUSH` frame: (`handle_cancel_push` will be called). If a push is in state `PushPromise`
/// or `Active`, any posted events will be removed and a `PushCanceled` event
/// will be posted. If a push is in state `OnlyPushStream` or `Active` the
/// transport stream and the `PushStream` will be closed. The frame will be
/// ignored for already closed pushes.
/// Application calling cancel: the actions are similar to the
/// `CANCEL_PUSH` frame. The difference is that `PushCanceled` will not
/// be posted and a `CANCEL_PUSH` frame may be sent.
#[derive(Debug)]
pub struct PushController {
// Number of push IDs added on top of the finished pushes whenever a new maximum push ID is
// announced; zero means pushes cannot be received (see `can_receive_push`).
max_concurent_push: u64,
// Largest push ID that `check_push_id` currently accepts.
current_max_push_id: u64,
// push_streams holds the states of push streams.
// We keep a stream until the stream has been closed.
// Push IDs that are no longer in the list have already been closed.
push_streams: ActivePushStreams,
// Event queue to which push promises, push stream events and cancellations are posted.
conn_events: Http3ClientEvents,
}
impl PushController {
    /// Creates a controller that tracks no pushes yet.
    ///
    /// `max_concurent_push` is the number of push IDs granted beyond the finished pushes each
    /// time a new maximum push ID is announced; `conn_events` receives the push events.
    /// The maximum accepted push ID starts at 0.
    pub const fn new(max_concurent_push: u64, conn_events: Http3ClientEvents) -> Self {
        Self {
            conn_events,
            push_streams: ActivePushStreams::new(),
            current_max_push_id: 0,
            max_concurent_push,
        }
    }
}
impl Display for PushController {
    /// Writes the fixed label used as the prefix of this component's log lines.
    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
        f.write_str("Push controler")
    }
}
impl PushController {
/// Handles a received push promise for `push_id` that arrived on `ref_stream_id`.
///
/// The first promise for a push posts a push promise event and records the headers. If a
/// push stream already exists, the events held back for it are released as well. A repeated
/// promise must carry the same headers as the first one. Promises for pushes that are
/// already closed are ignored.
///
/// # Errors
///
/// `HttpId` if `push_id` greater than it is allowed has been received.
/// `HttpGeneralProtocol` if a repeated promise carries headers that differ from the
/// recorded ones.
pub fn new_push_promise(
    &mut self,
    push_id: u64,
    ref_stream_id: StreamId,
    new_headers: Vec<Header>,
) -> Res<()> {
    qtrace!(
        [self],
        "New push promise push_id={} headers={:?} max_push={}",
        push_id,
        new_headers,
        self.max_concurent_push
    );
    self.check_push_id(push_id)?;

    let Some(push_state) = self.push_streams.get_mut(push_id) else {
        qtrace!("Push has been closed already {}.", push_id);
        return Ok(());
    };

    match push_state {
        PushState::Init => {
            self.conn_events
                .push_promise(push_id, ref_stream_id, new_headers.clone());
            *push_state = PushState::PushPromise {
                headers: new_headers,
            };
        }
        PushState::PushPromise { headers } | PushState::Active { headers, .. } => {
            // A duplicate promise is only valid when it repeats the recorded headers.
            if new_headers != *headers {
                return Err(Error::HttpGeneralProtocol);
            }
            self.conn_events
                .push_promise(push_id, ref_stream_id, new_headers);
        }
        PushState::OnlyPushStream { stream_id, events } => {
            let promised_stream = *stream_id;
            self.conn_events
                .push_promise(push_id, ref_stream_id, new_headers.clone());
            // Release the stream events that were waiting for this promise, in order.
            for delayed in events.drain(..) {
                self.conn_events.insert(delayed);
            }
            *push_state = PushState::Active {
                stream_id: promised_stream,
                headers: new_headers,
            };
        }
        PushState::Closed => unreachable!("This is only internal; it is transfer to None"),
    }
    Ok(())
}
/// Registers the transport stream `stream_id` as the push stream for `push_id`.
///
/// Returns `Ok(true)` when the stream was recorded and `Ok(false)` when the push has been
/// closed already.
///
/// # Errors
///
/// `HttpId` if `push_id` is greater than the allowed maximum or if a push stream has
/// already been recorded for this push.
pub fn add_new_push_stream(&mut self, push_id: u64, stream_id: StreamId) -> Res<bool> {
    qtrace!(
        "A new push stream with push_id={} stream_id={}",
        push_id,
        stream_id
    );
    self.check_push_id(push_id)?;

    let Some(push_state) = self.push_streams.get_mut(push_id) else {
        qinfo!("Push has been closed already.");
        return Ok(false);
    };

    match push_state {
        PushState::Init => {
            *push_state = PushState::OnlyPushStream {
                stream_id,
                events: Vec::new(),
            };
            Ok(true)
        }
        PushState::PushPromise { headers } => {
            let headers = mem::take(headers);
            *push_state = PushState::Active { stream_id, headers };
            Ok(true)
        }
        // Every remaining state already has a push stream attached.
        PushState::OnlyPushStream { .. } | PushState::Active { .. } | PushState::Closed => {
            qerror!("Duplicate push stream.");
            Err(Error::HttpId)
        }
    }
}
/// Verifies that `push_id` does not exceed the maximum push ID currently allowed.
///
/// # Errors
///
/// `HttpId` if `push_id` is greater than `current_max_push_id`.
fn check_push_id(&self, push_id: u64) -> Res<()> {
    if push_id <= self.current_max_push_id {
        return Ok(());
    }
    qerror!("Push id is greater than current_max_push_id.");
    Err(Error::HttpId)
}
/// Handles a received `CANCEL_PUSH` frame for `push_id`.
///
/// The push is marked closed. If a promise or a push stream was known, queued events for
/// the push are dropped and a push-canceled event is posted. If a push stream exists, the
/// peer is additionally asked to stop sending on it. Frames for pushes that are already
/// closed are ignored.
///
/// # Errors
///
/// `HttpId` if `push_id` is greater than the allowed maximum.
pub fn handle_cancel_push(
    &mut self,
    push_id: u64,
    conn: &mut Connection,
    base_handler: &mut Http3Connection,
) -> Res<()> {
    qtrace!("CANCEL_PUSH frame has been received, push_id={}", push_id);
    self.check_push_id(push_id)?;

    let Some(closed_state) = self.push_streams.close(push_id) else {
        qtrace!("Push has already been closed (push_id={}).", push_id);
        return Ok(());
    };

    match closed_state {
        // Nothing was announced to the application, so there is nothing to retract.
        PushState::Init => return Ok(()),
        PushState::PushPromise { .. } => {}
        PushState::OnlyPushStream { stream_id, .. } | PushState::Active { stream_id, .. } => {
            // The result is deliberately discarded.
            mem::drop(base_handler.stream_stop_sending(
                conn,
                stream_id,
                Error::HttpRequestCancelled.code(),
            ));
        }
        PushState::Closed => unreachable!("This is only internal; it is transfer to None"),
    }
    self.conn_events.remove_events_for_push_id(push_id);
    self.conn_events.push_canceled(push_id);
    Ok(())
}
/// Marks the push as closed after its push stream has finished.
///
/// Debug builds assert that the push was in the `Active` state and that it had not been
/// closed before.
pub fn close(&mut self, push_id: u64) {
    qtrace!("Push stream has been closed.");
    match self.push_streams.close(push_id) {
        Some(push_state) => {
            debug_assert!(matches!(push_state, PushState::Active { .. }));
        }
        None => {
            debug_assert!(false, "Closing non existing push stream!");
        }
    }
}
pub fn cancel(
&mut self,
push_id: u64,
conn: &mut Connection,
base_handler: &mut Http3Connection,
) -> Res<()> {
qtrace!("Cancel push_id={}", push_id);
self.check_push_id(push_id)?;
match self.push_streams.get(push_id) {
None => {
qtrace!("Push has already been closed.");
// If we have some events for the push_id in the event queue, the caller still does
// not not know that the push has been closed. Otherwise return
// InvalidStreamId.
if self.conn_events.has_push(push_id) {
self.conn_events.remove_events_for_push_id(push_id);
Ok(())
} else {
Err(Error::InvalidStreamId)
}
}
Some(PushState::PushPromise { .. }) => {
self.conn_events.remove_events_for_push_id(push_id);
base_handler.queue_control_frame(&HFrame::CancelPush { push_id });
self.push_streams.close(push_id);
Ok(())
}
Some(PushState::Active { stream_id, .. }) => {
self.conn_events.remove_events_for_push_id(push_id);
// Cancel the stream. the transport steam may already be done, so ignore an error.
mem::drop(base_handler.stream_stop_sending(
conn,
*stream_id,
Error::HttpRequestCancelled.code(),
));
self.push_streams.close(push_id);
Ok(())
}
Some(_) => Err(Error::InvalidStreamId),
}
}
/// Handles a reset of the push stream belonging to `push_id`.
///
/// The push is marked closed. If a promise had been received (state `Active`), queued
/// events for the push are removed and either a push-reset event (for
/// `CloseType::LocalError`) or a push-canceled event (any other close type) is posted.
/// Nothing happens for pushes that are already closed. Debug builds assert if no stream
/// was recorded.
pub fn push_stream_reset(&mut self, push_id: u64, close_type: CloseType) {
    qtrace!("Push stream has been reset, push_id={}", push_id);

    let promise_received = match self.push_streams.get(push_id) {
        None => return,
        Some(PushState::OnlyPushStream { .. }) => false,
        Some(PushState::Active { .. }) => true,
        Some(PushState::Init | PushState::PushPromise { .. } | PushState::Closed) => {
            debug_assert!(
                false,
                "Reset cannot actually happen because we do not have a stream."
            );
            return;
        }
    };

    self.push_streams.close(push_id);
    if promise_received {
        // The application knows about this push, so retract its events and notify it.
        self.conn_events.remove_events_for_push_id(push_id);
        match close_type {
            CloseType::LocalError(app_error) => {
                self.conn_events.push_reset(push_id, app_error);
            }
            _ => {
                self.conn_events.push_canceled(push_id);
            }
        }
    }
}
/// Returns the transport stream ID of the push if it is in the `Active` state.
///
/// Returns `None` for pushes in any other state, for closed pushes, and for push IDs above
/// the currently allowed maximum.
pub fn get_active_stream_id(&mut self, push_id: u64) -> Option<StreamId> {
    // The state lookup grows the internal queue up to `push_id`, so an unchecked ID could
    // force a huge allocation. A push above the allowed maximum can never have been
    // registered, so reject it before touching the queue.
    if push_id > self.current_max_push_id {
        return None;
    }
    match self.push_streams.get(push_id) {
        Some(PushState::Active { stream_id, .. }) => Some(*stream_id),
        _ => None,
    }
}
/// Queues a `MAX_PUSH_ID` frame when the remaining push ID allowance has dropped to half of
/// `max_concurent_push` or less.
///
/// The new maximum is the number of finished pushes plus `max_concurent_push`. Nothing is
/// sent when `max_concurent_push` is 0.
pub fn maybe_send_max_push_id_frame(&mut self, base_handler: &mut Http3Connection) {
    if self.max_concurent_push == 0 {
        return;
    }
    let push_done = self.push_streams.number_done();
    // `push_done` can exceed `current_max_push_id`: IDs `0..=current_max_push_id` are all
    // accepted, and `handle_zero_rtt_rejected` resets the maximum to 0. Saturate instead of
    // underflowing, which would panic in debug builds and wrap in release builds, so that
    // an exhausted allowance is treated as zero remaining.
    let remaining = self.current_max_push_id.saturating_sub(push_done);
    if remaining <= self.max_concurent_push / 2 {
        self.current_max_push_id = push_done + self.max_concurent_push;
        base_handler.queue_control_frame(&HFrame::MaxPushId {
            push_id: self.current_max_push_id,
        });
    }
}
/// Resets the maximum accepted push ID to 0.
// NOTE(review): presumably called when the server rejects 0-RTT, as the name suggests —
// confirm against the caller.
pub fn handle_zero_rtt_rejected(&mut self) {
self.current_max_push_id = 0;
}
/// Drops all tracked push states; `current_max_push_id` is left unchanged.
pub fn clear(&mut self) {
self.push_streams.clear();
}
/// Returns `true` when `max_concurent_push` is greater than zero.
pub const fn can_receive_push(&self) -> bool {
self.max_concurent_push > 0
}
/// Accepts an event produced by the push stream of `push_id`.
///
/// While only the push stream is known, the event is held back with the push state. Once
/// the push is active, it is posted to the connection events right away. Debug builds
/// assert if the push is closed or has no stream recorded.
pub fn new_stream_event(&mut self, push_id: u64, event: Http3ClientEvent) {
    let Some(push_state) = self.push_streams.get_mut(push_id) else {
        debug_assert!(false, "Push has been closed already.");
        return;
    };

    match push_state {
        PushState::OnlyPushStream { events, .. } => {
            events.push(event);
        }
        PushState::Active { .. } => {
            self.conn_events.insert(event);
        }
        PushState::Init | PushState::PushPromise { .. } | PushState::Closed => {
            debug_assert!(false, "No record of a stream!");
        }
    }
}
}
/// `RecvPushEvents` relays the events of a push stream to `PushController`.
/// It informs `PushController` when a push stream is done or canceled.
/// It also reports when headers or data are ready; `PushController` then decides whether to
/// post `PushHeaderReady` and `PushDataReadable` events or to postpone them if
/// a `push_promise` has not yet been received for the stream.
#[derive(Debug)]
pub struct RecvPushEvents {
// Push ID that every relayed event is attributed to.
push_id: u64,
// Shared controller that receives the relayed events.
push_handler: Rc<RefCell<PushController>>,
}
impl RecvPushEvents {
    /// Creates a relay that reports the events of push `push_id` to `push_handler`.
    pub const fn new(push_id: u64, push_handler: Rc<RefCell<PushController>>) -> Self {
        Self {
            push_handler,
            push_id,
        }
    }
}
impl RecvStreamEvents for RecvPushEvents {
    /// Hands a `PushDataReadable` event for this push to the controller.
    fn data_readable(&self, _stream_info: Http3StreamInfo) {
        let push_id = self.push_id;
        let event = Http3ClientEvent::PushDataReadable { push_id };
        self.push_handler
            .borrow_mut()
            .new_stream_event(push_id, event);
    }

    /// Tells the controller how the push stream ended.
    ///
    /// A normal finish closes the push. A remote reset or a local error is reported as a
    /// stream reset. A reset requested by the application is not reported and the
    /// controller is not borrowed for it.
    fn recv_closed(&self, _stream_info: Http3StreamInfo, close_type: CloseType) {
        match close_type {
            CloseType::Done => {
                self.push_handler.borrow_mut().close(self.push_id);
            }
            CloseType::ResetRemote(_) | CloseType::LocalError(_) => {
                self.push_handler
                    .borrow_mut()
                    .push_stream_reset(self.push_id, close_type);
            }
            CloseType::ResetApp(_) => {}
        }
    }
}
impl HttpRecvStreamEvents for RecvPushEvents {
    /// Hands a `PushHeaderReady` event carrying the received headers to the controller.
    fn header_ready(
        &self,
        _stream_info: Http3StreamInfo,
        headers: Vec<Header>,
        interim: bool,
        fin: bool,
    ) {
        let push_id = self.push_id;
        let event = Http3ClientEvent::PushHeaderReady {
            push_id,
            headers,
            interim,
            fin,
        };
        self.push_handler
            .borrow_mut()
            .new_stream_event(push_id, event);
    }
}