Source code

Revision control

Copy as Markdown

Other Tools

/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
#define TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
#include <cstddef>
#include <deque>
#include <memory>
#include <set>
#include <unordered_map>
#include <utility>

#include "absl/types/optional.h"
#include "rtc_base/checks.h"
namespace webrtc {
// A FIFO queue that can be consumed independently by several readers.
//
// Every reader is identified by a `size_t` id and owns its own head (read
// position). An element pushed with `PushBack()` becomes visible to every
// reader that is registered at that moment. The element is physically dropped
// from the underlying storage once every reader that could see it has popped
// it (or has been removed).
//
// `T` has to be copyable: `Front()` and `PopFront()` return elements by value
// because other readers may still need the stored instance.
//
// The class contains no synchronization of its own.
template <typename T>
class MultiReaderQueue {
 public:
  // Creates queue with exactly `readers_count` readers named from 0 to
  // `readers_count - 1`.
  explicit MultiReaderQueue(size_t readers_count) {
    heads_.reserve(readers_count);
    for (size_t i = 0; i < readers_count; ++i) {
      heads_[i] = 0;
    }
  }

  // Creates queue with the specified set of reader ids.
  explicit MultiReaderQueue(std::set<size_t> readers) {
    heads_.reserve(readers.size());
    for (const size_t reader : readers) {
      heads_[reader] = 0;
    }
  }

  // Adds a new `reader`, initializing its reading position (the reader's head)
  // equal to the one of `reader_to_copy`.
  // Crashes if `reader_to_copy` is unknown or if `reader` already exists.
  // Complexity O(MultiReaderQueue::size(reader_to_copy)).
  void AddReader(size_t reader, size_t reader_to_copy) {
    const size_t pos = GetHeadPositionOrDie(reader_to_copy);
    RTC_CHECK(heads_.find(reader) == heads_.end())
        << "Reader " << reader << " is already in the queue";
    // Heads are stored as absolute positions (they include the elements that
    // were already dropped), so convert the index inside `queue_` back.
    heads_[reader] = pos + removed_elements_count_;
    // The new reader still has to consume everything from `pos` onwards.
    for (size_t i = pos; i < queue_.size(); ++i) {
      ++in_queues_[i];
    }
  }

  // Adds a new `reader`, initializing its reading position equal to first
  // element in the queue.
  // Crashes if `reader` already exists.
  // Complexity O(MultiReaderQueue::size()).
  void AddReader(size_t reader) {
    RTC_CHECK(heads_.find(reader) == heads_.end())
        << "Reader " << reader << " is already in the queue";
    // Absolute position of the current first element of `queue_`.
    heads_[reader] = removed_elements_count_;
    // Every stored element is pending for the new reader.
    for (size_t& pending_readers : in_queues_) {
      ++pending_readers;
    }
  }

  // Removes specified `reader` from the queue. Elements that were only kept
  // alive for this reader are dropped from the storage.
  // Crashes if `reader` is unknown.
  // Complexity O(MultiReaderQueue::size(reader)).
  void RemoveReader(size_t reader) {
    const size_t pos = GetHeadPositionOrDie(reader);
    for (size_t i = pos; i < queue_.size(); ++i) {
      --in_queues_[i];
    }
    // Drop the leading elements that no remaining reader is waiting for.
    while (!in_queues_.empty() && in_queues_[0] == 0) {
      PopFront();
    }
    heads_.erase(reader);
  }

  // Add value to the end of the queue. Complexity O(1).
  void PushBack(T value) {
    // `value` is a by-value sink parameter: move it into the storage instead
    // of paying for a second copy.
    queue_.push_back(std::move(value));
    // The new element is pending for every currently registered reader.
    in_queues_.push_back(heads_.size());
  }

  // Extract element from specified head. Returns `absl::nullopt` if `reader`
  // has already consumed everything. Crashes if `reader` is unknown.
  // Complexity O(1).
  absl::optional<T> PopFront(size_t reader) {
    const size_t pos = GetHeadPositionOrDie(reader);
    if (pos >= queue_.size()) {
      return absl::nullopt;
    }
    // Only the last pending reader may steal the stored value: the element is
    // dropped right below in that case. Every other reader gets a copy because
    // the stored instance has to stay intact for the remaining readers.
    const bool is_last_pending_reader = in_queues_[pos] == 1;
    T out = is_last_pending_reader ? T(std::move(queue_[pos]))
                                   : T(queue_[pos]);
    in_queues_[pos]--;
    heads_[reader]++;
    if (in_queues_[pos] == 0) {
      // An element with no pending readers can only be the first one:
      // everything before it has been consumed by all readers and dropped.
      RTC_DCHECK_EQ(pos, 0);
      PopFront();
    }
    return out;
  }

  // Returns a copy of the element at specified head without consuming it, or
  // `absl::nullopt` if `reader` has nothing left to read. Crashes if `reader`
  // is unknown. Complexity O(1).
  absl::optional<T> Front(size_t reader) const {
    const size_t pos = GetHeadPositionOrDie(reader);
    if (pos >= queue_.size()) {
      return absl::nullopt;
    }
    return queue_[pos];
  }

  // Returns true if for specified head there are no more elements in the queue
  // or false otherwise. Crashes if `reader` is unknown. Complexity O(1).
  bool IsEmpty(size_t reader) const {
    const size_t pos = GetHeadPositionOrDie(reader);
    return pos >= queue_.size();
  }

  // Returns size of the longest queue between all readers, i.e. the number of
  // elements currently held in the storage.
  // Complexity O(1).
  size_t size() const { return queue_.size(); }

  // Returns the number of elements `reader` still has to consume. Crashes if
  // `reader` is unknown. Complexity O(1).
  size_t size(size_t reader) const {
    const size_t pos = GetHeadPositionOrDie(reader);
    return queue_.size() - pos;
  }

  // Returns the number of registered readers. Complexity O(1).
  size_t readers_count() const { return heads_.size(); }

 private:
  // Returns the index inside `queue_` of the next element for `reader`.
  // Crashes if `reader` is unknown.
  size_t GetHeadPositionOrDie(size_t reader) const {
    auto it = heads_.find(reader);
    RTC_CHECK(it != heads_.end()) << "No queue for reader " << reader;
    // Stored heads are absolute; shift by the number of dropped elements.
    return it->second - removed_elements_count_;
  }

  // Drops the first element of the storage. It must not be pending for any
  // reader.
  void PopFront() {
    RTC_DCHECK(!queue_.empty());
    RTC_DCHECK_EQ(in_queues_[0], 0);
    queue_.pop_front();
    in_queues_.pop_front();
    removed_elements_count_++;
  }

  // Number of the elements that were removed from the queue. It is used to
  // subtract from each head to compute the right index inside `queue_`.
  size_t removed_elements_count_ = 0;
  // Elements that at least one reader may still have to consume.
  std::deque<T> queue_;
  // In how many queues the element at index `i` is, i.e. how many readers
  // still have to consume it. An element can be removed from the front if and
  // only if it is in 0 queues. Always has the same length as `queue_`.
  std::deque<size_t> in_queues_;
  // Map from the reader to the head position in the queue. Positions are
  // absolute: they count the elements already removed from `queue_` as well.
  std::unordered_map<size_t, size_t> heads_;
};
} // namespace webrtc
#endif // TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_