// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// Congestion control
use std::{
cmp::{max, min},
fmt::{self, Debug, Display},
time::{Duration, Instant},
};
use super::CongestionControl;
use crate::{
packet::PacketNumber,
qlog::{self, QlogMetric},
recovery::SentPacket,
rtt::RttEstimate,
sender::PACING_BURST_SIZE,
Pmtud,
};
#[rustfmt::skip] // to keep `::` and thus prevent conflict with `crate::qlog`
use ::qlog::events::{quic::CongestionStateUpdated, EventData};
use neqo_common::{const_max, const_min, qdebug, qinfo, qlog::NeqoQlog, qtrace};
pub const CWND_INITIAL_PKTS: usize = 10;
const PERSISTENT_CONG_THRESH: u32 = 3;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
/// In slow start.
SlowStart,
/// In congestion avoidance.
CongestionAvoidance,
/// In a recovery period, but no packets have been sent yet. This is a
/// transient state because we want to exempt the first packet sent after
/// entering recovery from the congestion window.
RecoveryStart,
/// In a recovery period, with the first packet sent at this time.
Recovery,
/// Start of persistent congestion, which is transient, like `RecoveryStart`.
PersistentCongestion,
}
impl State {
pub const fn in_recovery(self) -> bool {
matches!(self, Self::RecoveryStart | Self::Recovery)
}
pub fn in_slow_start(self) -> bool {
self == Self::SlowStart
}
/// These states are transient; we tell qlog on entry, but not on exit.
pub const fn transient(self) -> bool {
matches!(self, Self::RecoveryStart | Self::PersistentCongestion)
}
/// Update a transient state to the true state.
pub fn update(&mut self) {
*self = match self {
Self::PersistentCongestion => Self::SlowStart,
Self::RecoveryStart => Self::Recovery,
_ => unreachable!(),
};
}
pub const fn to_qlog(self) -> &'static str {
match self {
Self::SlowStart | Self::PersistentCongestion => "slow_start",
Self::CongestionAvoidance => "congestion_avoidance",
Self::Recovery | Self::RecoveryStart => "recovery",
}
}
}
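/// Strategy used by [`ClassicCongestionControl`] to decide how quickly the
/// congestion window grows and how far it is reduced on a congestion event.
/// The concrete window-growth rules (NewReno, Cubic) implement this trait in
/// their own modules.
///
/// As a rough, illustrative sketch only (not the actual implementations), a
/// NewReno-style implementor might behave like this:
///
/// ```ignore
/// // Require a full cwnd of acknowledged bytes per MAX_DATAGRAM_SIZE of growth ...
/// fn bytes_for_cwnd_increase(&mut self, curr_cwnd: usize, _new_acked: usize,
///     _min_rtt: Duration, _max_datagram_size: usize, _now: Instant) -> usize {
///     curr_cwnd
/// }
/// // ... and halve both the window and the accumulated credit on congestion.
/// fn reduce_cwnd(&mut self, curr_cwnd: usize, acked_bytes: usize,
///     _max_datagram_size: usize) -> (usize, usize) {
///     (curr_cwnd / 2, acked_bytes / 2)
/// }
/// ```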
pub trait WindowAdjustment: Display + Debug {
/// This is called when an ack is received.
/// The function calculates the number of acked bytes the congestion controller needs
/// to collect before increasing its cwnd by `MAX_DATAGRAM_SIZE`.
fn bytes_for_cwnd_increase(
&mut self,
curr_cwnd: usize,
new_acked_bytes: usize,
min_rtt: Duration,
max_datagram_size: usize,
now: Instant,
) -> usize;
/// This function is called when a congestion event has been detected; it
/// returns new (decreased) values of `curr_cwnd` and `acked_bytes`.
/// The returned window can be very small; the calling code is responsible for ensuring that the
/// congestion window doesn't drop below the minimum of `CWND_MIN`.
fn reduce_cwnd(
&mut self,
curr_cwnd: usize,
acked_bytes: usize,
max_datagram_size: usize,
) -> (usize, usize);
/// Cubic needs this signal to reset its epoch.
fn on_app_limited(&mut self);
#[cfg(test)]
fn last_max_cwnd(&self) -> f64;
#[cfg(test)]
fn set_last_max_cwnd(&mut self, last_max_cwnd: f64);
}
#[derive(Debug)]
pub struct ClassicCongestionControl<T> {
cc_algorithm: T,
state: State,
congestion_window: usize, // = kInitialWindow
bytes_in_flight: usize,
acked_bytes: usize,
ssthresh: usize,
recovery_start: Option<PacketNumber>,
/// `first_app_limited` indicates the packet number after which the application might be
/// underutilizing the congestion window. When underutilizing the congestion window due to not
/// sending out enough data, we SHOULD NOT increase the congestion window.[1] Packets sent
/// before this point are deemed to fully utilize the congestion window and count towards
/// increasing the congestion window.
///
/// [1]: https://datatracker.ietf.org/doc/html/rfc9002#section-7.8
first_app_limited: PacketNumber,
pmtud: Pmtud,
qlog: NeqoQlog,
}
impl<T> ClassicCongestionControl<T> {
pub const fn max_datagram_size(&self) -> usize {
self.pmtud.plpmtu()
}
}
impl<T: WindowAdjustment> Display for ClassicCongestionControl<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"{} CongCtrl {}/{} ssthresh {}",
self.cc_algorithm, self.bytes_in_flight, self.congestion_window, self.ssthresh,
)?;
Ok(())
}
}
impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
fn set_qlog(&mut self, qlog: NeqoQlog) {
self.qlog = qlog;
}
#[must_use]
fn cwnd(&self) -> usize {
self.congestion_window
}
#[must_use]
fn bytes_in_flight(&self) -> usize {
self.bytes_in_flight
}
#[must_use]
fn cwnd_avail(&self) -> usize {
// BIF can be higher than cwnd due to PTO packets, which are sent even
// if avail is 0, but still count towards BIF.
self.congestion_window.saturating_sub(self.bytes_in_flight)
}
#[must_use]
fn cwnd_min(&self) -> usize {
self.max_datagram_size() * 2
}
#[cfg(test)]
#[must_use]
fn cwnd_initial(&self) -> usize {
cwnd_initial(self.pmtud.plpmtu())
}
fn pmtud(&self) -> &Pmtud {
&self.pmtud
}
fn pmtud_mut(&mut self) -> &mut Pmtud {
&mut self.pmtud
}
// Multi-packet version of OnPacketAckedCC
fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], rtt_est: &RttEstimate, now: Instant) {
let mut is_app_limited = true;
let mut new_acked = 0;
for pkt in acked_pkts {
qtrace!(
"packet_acked this={:p}, pn={}, ps={}, ignored={}, lost={}, rtt_est={:?}",
self,
pkt.pn(),
pkt.len(),
i32::from(!pkt.cc_outstanding()),
i32::from(pkt.lost()),
rtt_est,
);
if !pkt.cc_outstanding() {
continue;
}
if pkt.pn() < self.first_app_limited {
is_app_limited = false;
}
// BIF is set to 0 on a path change, but in case that was because of a simple rebinding
// event, we may still get ACKs for packets sent before the rebinding.
self.bytes_in_flight = self.bytes_in_flight.saturating_sub(pkt.len());
if !self.after_recovery_start(pkt) {
// Do not increase congestion window for packets sent before
// recovery last started.
continue;
}
if self.state.in_recovery() {
self.set_state(State::CongestionAvoidance);
qlog::metrics_updated(&self.qlog, &[QlogMetric::InRecovery(false)]);
}
new_acked += pkt.len();
}
if is_app_limited {
self.cc_algorithm.on_app_limited();
qdebug!("on_packets_acked this={:p}, limited=1, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked);
return;
}
// Slow start, up to the slow start threshold.
if self.congestion_window < self.ssthresh {
self.acked_bytes += new_acked;
let increase = min(self.ssthresh - self.congestion_window, self.acked_bytes);
self.congestion_window += increase;
self.acked_bytes -= increase;
qdebug!([self], "slow start += {}", increase);
if self.congestion_window == self.ssthresh {
// This doesn't look like it is necessary, but it can happen
// after persistent congestion.
self.set_state(State::CongestionAvoidance);
}
}
// Congestion avoidance, above the slow start threshold.
if self.congestion_window >= self.ssthresh {
// The following function returns the number of acked bytes a controller needs
// to collect to be allowed to increase its cwnd by MAX_DATAGRAM_SIZE.
let bytes_for_increase = self.cc_algorithm.bytes_for_cwnd_increase(
self.congestion_window,
new_acked,
rtt_est.minimum(),
self.max_datagram_size(),
now,
);
debug_assert!(bytes_for_increase > 0);
// If enough credit has been accumulated already, apply it gradually.
// Even if there is a sudden increase in the allowed rate, we still increase the cwnd gently.
if self.acked_bytes >= bytes_for_increase {
self.acked_bytes = 0;
self.congestion_window += self.max_datagram_size();
}
self.acked_bytes += new_acked;
if self.acked_bytes >= bytes_for_increase {
self.acked_bytes -= bytes_for_increase;
self.congestion_window += self.max_datagram_size(); // or is this the current MTU?
}
// The number of bytes we require can go down over time with Cubic.
// That might result in an excessive rate of increase, so limit the number of unused
// acknowledged bytes after increasing the congestion window twice.
self.acked_bytes = min(bytes_for_increase, self.acked_bytes);
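// Net effect (illustrative, assuming a NewReno-style `bytes_for_cwnd_increase` that
// returns the current cwnd): the window grows by one `max_datagram_size()` for every
// cwnd's worth of acknowledged bytes, i.e. roughly one datagram per RTT.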
}
qlog::metrics_updated(
&self.qlog,
&[
QlogMetric::CongestionWindow(self.congestion_window),
QlogMetric::BytesInFlight(self.bytes_in_flight),
],
);
qdebug!([self], "on_packets_acked this={:p}, limited=0, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked);
}
/// Update congestion controller state based on lost packets.
fn on_packets_lost(
&mut self,
first_rtt_sample_time: Option<Instant>,
prev_largest_acked_sent: Option<Instant>,
pto: Duration,
lost_packets: &[SentPacket],
) -> bool {
if lost_packets.is_empty() {
return false;
}
for pkt in lost_packets.iter().filter(|pkt| pkt.cc_in_flight()) {
qdebug!(
"packet_lost this={:p}, pn={}, ps={}",
self,
pkt.pn(),
pkt.len()
);
// BIF is set to 0 on a path change, but in case that was because of a simple rebinding
// event, we may still declare packets lost that were sent before the rebinding.
self.bytes_in_flight = self.bytes_in_flight.saturating_sub(pkt.len());
}
qlog::metrics_updated(
&self.qlog,
&[QlogMetric::BytesInFlight(self.bytes_in_flight)],
);
let is_pmtud_probe = self.pmtud.is_probe_filter();
let mut lost_packets = lost_packets
.iter()
.filter(|pkt| !is_pmtud_probe(pkt))
.rev()
.peekable();
// Lost PMTUD probes do not elicit a congestion control reaction.
let Some(last_lost_packet) = lost_packets.peek() else {
return false;
};
let congestion = self.on_congestion_event(last_lost_packet);
let persistent_congestion = self.detect_persistent_congestion(
first_rtt_sample_time,
prev_largest_acked_sent,
pto,
lost_packets.rev(),
);
qdebug!(
"on_packets_lost this={:p}, bytes_in_flight={}, cwnd={}, state={:?}",
self,
self.bytes_in_flight,
self.congestion_window,
self.state
);
congestion || persistent_congestion
}
/// Report received ECN CE mark(s) to the congestion controller as a
/// congestion event.
fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket) -> bool {
self.on_congestion_event(largest_acked_pkt)
}
fn discard(&mut self, pkt: &SentPacket) {
if pkt.cc_outstanding() {
assert!(self.bytes_in_flight >= pkt.len());
self.bytes_in_flight -= pkt.len();
qlog::metrics_updated(
&self.qlog,
&[QlogMetric::BytesInFlight(self.bytes_in_flight)],
);
qtrace!([self], "Ignore pkt with size {}", pkt.len());
}
}
fn discard_in_flight(&mut self) {
self.bytes_in_flight = 0;
qlog::metrics_updated(
&self.qlog,
&[QlogMetric::BytesInFlight(self.bytes_in_flight)],
);
}
fn on_packet_sent(&mut self, pkt: &SentPacket) {
// Record the first packet number sent in recovery and exit any transient state.
if self.state.transient() {
self.recovery_start = Some(pkt.pn());
self.state.update();
}
if !pkt.cc_in_flight() {
return;
}
if !self.app_limited() {
// Given the current non-app-limited condition, we're fully utilizing the congestion
// window. Assume that all in-flight packets up to this one are NOT app-limited.
// However, subsequent packets might be app-limited. Set `first_app_limited` to the
// next packet number.
self.first_app_limited = pkt.pn() + 1;
}
self.bytes_in_flight += pkt.len();
qdebug!(
"packet_sent this={:p}, pn={}, ps={}",
self,
pkt.pn(),
pkt.len()
);
qlog::metrics_updated(
&self.qlog,
&[QlogMetric::BytesInFlight(self.bytes_in_flight)],
);
}
/// Whether a packet can be sent immediately as a result of entering recovery.
fn recovery_packet(&self) -> bool {
self.state == State::RecoveryStart
}
}
const fn cwnd_initial(mtu: usize) -> usize {
const_min(CWND_INITIAL_PKTS * mtu, const_max(2 * mtu, 14_720))
}
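// Worked example (added for illustration): with a PLPMTU of, say, 1252 bytes this is
// min(10 * 1252, max(2 * 1252, 14_720)) = min(12_520, 14_720) = 12_520 bytes,
// i.e. ten full-sized packets; the 14_720-byte cap only limits the window for
// MTUs larger than 1472 bytes.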
impl<T: WindowAdjustment> ClassicCongestionControl<T> {
pub fn new(cc_algorithm: T, pmtud: Pmtud) -> Self {
Self {
cc_algorithm,
state: State::SlowStart,
congestion_window: cwnd_initial(pmtud.plpmtu()),
bytes_in_flight: 0,
acked_bytes: 0,
ssthresh: usize::MAX,
recovery_start: None,
qlog: NeqoQlog::disabled(),
first_app_limited: 0,
pmtud,
}
}
#[cfg(test)]
#[must_use]
pub const fn ssthresh(&self) -> usize {
self.ssthresh
}
#[cfg(test)]
pub fn set_ssthresh(&mut self, v: usize) {
self.ssthresh = v;
}
#[cfg(test)]
pub fn last_max_cwnd(&self) -> f64 {
self.cc_algorithm.last_max_cwnd()
}
#[cfg(test)]
pub fn set_last_max_cwnd(&mut self, last_max_cwnd: f64) {
self.cc_algorithm.set_last_max_cwnd(last_max_cwnd);
}
#[cfg(test)]
pub const fn acked_bytes(&self) -> usize {
self.acked_bytes
}
fn set_state(&mut self, state: State) {
if self.state != state {
qdebug!([self], "state -> {:?}", state);
let old_state = self.state;
self.qlog.add_event_data(|| {
// No need to tell qlog about exit from transient states.
if old_state.transient() {
None
} else {
let ev_data = EventData::CongestionStateUpdated(CongestionStateUpdated {
old: Some(old_state.to_qlog().to_owned()),
new: state.to_qlog().to_owned(),
trigger: None,
});
Some(ev_data)
}
});
self.state = state;
}
}
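/// Detect persistent congestion as described in RFC 9002, Section 7.6: a contiguous
/// run of lost, ack-eliciting packets, all sent after the first RTT sample and the
/// previously largest acknowledged packet, spanning more than
/// `PERSISTENT_CONG_THRESH * pto`. For example (illustrative numbers), with a PTO of
/// 100 ms and the threshold of 3, two such losses sent more than 300 ms apart with no
/// intervening acknowledgment collapse the window to `cwnd_min()`.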
fn detect_persistent_congestion<'a>(
&mut self,
first_rtt_sample_time: Option<Instant>,
prev_largest_acked_sent: Option<Instant>,
pto: Duration,
lost_packets: impl IntoIterator<Item = &'a SentPacket>,
) -> bool {
if first_rtt_sample_time.is_none() {
return false;
}
let pc_period = pto * PERSISTENT_CONG_THRESH;
let mut last_pn = 1 << 62; // Impossibly large, but not enough to overflow.
let mut start = None;
// Look for the first lost packet after the previous largest acknowledged.
// Ignore packets that weren't ack-eliciting for the start of this range.
// Also, make sure to ignore any packets sent before we got an RTT estimate
// as we might not have sent PTO packets soon enough after those.
let cutoff = max(first_rtt_sample_time, prev_largest_acked_sent);
for p in lost_packets
.into_iter()
.skip_while(|p| Some(p.time_sent()) < cutoff)
{
if p.pn() != last_pn + 1 {
// Not a contiguous range of lost packets, start over.
start = None;
}
last_pn = p.pn();
if !p.cc_in_flight() {
// Not interesting, keep looking.
continue;
}
if let Some(t) = start {
let elapsed = p
.time_sent()
.checked_duration_since(t)
.expect("time is monotonic");
if elapsed > pc_period {
qinfo!([self], "persistent congestion");
self.congestion_window = self.cwnd_min();
self.acked_bytes = 0;
self.set_state(State::PersistentCongestion);
qlog::metrics_updated(
&self.qlog,
&[QlogMetric::CongestionWindow(self.congestion_window)],
);
return true;
}
} else {
start = Some(p.time_sent());
}
}
false
}
#[must_use]
fn after_recovery_start(&self, packet: &SentPacket) -> bool {
// At the start of the recovery period, the state is transient and
// all packets will have been sent before recovery. When sending out
// the first packet we transition to the non-transient `Recovery`
// state and update `self.recovery_start`. Before the first recovery,
// `recovery_start` is `None`, so every packet counts as sent after the
// recovery event, which allows the cwnd to be reduced on congestion events.
!self.state.transient() && self.recovery_start.map_or(true, |pn| packet.pn() >= pn)
}
/// Handle a congestion event.
/// Returns true if this was a true congestion event.
fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool {
// Start a new congestion event if the lost or ECN CE marked packet was sent
// after the start of the previous congestion recovery period.
if !self.after_recovery_start(last_packet) {
return false;
}
let (cwnd, acked_bytes) = self.cc_algorithm.reduce_cwnd(
self.congestion_window,
self.acked_bytes,
self.max_datagram_size(),
);
self.congestion_window = max(cwnd, self.cwnd_min());
self.acked_bytes = acked_bytes;
self.ssthresh = self.congestion_window;
qdebug!(
[self],
"Cong event -> recovery; cwnd {}, ssthresh {}",
self.congestion_window,
self.ssthresh
);
qlog::metrics_updated(
&self.qlog,
&[
QlogMetric::CongestionWindow(self.congestion_window),
QlogMetric::SsThresh(self.ssthresh),
QlogMetric::InRecovery(true),
],
);
self.set_state(State::RecoveryStart);
true
}
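/// Whether the sender is application-limited, i.e. not using the congestion
/// window fully enough to justify growing it.
///
/// Worked example (illustrative numbers only): in congestion avoidance with a
/// 12_000-byte window, a 1_200-byte datagram and a pacing burst of
/// `PACING_BURST_SIZE` packets (assumed here to be 2), the sender counts as
/// app-limited only while `bytes_in_flight + 2 * 1_200 < 12_000`, i.e. with
/// fewer than 9_600 bytes in flight. In slow start the bar is simply half the
/// window, to leave room for the window to double.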
fn app_limited(&self) -> bool {
if self.bytes_in_flight >= self.congestion_window {
false
} else if self.state.in_slow_start() {
// Allow for potential doubling of the congestion window during slow start.
// That is, the application might not have been able to send enough to respond
// to increases to the congestion window.
self.bytes_in_flight < self.congestion_window / 2
} else {
// We're not limited if the in-flight data is within a single burst of the
// congestion window.
(self.bytes_in_flight + self.max_datagram_size() * PACING_BURST_SIZE)
< self.congestion_window
}
}
}
#[cfg(test)]
mod tests {
use std::{
net::{IpAddr, Ipv4Addr},
time::{Duration, Instant},
};
use neqo_common::{qinfo, IpTosEcn};
use test_fixture::now;
use super::{ClassicCongestionControl, WindowAdjustment, PERSISTENT_CONG_THRESH};
use crate::{
cc::{
classic_cc::State,
cubic::{Cubic, CUBIC_BETA_USIZE_DIVIDEND, CUBIC_BETA_USIZE_DIVISOR},
new_reno::NewReno,
CongestionControl, CongestionControlAlgorithm, CWND_INITIAL_PKTS,
},
packet::{PacketNumber, PacketType},
recovery::SentPacket,
rtt::RttEstimate,
Pmtud,
};
const IP_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
const PTO: Duration = Duration::from_millis(100);
const RTT: Duration = Duration::from_millis(98);
const RTT_ESTIMATE: RttEstimate = RttEstimate::from_duration(Duration::from_millis(98));
const ZERO: Duration = Duration::from_secs(0);
const EPSILON: Duration = Duration::from_nanos(1);
const GAP: Duration = Duration::from_secs(1);
/// The largest time between packets without causing persistent congestion.
const SUB_PC: Duration = Duration::from_millis(100 * PERSISTENT_CONG_THRESH as u64);
/// The minimum time between packets to cause persistent congestion.
/// Uses an odd expression because `Duration` arithmetic isn't `const`.
const PC: Duration = Duration::from_nanos(100_000_000 * (PERSISTENT_CONG_THRESH as u64) + 1);
fn cwnd_is_default(cc: &ClassicCongestionControl<NewReno>) {
assert_eq!(cc.cwnd(), cc.cwnd_initial());
assert_eq!(cc.ssthresh(), usize::MAX);
}
fn cwnd_is_halved(cc: &ClassicCongestionControl<NewReno>) {
assert_eq!(cc.cwnd(), cc.cwnd_initial() / 2);
assert_eq!(cc.ssthresh(), cc.cwnd_initial() / 2);
}
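/// Illustrative test (not part of the original suite): the initial congestion
/// window follows `min(CWND_INITIAL_PKTS * MTU, max(2 * MTU, 14_720))`.
#[test]
fn cwnd_initial_formula() {
    let cc = ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR));
    let mtu = cc.max_datagram_size();
    let expected = (CWND_INITIAL_PKTS * mtu).min((2 * mtu).max(14_720));
    assert_eq!(cc.cwnd_initial(), expected);
    assert_eq!(cc.cwnd(), cc.cwnd_initial());
}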
fn lost(pn: PacketNumber, ack_eliciting: bool, t: Duration) -> SentPacket {
SentPacket::new(
PacketType::Short,
pn,
IpTosEcn::default(),
now() + t,
ack_eliciting,
Vec::new(),
100,
)
}
fn congestion_control(cc: CongestionControlAlgorithm) -> Box<dyn CongestionControl> {
match cc {
CongestionControlAlgorithm::NewReno => Box::new(ClassicCongestionControl::new(
NewReno::default(),
Pmtud::new(IP_ADDR),
)),
CongestionControlAlgorithm::Cubic => Box::new(ClassicCongestionControl::new(
Cubic::default(),
Pmtud::new(IP_ADDR),
)),
}
}
fn persistent_congestion_by_algorithm(
mut cc: Box<dyn CongestionControl>,
reduced_cwnd: usize,
lost_packets: &[SentPacket],
persistent_expected: bool,
) {
for p in lost_packets {
cc.on_packet_sent(p);
}
cc.on_packets_lost(Some(now()), None, PTO, lost_packets);
let persistent = if cc.cwnd() == reduced_cwnd {
false
} else if cc.cwnd() == cc.cwnd_min() {
true
} else {
panic!("unexpected cwnd");
};
assert_eq!(persistent, persistent_expected);
}
fn persistent_congestion(lost_packets: &[SentPacket], persistent_expected: bool) {
let cc = congestion_control(CongestionControlAlgorithm::NewReno);
let cwnd_initial = cc.cwnd_initial();
persistent_congestion_by_algorithm(cc, cwnd_initial / 2, lost_packets, persistent_expected);
let cc = congestion_control(CongestionControlAlgorithm::Cubic);
let cwnd_initial = cc.cwnd_initial();
persistent_congestion_by_algorithm(
cc,
cwnd_initial * CUBIC_BETA_USIZE_DIVIDEND / CUBIC_BETA_USIZE_DIVISOR,
lost_packets,
persistent_expected,
);
}
/// A span of exactly the PC threshold only reduces the window on loss.
#[test]
fn persistent_congestion_none() {
persistent_congestion(&[lost(1, true, ZERO), lost(2, true, SUB_PC)], false);
}
/// A span of just more than the PC threshold causes persistent congestion.
#[test]
fn persistent_congestion_simple() {
persistent_congestion(&[lost(1, true, ZERO), lost(2, true, PC)], true);
}
/// Both packets need to be ack-eliciting.
#[test]
fn persistent_congestion_non_ack_eliciting() {
persistent_congestion(&[lost(1, false, ZERO), lost(2, true, PC)], false);
persistent_congestion(&[lost(1, true, ZERO), lost(2, false, PC)], false);
}
/// Packets in the middle, of any type, are OK.
#[test]
fn persistent_congestion_middle() {
persistent_congestion(
&[lost(1, true, ZERO), lost(2, false, RTT), lost(3, true, PC)],
true,
);
persistent_congestion(
&[lost(1, true, ZERO), lost(2, true, RTT), lost(3, true, PC)],
true,
);
}
/// Leading non-ack-eliciting packets are skipped.
#[test]
fn persistent_congestion_leading_non_ack_eliciting() {
persistent_congestion(
&[lost(1, false, ZERO), lost(2, true, RTT), lost(3, true, PC)],
false,
);
persistent_congestion(
&[
lost(1, false, ZERO),
lost(2, true, RTT),
lost(3, true, RTT + PC),
],
true,
);
}
/// Trailing non-ack-eliciting packets aren't relevant.
#[test]
fn persistent_congestion_trailing_non_ack_eliciting() {
persistent_congestion(
&[
lost(1, true, ZERO),
lost(2, true, PC),
lost(3, false, PC + EPSILON),
],
true,
);
persistent_congestion(
&[
lost(1, true, ZERO),
lost(2, true, SUB_PC),
lost(3, false, PC),
],
false,
);
}
/// Gaps in the middle, of any type, restart the count.
#[test]
fn persistent_congestion_gap_reset() {
persistent_congestion(&[lost(1, true, ZERO), lost(3, true, PC)], false);
persistent_congestion(
&[
lost(1, true, ZERO),
lost(2, true, RTT),
lost(4, true, GAP),
lost(5, true, GAP + PTO * PERSISTENT_CONG_THRESH),
],
false,
);
}
/// A long enough span on either side of a gap will cause persistent congestion.
#[test]
fn persistent_congestion_gap_or() {
persistent_congestion(
&[
lost(1, true, ZERO),
lost(2, true, PC),
lost(4, true, GAP),
lost(5, true, GAP + PTO),
],
true,
);
persistent_congestion(
&[
lost(1, true, ZERO),
lost(2, true, PTO),
lost(4, true, GAP),
lost(5, true, GAP + PC),
],
true,
);
}
/// A gap only restarts after an ack-eliciting packet.
#[test]
fn persistent_congestion_gap_non_ack_eliciting() {
persistent_congestion(
&[
lost(1, true, ZERO),
lost(2, true, PTO),
lost(4, false, GAP),
lost(5, true, GAP + PC),
],
false,
);
persistent_congestion(
&[
lost(1, true, ZERO),
lost(2, true, PTO),
lost(4, false, GAP),
lost(5, true, GAP + RTT),
lost(6, true, GAP + RTT + SUB_PC),
],
false,
);
persistent_congestion(
&[
lost(1, true, ZERO),
lost(2, true, PTO),
lost(4, false, GAP),
lost(5, true, GAP + RTT),
lost(6, true, GAP + RTT + PC),
],
true,
);
}
/// Get a time, in multiples of `PTO`, relative to `now()`.
fn by_pto(t: u32) -> Instant {
now() + (PTO * t)
}
/// Make packets that will be declared lost.
/// `times` is the time of sending, in multiples of `PTO`, relative to `now()`.
fn make_lost(times: &[u32]) -> Vec<SentPacket> {
times
.iter()
.enumerate()
.map(|(i, &t)| {
SentPacket::new(
PacketType::Short,
u64::try_from(i).unwrap(),
IpTosEcn::default(),
by_pto(t),
true,
Vec::new(),
1000,
)
})
.collect::<Vec<_>>()
}
/// Call `detect_persistent_congestion` using times relative to now and the fixed PTO time.
/// `last_ack` and `rtt_time` are times in multiples of `PTO`, relative to `now()`,
/// for the time of the largest acknowledged and the first RTT sample, respectively.
fn persistent_congestion_by_pto<T: WindowAdjustment>(
mut cc: ClassicCongestionControl<T>,
last_ack: u32,
rtt_time: u32,
lost: &[SentPacket],
) -> bool {
assert_eq!(cc.cwnd(), cc.cwnd_initial());
let last_ack = Some(by_pto(last_ack));
let rtt_time = Some(by_pto(rtt_time));
// Persistent congestion is never declared if the RTT time is `None`.
cc.detect_persistent_congestion(None, None, PTO, lost.iter());
assert_eq!(cc.cwnd(), cc.cwnd_initial());
cc.detect_persistent_congestion(None, last_ack, PTO, lost.iter());
assert_eq!(cc.cwnd(), cc.cwnd_initial());
cc.detect_persistent_congestion(rtt_time, last_ack, PTO, lost.iter());
cc.cwnd() == cc.cwnd_min()
}
/// No persistent congestion can be had if there are no lost packets.
#[test]
fn persistent_congestion_no_lost() {
let lost = make_lost(&[]);
assert!(!persistent_congestion_by_pto(
ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR)),
0,
0,
&lost
));
assert!(!persistent_congestion_by_pto(
ClassicCongestionControl::new(Cubic::default(), Pmtud::new(IP_ADDR)),
0,
0,
&lost
));
}
/// No persistent congestion can be had if there is only one lost packet.
#[test]
fn persistent_congestion_one_lost() {
let lost = make_lost(&[1]);
assert!(!persistent_congestion_by_pto(
ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR)),
0,
0,
&lost
));
assert!(!persistent_congestion_by_pto(
ClassicCongestionControl::new(Cubic::default(), Pmtud::new(IP_ADDR)),
0,
0,
&lost
));
}
/// Persistent congestion can't happen based on old packets.
#[test]
fn persistent_congestion_past() {
// Packets sent prior to either the last acknowledged or the first RTT
// sample are not considered. So 0 is ignored.
let lost = make_lost(&[0, PERSISTENT_CONG_THRESH + 1, PERSISTENT_CONG_THRESH + 2]);
assert!(!persistent_congestion_by_pto(
ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR)),
1,
1,
&lost
));
assert!(!persistent_congestion_by_pto(
ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR)),
0,
1,
&lost
));
assert!(!persistent_congestion_by_pto(
ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR)),
1,
0,
&lost
));
assert!(!persistent_congestion_by_pto(
ClassicCongestionControl::new(Cubic::default(), Pmtud::new(IP_ADDR)),
1,
1,
&lost
));
assert!(!persistent_congestion_by_pto(
ClassicCongestionControl::new(Cubic::default(), Pmtud::new(IP_ADDR)),
0,
1,
&lost
));
assert!(!persistent_congestion_by_pto(
ClassicCongestionControl::new(Cubic::default(), Pmtud::new(IP_ADDR)),
1,
0,
&lost
));
}
/// Persistent congestion doesn't start unless the packet is ack-eliciting.
#[test]
fn persistent_congestion_ack_eliciting() {
let mut lost = make_lost(&[1, PERSISTENT_CONG_THRESH + 2]);
lost[0] = SentPacket::new(
lost[0].packet_type(),
lost[0].pn(),
lost[0].ecn_mark(),
lost[0].time_sent(),
false,
Vec::new(),
lost[0].len(),
);
assert!(!persistent_congestion_by_pto(
ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR)),
0,
0,
&lost
));
assert!(!persistent_congestion_by_pto(
ClassicCongestionControl::new(Cubic::default(), Pmtud::new(IP_ADDR)),
0,
0,
&lost
));
}
/// Detect persistent congestion. Note that the first lost packet needs to have a time
/// greater than the previously acknowledged packet AND the first RTT sample. And the
/// difference in times needs to be greater than the persistent congestion threshold.
#[test]
fn persistent_congestion_min() {
let lost = make_lost(&[1, PERSISTENT_CONG_THRESH + 2]);
assert!(persistent_congestion_by_pto(
ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR)),
0,
0,
&lost
));
assert!(persistent_congestion_by_pto(
ClassicCongestionControl::new(Cubic::default(), Pmtud::new(IP_ADDR)),
0,
0,
&lost
));
}
/// Make sure that not having a previous largest acknowledged also results
/// in detecting persistent congestion. (This is not expected to happen, but
/// the code permits it).
#[test]
fn persistent_congestion_no_prev_ack_newreno() {
let lost = make_lost(&[1, PERSISTENT_CONG_THRESH + 2]);
let mut cc = ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR));
cc.detect_persistent_congestion(Some(by_pto(0)), None, PTO, lost.iter());
assert_eq!(cc.cwnd(), cc.cwnd_min());
}
#[test]
fn persistent_congestion_no_prev_ack_cubic() {
let lost = make_lost(&[1, PERSISTENT_CONG_THRESH + 2]);
let mut cc = ClassicCongestionControl::new(Cubic::default(), Pmtud::new(IP_ADDR));
cc.detect_persistent_congestion(Some(by_pto(0)), None, PTO, lost.iter());
assert_eq!(cc.cwnd(), cc.cwnd_min());
}
/// The code asserts on ordering errors.
#[test]
#[should_panic(expected = "time is monotonic")]
fn persistent_congestion_unsorted_newreno() {
let lost = make_lost(&[PERSISTENT_CONG_THRESH + 2, 1]);
assert!(!persistent_congestion_by_pto(
ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR)),
0,
0,
&lost
));
}
/// The code asserts on ordering errors.
#[test]
#[should_panic(expected = "time is monotonic")]
fn persistent_congestion_unsorted_cubic() {
let lost = make_lost(&[PERSISTENT_CONG_THRESH + 2, 1]);
assert!(!persistent_congestion_by_pto(
ClassicCongestionControl::new(Cubic::default(), Pmtud::new(IP_ADDR)),
0,
0,
&lost
));
}
#[test]
fn app_limited_slow_start() {
const BELOW_APP_LIMIT_PKTS: usize = 5;
const ABOVE_APP_LIMIT_PKTS: usize = BELOW_APP_LIMIT_PKTS + 1;
let mut cc = ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR));
let cwnd = cc.congestion_window;
let mut now = now();
let mut next_pn = 0;
// Simulate packet bursts below the app-limit threshold.
for packet_burst_size in 1..=BELOW_APP_LIMIT_PKTS {
// Always stay below the app-limit threshold while sending.
let mut pkts = Vec::new();
for _ in 0..packet_burst_size {
let p = SentPacket::new(
PacketType::Short,
next_pn,
IpTosEcn::default(),
now,
true,
Vec::new(),
cc.max_datagram_size(),
);
next_pn += 1;
cc.on_packet_sent(&p);
pkts.push(p);
}
assert_eq!(
cc.bytes_in_flight(),
packet_burst_size * cc.max_datagram_size()
);
now += RTT;
cc.on_packets_acked(&pkts, &RTT_ESTIMATE, now);
assert_eq!(cc.bytes_in_flight(), 0);
assert_eq!(cc.acked_bytes, 0);
assert_eq!(cwnd, cc.congestion_window); // CWND doesn't grow because we're app limited
}
// Fully utilize the congestion window by sending enough packets to
// have `bytes_in_flight` above the `app_limited` threshold.
let mut pkts = Vec::new();
for _ in 0..ABOVE_APP_LIMIT_PKTS {
let p = SentPacket::new(
PacketType::Short,
next_pn,
IpTosEcn::default(),
now,
true,
Vec::new(),
cc.max_datagram_size(),
);
next_pn += 1;
cc.on_packet_sent(&p);
pkts.push(p);
}
assert_eq!(
cc.bytes_in_flight(),
ABOVE_APP_LIMIT_PKTS * cc.max_datagram_size()
);
now += RTT;
// Check if congestion window gets increased for all packets currently in flight
for (i, pkt) in pkts.into_iter().enumerate() {
cc.on_packets_acked(&[pkt], &RTT_ESTIMATE, now);
assert_eq!(
cc.bytes_in_flight(),
(ABOVE_APP_LIMIT_PKTS - i - 1) * cc.max_datagram_size()
);
// increase acked_bytes with each packet
qinfo!(
"{} {}",
cc.congestion_window,
cwnd + i * cc.max_datagram_size()
);
assert_eq!(
cc.congestion_window,
cwnd + (i + 1) * cc.max_datagram_size()
);
assert_eq!(cc.acked_bytes, 0);
}
}
#[test]
fn app_limited_congestion_avoidance() {
const CWND_PKTS_CA: usize = CWND_INITIAL_PKTS / 2;
const BELOW_APP_LIMIT_PKTS: usize = CWND_PKTS_CA - 2;
const ABOVE_APP_LIMIT_PKTS: usize = BELOW_APP_LIMIT_PKTS + 1;
let mut cc = ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR));
let mut now = now();
// Change state to congestion avoidance by introducing loss.
let p_lost = SentPacket::new(
PacketType::Short,
1,
IpTosEcn::default(),
now,
true,
Vec::new(),
cc.max_datagram_size(),
);
cc.on_packet_sent(&p_lost);
cwnd_is_default(&cc);
now += PTO;
cc.on_packets_lost(Some(now), None, PTO, &[p_lost]);
cwnd_is_halved(&cc);
let p_not_lost = SentPacket::new(
PacketType::Short,
2,
IpTosEcn::default(),
now,
true,
Vec::new(),
cc.max_datagram_size(),
);
cc.on_packet_sent(&p_not_lost);
now += RTT;
cc.on_packets_acked(&[p_not_lost], &RTT_ESTIMATE, now);
cwnd_is_halved(&cc);
// cc is app-limited, therefore cwnd is not increased.
assert_eq!(cc.acked_bytes, 0);
// Now we are in the congestion avoidance state.
assert_eq!(cc.state, State::CongestionAvoidance);
// Simulate packet bursts below the app-limit threshold.
let mut next_pn = 3;
for packet_burst_size in 1..=BELOW_APP_LIMIT_PKTS {
// Always stay below the app-limit threshold while sending.
let mut pkts = Vec::new();
for _ in 0..packet_burst_size {
let p = SentPacket::new(
PacketType::Short,
next_pn,
IpTosEcn::default(),
now,
true,
Vec::new(),
cc.max_datagram_size(),
);
next_pn += 1;
cc.on_packet_sent(&p);
pkts.push(p);
}
assert_eq!(
cc.bytes_in_flight(),
packet_burst_size * cc.max_datagram_size()
);
now += RTT;
for (i, pkt) in pkts.into_iter().enumerate() {
cc.on_packets_acked(&[pkt], &RTT_ESTIMATE, now);
assert_eq!(
cc.bytes_in_flight(),
(packet_burst_size - i - 1) * cc.max_datagram_size()
);
cwnd_is_halved(&cc); // CWND doesn't grow because we're app limited
assert_eq!(cc.acked_bytes, 0);
}
}
// Fully utilize the congestion window by sending enough packets to
// have `bytes_in_flight` above the `app_limited` threshold.
let mut pkts = Vec::new();
for _ in 0..ABOVE_APP_LIMIT_PKTS {
let p = SentPacket::new(
PacketType::Short,
next_pn,
IpTosEcn::default(),
now,
true,
Vec::new(),
cc.max_datagram_size(),
);
next_pn += 1;
cc.on_packet_sent(&p);
pkts.push(p);
}
assert_eq!(
cc.bytes_in_flight(),
ABOVE_APP_LIMIT_PKTS * cc.max_datagram_size()
);
now += RTT;
let mut last_acked_bytes = 0;
// Check if congestion window gets increased for all packets currently in flight
for (i, pkt) in pkts.into_iter().enumerate() {
cc.on_packets_acked(&[pkt], &RTT_ESTIMATE, now);
assert_eq!(
cc.bytes_in_flight(),
(ABOVE_APP_LIMIT_PKTS - i - 1) * cc.max_datagram_size()
);
// The cwnd doesn't increase, but the acked_bytes do, which will eventually lead to an
// increase, once the number of bytes reaches the necessary level
cwnd_is_halved(&cc);
// increase acked_bytes with each packet
assert_ne!(cc.acked_bytes, last_acked_bytes);
last_acked_bytes = cc.acked_bytes;
}
}
#[test]
fn ecn_ce() {
let mut cc = ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR));
let p_ce = SentPacket::new(
PacketType::Short,
1,
IpTosEcn::default(),
now(),
true,
Vec::new(),
cc.max_datagram_size(),
);
cc.on_packet_sent(&p_ce);
cwnd_is_default(&cc);
assert_eq!(cc.state, State::SlowStart);
// Signal congestion (ECN CE) and thus change state to recovery start.
cc.on_ecn_ce_received(&p_ce);
cwnd_is_halved(&cc);
assert_eq!(cc.state, State::RecoveryStart);
}
}