Source code

Revision control

Copy as Markdown

Other Tools

/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/tx/stream_scheduler.h"
#include <vector>
#include "net/dcsctp/packet/sctp_packet.h"
#include "net/dcsctp/public/types.h"
#include "test/gmock.h"
namespace dcsctp {
namespace {
using ::testing::Return;
using ::testing::StrictMock;
using ::webrtc::Timestamp;
constexpr size_t kMtu = 1000;
constexpr size_t kPayloadSize = 4;
constexpr Timestamp kNow = Timestamp::Zero();
MATCHER_P(HasDataWithMid, mid, "") {
if (!arg.has_value()) {
*result_listener << "There was no produced data";
return false;
}
if (arg->data.mid != mid) {
*result_listener << "the produced data had mid " << *arg->data.mid
<< " and not the expected " << *mid;
return false;
}
return true;
}
std::function<absl::optional<SendQueue::DataToSend>(Timestamp, size_t)>
CreateChunk(OutgoingMessageId message_id,
StreamID sid,
MID mid,
size_t payload_size = kPayloadSize) {
return [sid, mid, payload_size, message_id](Timestamp now, size_t max_size) {
return SendQueue::DataToSend(
message_id,
Data(sid, SSN(0), mid, FSN(0), PPID(42),
std::vector<uint8_t>(payload_size), Data::IsBeginning(true),
Data::IsEnd(true), IsUnordered(true)));
};
}
std::map<StreamID, size_t> GetPacketCounts(StreamScheduler& scheduler,
size_t packets_to_generate) {
std::map<StreamID, size_t> packet_counts;
for (size_t i = 0; i < packets_to_generate; ++i) {
absl::optional<SendQueue::DataToSend> data = scheduler.Produce(kNow, kMtu);
if (data.has_value()) {
++packet_counts[data->data.stream_id];
}
}
return packet_counts;
}
class MockStreamProducer : public StreamScheduler::StreamProducer {
public:
MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
Produce,
(Timestamp, size_t),
(override));
MOCK_METHOD(size_t, bytes_to_send_in_next_message, (), (const, override));
};
class TestStream {
public:
TestStream(StreamScheduler& scheduler,
StreamID stream_id,
StreamPriority priority,
size_t packet_size = kPayloadSize) {
EXPECT_CALL(producer_, Produce)
.WillRepeatedly(
CreateChunk(OutgoingMessageId(0), stream_id, MID(0), packet_size));
EXPECT_CALL(producer_, bytes_to_send_in_next_message)
.WillRepeatedly(Return(packet_size));
stream_ = scheduler.CreateStream(&producer_, stream_id, priority);
stream_->MaybeMakeActive();
}
StreamScheduler::Stream& stream() { return *stream_; }
private:
StrictMock<MockStreamProducer> producer_;
std::unique_ptr<StreamScheduler::Stream> stream_;
};
// A scheduler without active streams doesn't produce data.
TEST(StreamSchedulerTest, HasNoActiveStreams) {
StreamScheduler scheduler("", kMtu);
EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Stream properties can be set and retrieved
TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
StreamScheduler scheduler("", kMtu);
StrictMock<MockStreamProducer> producer;
auto stream =
scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
EXPECT_EQ(stream->stream_id(), StreamID(1));
EXPECT_EQ(stream->priority(), StreamPriority(2));
stream->SetPriority(StreamPriority(0));
EXPECT_EQ(stream->priority(), StreamPriority(0));
}
// A scheduler with a single stream produced packets from it.
TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
StreamScheduler scheduler("", kMtu);
StrictMock<MockStreamProducer> producer;
EXPECT_CALL(producer, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(0)));
EXPECT_CALL(producer, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(0));
auto stream =
scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
stream->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0)));
EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Switches between two streams after every packet.
TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
StreamScheduler scheduler("", kMtu);
StrictMock<MockStreamProducer> producer1;
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101)))
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102)));
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream1 =
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
StrictMock<MockStreamProducer> producer2;
EXPECT_CALL(producer2, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(200)))
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(201)))
.WillOnce(CreateChunk(OutgoingMessageId(6), StreamID(2), MID(202)));
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream2 =
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
stream2->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Switches between two streams after every packet, but keeps producing from the
// same stream when a packet contains of multiple fragments.
TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
StreamScheduler scheduler("", kMtu);
StrictMock<MockStreamProducer> producer1;
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
.WillOnce([](...) {
return SendQueue::DataToSend(
OutgoingMessageId(1),
Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
std::vector<uint8_t>(4), Data::IsBeginning(true),
Data::IsEnd(false), IsUnordered(true)));
})
.WillOnce([](...) {
return SendQueue::DataToSend(
OutgoingMessageId(1),
Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
std::vector<uint8_t>(4), Data::IsBeginning(false),
Data::IsEnd(false), IsUnordered(true)));
})
.WillOnce([](...) {
return SendQueue::DataToSend(
OutgoingMessageId(1),
Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
std::vector<uint8_t>(4), Data::IsBeginning(false),
Data::IsEnd(true), IsUnordered(true)));
})
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102)));
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream1 =
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
StrictMock<MockStreamProducer> producer2;
EXPECT_CALL(producer2, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200)))
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201)))
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202)));
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream2 =
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
stream2->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Deactivates a stream before it has finished producing all packets.
TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
StreamScheduler scheduler("", kMtu);
StrictMock<MockStreamProducer> producer1;
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101)));
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize)); // hints that there is a MID(2) coming.
auto stream1 =
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
// ... but the stream is made inactive before it can be produced.
stream1->MakeInactive();
EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Resumes a paused stream - makes a stream active after inactivating it.
TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
StreamScheduler scheduler("", kMtu);
StrictMock<MockStreamProducer> producer1;
// Callbacks are setup so that they hint that there is a MID(2) coming...
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101)))
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102)));
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize)) // When making active again
.WillOnce(Return(0));
auto stream1 =
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
stream1->MakeInactive();
EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
stream1->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Iterates between streams, where one is suddenly paused and later resumed.
TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
StreamScheduler scheduler("", kMtu);
StrictMock<MockStreamProducer> producer1;
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101)))
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102)));
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream1 =
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
StrictMock<MockStreamProducer> producer2;
EXPECT_CALL(producer2, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200)))
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201)))
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202)));
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream2 =
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
stream2->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
stream1->MakeInactive();
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
stream1->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Verifies that packet counts are evenly distributed in round robin scheduling.
TEST(StreamSchedulerTest, WillDistributeRoundRobinPacketsEvenlyTwoStreams) {
StreamScheduler scheduler("", kMtu);
TestStream stream1(scheduler, StreamID(1), StreamPriority(1));
TestStream stream2(scheduler, StreamID(2), StreamPriority(1));
std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 10);
EXPECT_EQ(packet_counts[StreamID(1)], 5U);
EXPECT_EQ(packet_counts[StreamID(2)], 5U);
}
// Verifies that packet counts are evenly distributed among active streams,
// where a stream is suddenly made inactive, two are added, and then the paused
// stream is resumed.
TEST(StreamSchedulerTest, WillDistributeEvenlyWithPausedAndAddedStreams) {
StreamScheduler scheduler("", kMtu);
TestStream stream1(scheduler, StreamID(1), StreamPriority(1));
TestStream stream2(scheduler, StreamID(2), StreamPriority(1));
std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 10);
EXPECT_EQ(packet_counts[StreamID(1)], 5U);
EXPECT_EQ(packet_counts[StreamID(2)], 5U);
stream2.stream().MakeInactive();
TestStream stream3(scheduler, StreamID(3), StreamPriority(1));
TestStream stream4(scheduler, StreamID(4), StreamPriority(1));
std::map<StreamID, size_t> counts2 = GetPacketCounts(scheduler, 15);
EXPECT_EQ(counts2[StreamID(1)], 5U);
EXPECT_EQ(counts2[StreamID(2)], 0U);
EXPECT_EQ(counts2[StreamID(3)], 5U);
EXPECT_EQ(counts2[StreamID(4)], 5U);
stream2.stream().MaybeMakeActive();
std::map<StreamID, size_t> counts3 = GetPacketCounts(scheduler, 20);
EXPECT_EQ(counts3[StreamID(1)], 5U);
EXPECT_EQ(counts3[StreamID(2)], 5U);
EXPECT_EQ(counts3[StreamID(3)], 5U);
EXPECT_EQ(counts3[StreamID(4)], 5U);
}
// Degrades to fair queuing with streams having identical priority.
TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) {
StreamScheduler scheduler("", kMtu);
scheduler.EnableMessageInterleaving(true);
constexpr size_t kSmallPacket = 30;
constexpr size_t kLargePacket = 70;
StrictMock<MockStreamProducer> callback1;
EXPECT_CALL(callback1, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100),
kSmallPacket))
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101),
kSmallPacket))
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102),
kSmallPacket));
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
.WillOnce(Return(kSmallPacket)) // When making active
.WillOnce(Return(kSmallPacket))
.WillOnce(Return(kSmallPacket))
.WillOnce(Return(0));
auto stream1 =
scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
StrictMock<MockStreamProducer> callback2;
EXPECT_CALL(callback2, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200),
kLargePacket))
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201),
kLargePacket))
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202),
kLargePacket));
EXPECT_CALL(callback2, bytes_to_send_in_next_message)
.WillOnce(Return(kLargePacket)) // When making active
.WillOnce(Return(kLargePacket))
.WillOnce(Return(kLargePacket))
.WillOnce(Return(0));
auto stream2 =
scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
stream2->MaybeMakeActive();
// t = 30
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
// t = 60
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
// t = 70
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
// t = 90
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
// t = 140
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
// t = 210
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Will do weighted fair queuing with three streams having different priority.
TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) {
StreamScheduler scheduler("", kMtu);
scheduler.EnableMessageInterleaving(true);
StrictMock<MockStreamProducer> callback1;
EXPECT_CALL(callback1, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101)))
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102)));
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
// Priority 125 -> allowed to produce every 1000/125 ~= 80 time units.
auto stream1 =
scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125));
stream1->MaybeMakeActive();
StrictMock<MockStreamProducer> callback2;
EXPECT_CALL(callback2, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200)))
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201)))
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202)));
EXPECT_CALL(callback2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
// Priority 200 -> allowed to produce every 1000/200 ~= 50 time units.
auto stream2 =
scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200));
stream2->MaybeMakeActive();
StrictMock<MockStreamProducer> callback3;
EXPECT_CALL(callback3, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(6), StreamID(3), MID(300)))
.WillOnce(CreateChunk(OutgoingMessageId(7), StreamID(3), MID(301)))
.WillOnce(CreateChunk(OutgoingMessageId(8), StreamID(3), MID(302)));
EXPECT_CALL(callback3, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
// Priority 500 -> allowed to produce every 1000/500 ~= 20 time units.
auto stream3 =
scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500));
stream3->MaybeMakeActive();
// t ~= 20
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(300)));
// t ~= 40
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(301)));
// t ~= 50
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
// t ~= 60
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(302)));
// t ~= 80
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
// t ~= 100
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
// t ~= 150
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
// t ~= 160
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
// t ~= 240
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Will do weighted fair queuing with three streams having different priority
// and sending different payload sizes.
TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) {
StreamScheduler scheduler("", kMtu);
scheduler.EnableMessageInterleaving(true);
constexpr size_t kSmallPacket = 20;
constexpr size_t kMediumPacket = 50;
constexpr size_t kLargePacket = 70;
// Stream with priority = 125 -> inverse weight ~=80
StrictMock<MockStreamProducer> callback1;
EXPECT_CALL(callback1, Produce)
// virtual finish time ~ 0 + 50 * 80 = 4000
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100),
kMediumPacket))
// virtual finish time ~ 4000 + 20 * 80 = 5600
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101),
kSmallPacket))
// virtual finish time ~ 5600 + 70 * 80 = 11200
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102),
kLargePacket));
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
.WillOnce(Return(kMediumPacket)) // When making active
.WillOnce(Return(kSmallPacket))
.WillOnce(Return(kLargePacket))
.WillOnce(Return(0));
auto stream1 =
scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125));
stream1->MaybeMakeActive();
// Stream with priority = 200 -> inverse weight ~=50
StrictMock<MockStreamProducer> callback2;
EXPECT_CALL(callback2, Produce)
// virtual finish time ~ 0 + 50 * 50 = 2500
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200),
kMediumPacket))
// virtual finish time ~ 2500 + 70 * 50 = 6000
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201),
kLargePacket))
// virtual finish time ~ 6000 + 20 * 50 = 7000
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202),
kSmallPacket));
EXPECT_CALL(callback2, bytes_to_send_in_next_message)
.WillOnce(Return(kMediumPacket)) // When making active
.WillOnce(Return(kLargePacket))
.WillOnce(Return(kSmallPacket))
.WillOnce(Return(0));
auto stream2 =
scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200));
stream2->MaybeMakeActive();
// Stream with priority = 500 -> inverse weight ~=20
StrictMock<MockStreamProducer> callback3;
EXPECT_CALL(callback3, Produce)
// virtual finish time ~ 0 + 20 * 20 = 400
.WillOnce(CreateChunk(OutgoingMessageId(6), StreamID(3), MID(300),
kSmallPacket))
// virtual finish time ~ 400 + 50 * 20 = 1400
.WillOnce(CreateChunk(OutgoingMessageId(7), StreamID(3), MID(301),
kMediumPacket))
// virtual finish time ~ 1400 + 70 * 20 = 2800
.WillOnce(CreateChunk(OutgoingMessageId(8), StreamID(3), MID(302),
kLargePacket));
EXPECT_CALL(callback3, bytes_to_send_in_next_message)
.WillOnce(Return(kSmallPacket)) // When making active
.WillOnce(Return(kMediumPacket))
.WillOnce(Return(kLargePacket))
.WillOnce(Return(0));
auto stream3 =
scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500));
stream3->MaybeMakeActive();
// t ~= 400
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(300)));
// t ~= 1400
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(301)));
// t ~= 2500
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
// t ~= 2800
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(302)));
// t ~= 4000
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
// t ~= 5600
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
// t ~= 6000
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
// t ~= 7000
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
// t ~= 11200
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
TEST(StreamSchedulerTest, WillDistributeWFQPacketsInTwoStreamsByPriority) {
// A simple test with two streams of different priority, but sending packets
// of identical size. Verifies that the ratio of sent packets represent their
// priority.
StreamScheduler scheduler("", kMtu);
scheduler.EnableMessageInterleaving(true);
TestStream stream1(scheduler, StreamID(1), StreamPriority(100), kPayloadSize);
TestStream stream2(scheduler, StreamID(2), StreamPriority(200), kPayloadSize);
std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 15);
EXPECT_EQ(packet_counts[StreamID(1)], 5U);
EXPECT_EQ(packet_counts[StreamID(2)], 10U);
}
TEST(StreamSchedulerTest, WillDistributeWFQPacketsInFourStreamsByPriority) {
// Same as `WillDistributeWFQPacketsInTwoStreamsByPriority` but with more
// streams.
StreamScheduler scheduler("", kMtu);
scheduler.EnableMessageInterleaving(true);
TestStream stream1(scheduler, StreamID(1), StreamPriority(100), kPayloadSize);
TestStream stream2(scheduler, StreamID(2), StreamPriority(200), kPayloadSize);
TestStream stream3(scheduler, StreamID(3), StreamPriority(300), kPayloadSize);
TestStream stream4(scheduler, StreamID(4), StreamPriority(400), kPayloadSize);
std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 50);
EXPECT_EQ(packet_counts[StreamID(1)], 5U);
EXPECT_EQ(packet_counts[StreamID(2)], 10U);
EXPECT_EQ(packet_counts[StreamID(3)], 15U);
EXPECT_EQ(packet_counts[StreamID(4)], 20U);
}
TEST(StreamSchedulerTest, WillDistributeFromTwoStreamsFairly) {
// A simple test with two streams of different priority, but sending packets
// of different size. Verifies that the ratio of total packet payload
// represent their priority.
// In this example,
// * stream1 has priority 100 and sends packets of size 8
// * stream2 has priority 400 and sends packets of size 4
// With round robin, stream1 would get twice as many payload bytes on the wire
// as stream2, but with WFQ and a 4x priority increase, stream2 should 4x as
// many payload bytes on the wire. That translates to stream2 getting 8x as
// many packets on the wire as they are half as large.
StreamScheduler scheduler("", kMtu);
// Enable WFQ scheduler.
scheduler.EnableMessageInterleaving(true);
TestStream stream1(scheduler, StreamID(1), StreamPriority(100),
/*packet_size=*/8);
TestStream stream2(scheduler, StreamID(2), StreamPriority(400),
/*packet_size=*/4);
std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 90);
EXPECT_EQ(packet_counts[StreamID(1)], 10U);
EXPECT_EQ(packet_counts[StreamID(2)], 80U);
}
TEST(StreamSchedulerTest, WillDistributeFromFourStreamsFairly) {
// Same as `WillDistributeWeightedFairFromTwoStreamsFairly` but more
// complicated.
StreamScheduler scheduler("", kMtu);
// Enable WFQ scheduler.
scheduler.EnableMessageInterleaving(true);
TestStream stream1(scheduler, StreamID(1), StreamPriority(100),
/*packet_size=*/10);
TestStream stream2(scheduler, StreamID(2), StreamPriority(200),
/*packet_size=*/10);
TestStream stream3(scheduler, StreamID(3), StreamPriority(200),
/*packet_size=*/20);
TestStream stream4(scheduler, StreamID(4), StreamPriority(400),
/*packet_size=*/30);
std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 80);
// 15 packets * 10 bytes = 150 bytes at priority 100.
EXPECT_EQ(packet_counts[StreamID(1)], 15U);
// 30 packets * 10 bytes = 300 bytes at priority 200.
EXPECT_EQ(packet_counts[StreamID(2)], 30U);
// 15 packets * 20 bytes = 300 bytes at priority 200.
EXPECT_EQ(packet_counts[StreamID(3)], 15U);
// 20 packets * 30 bytes = 600 bytes at priority 400.
EXPECT_EQ(packet_counts[StreamID(4)], 20U);
}
// Sending large messages with small MTU will fragment the messages and produce
// a first fragment not larger than the MTU, and will then not first send from
// the stream with the smallest message, as their first fragment will be equally
// small for both streams. See `LargeMessageWithLargeMtu` for the same test, but
// with a larger MTU.
TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) {
StreamScheduler scheduler(
"", 100 + SctpPacket::kHeaderSize + IDataChunk::kHeaderSize);
scheduler.EnableMessageInterleaving(true);
StrictMock<MockStreamProducer> producer1;
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(0), 100))
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(0), 100));
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(200)) // When making active
.WillOnce(Return(100))
.WillOnce(Return(0));
auto stream1 =
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1));
stream1->MaybeMakeActive();
StrictMock<MockStreamProducer> producer2;
EXPECT_CALL(producer2, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(2), MID(1), 100))
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(1), 50));
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
.WillOnce(Return(150)) // When making active
.WillOnce(Return(50))
.WillOnce(Return(0));
auto stream2 =
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1));
stream2->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0)));
EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Sending large messages with large MTU will not fragment messages and will
// send the message first from the stream that has the smallest message.
TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) {
StreamScheduler scheduler(
"", 200 + SctpPacket::kHeaderSize + IDataChunk::kHeaderSize);
scheduler.EnableMessageInterleaving(true);
StrictMock<MockStreamProducer> producer1;
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(0), 200));
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(200)) // When making active
.WillOnce(Return(0));
auto stream1 =
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1));
stream1->MaybeMakeActive();
StrictMock<MockStreamProducer> producer2;
EXPECT_CALL(producer2, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(2), MID(1), 150));
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
.WillOnce(Return(150)) // When making active
.WillOnce(Return(0));
auto stream2 =
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1));
stream2->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1)));
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0)));
EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
} // namespace
} // namespace dcsctp