Source code

Revision control

Copy as Markdown

Other Tools

/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
// Original author: ekr@rtfm.com
#include "MediaPipeline.h"
#include <inttypes.h>
#include <math.h>
#include <sstream>
#include <utility>
#include "AudioSegment.h"
#include "AudioConverter.h"
#include "DOMMediaStream.h"
#include "ImageContainer.h"
#include "ImageTypes.h"
#include "MediaEngine.h"
#include "MediaSegment.h"
#include "MediaTrackGraph.h"
#include "MediaTrackListener.h"
#include "MediaStreamTrack.h"
#include "RtpLogger.h"
#include "VideoFrameConverter.h"
#include "VideoSegment.h"
#include "VideoStreamTrack.h"
#include "VideoUtils.h"
#include "mozilla/Logging.h"
#include "mozilla/NullPrincipal.h"
#include "mozilla/PeerIdentity.h"
#include "mozilla/Preferences.h"
#include "mozilla/SharedThreadPool.h"
#include "mozilla/Sprintf.h"
#include "mozilla/StaticPrefs_media.h"
#include "mozilla/TaskQueue.h"
#include "mozilla/UniquePtr.h"
#include "mozilla/UniquePtrExtensions.h"
#include "mozilla/dom/RTCStatsReportBinding.h"
#include "mozilla/dom/Document.h"
#include "mozilla/gfx/Point.h"
#include "mozilla/gfx/Types.h"
#include "nsError.h"
#include "nsThreadUtils.h"
#include "transport/runnable_utils.h"
#include "jsapi/MediaTransportHandler.h"
#include "jsapi/PeerConnectionImpl.h"
#include "Tracing.h"
#include "libwebrtcglue/WebrtcImageBuffer.h"
#include "libwebrtcglue/MediaConduitInterface.h"
#include "common_video/include/video_frame_buffer.h"
#include "modules/rtp_rtcp/include/rtp_rtcp.h"
#include "modules/rtp_rtcp/include/rtp_header_extension_map.h"
#include "modules/rtp_rtcp/source/rtp_packet_received.h"
// Max size given stereo is 480*2*2 = 1920 (10ms of 16-bits stereo audio at
// 48KHz)
#define AUDIO_SAMPLE_BUFFER_MAX_BYTES (480 * 2 * 2)
static_assert((WEBRTC_MAX_SAMPLE_RATE / 100) * sizeof(uint16_t) * 2 <=
AUDIO_SAMPLE_BUFFER_MAX_BYTES,
"AUDIO_SAMPLE_BUFFER_MAX_BYTES is not large enough");
using namespace mozilla;
using namespace mozilla::dom;
using namespace mozilla::gfx;
using namespace mozilla::layers;
mozilla::LazyLogModule gMediaPipelineLog("MediaPipeline");
namespace mozilla {
// An async inserter for audio data, to avoid running audio codec encoders
// on the MTG/input audio thread. Basically just bounces all the audio
// data to a single audio processing/input queue. We could if we wanted to
// use multiple threads and a TaskQueue.
class AudioProxyThread {
public:
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AudioProxyThread)
explicit AudioProxyThread(RefPtr<AudioSessionConduit> aConduit)
: mConduit(std::move(aConduit)),
mTaskQueue(TaskQueue::Create(
GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER), "AudioProxy")),
mAudioConverter(nullptr) {
MOZ_ASSERT(mConduit);
MOZ_COUNT_CTOR(AudioProxyThread);
}
// This function is the identity if aInputRate is supported.
// Else, it returns a rate that is supported, that ensure no loss in audio
// quality: the sampling rate returned is always greater to the inputed
// sampling-rate, if they differ..
uint32_t AppropriateSendingRateForInputRate(uint32_t aInputRate) {
AudioSessionConduit* conduit =
static_cast<AudioSessionConduit*>(mConduit.get());
if (conduit->IsSamplingFreqSupported(aInputRate)) {
return aInputRate;
}
if (aInputRate < 16000) {
return 16000;
}
if (aInputRate < 32000) {
return 32000;
}
if (aInputRate < 44100) {
return 44100;
}
return 48000;
}
// From an arbitrary AudioChunk at sampling-rate aRate, process the audio into
// something the conduit can work with (or send silence if the track is not
// enabled), and send the audio in 10ms chunks to the conduit.
void InternalProcessAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
bool aEnabled) {
MOZ_ASSERT(mTaskQueue->IsCurrentThreadIn());
// Convert to interleaved 16-bits integer audio, with a maximum of two
// channels (since the WebRTC.org code below makes the assumption that the
// input audio is either mono or stereo), with a sample-rate rate that is
// 16, 32, 44.1, or 48kHz.
uint32_t outputChannels = aChunk.ChannelCount() == 1 ? 1 : 2;
int32_t transmissionRate = AppropriateSendingRateForInputRate(aRate);
// We take advantage of the fact that the common case (microphone directly
// to PeerConnection, that is, a normal call), the samples are already
// 16-bits mono, so the representation in interleaved and planar is the
// same, and we can just use that.
if (aEnabled && outputChannels == 1 &&
aChunk.mBufferFormat == AUDIO_FORMAT_S16 && transmissionRate == aRate) {
const int16_t* samples = aChunk.ChannelData<int16_t>().Elements()[0];
PacketizeAndSend(samples, transmissionRate, outputChannels,
aChunk.mDuration);
return;
}
uint32_t sampleCount = aChunk.mDuration * outputChannels;
if (mInterleavedAudio.Length() < sampleCount) {
mInterleavedAudio.SetLength(sampleCount);
}
if (!aEnabled || aChunk.mBufferFormat == AUDIO_FORMAT_SILENCE) {
PodZero(mInterleavedAudio.Elements(), sampleCount);
} else if (aChunk.mBufferFormat == AUDIO_FORMAT_FLOAT32) {
DownmixAndInterleave(aChunk.ChannelData<float>(), aChunk.mDuration,
aChunk.mVolume, outputChannels,
mInterleavedAudio.Elements());
} else if (aChunk.mBufferFormat == AUDIO_FORMAT_S16) {
DownmixAndInterleave(aChunk.ChannelData<int16_t>(), aChunk.mDuration,
aChunk.mVolume, outputChannels,
mInterleavedAudio.Elements());
}
int16_t* inputAudio = mInterleavedAudio.Elements();
size_t inputAudioFrameCount = aChunk.mDuration;
AudioConfig inputConfig(AudioConfig::ChannelLayout(outputChannels), aRate,
AudioConfig::FORMAT_S16);
AudioConfig outputConfig(AudioConfig::ChannelLayout(outputChannels),
transmissionRate, AudioConfig::FORMAT_S16);
// Resample to an acceptable sample-rate for the sending side
if (!mAudioConverter || mAudioConverter->InputConfig() != inputConfig ||
mAudioConverter->OutputConfig() != outputConfig) {
mAudioConverter = MakeUnique<AudioConverter>(inputConfig, outputConfig);
}
int16_t* processedAudio = nullptr;
size_t framesProcessed =
mAudioConverter->Process(inputAudio, inputAudioFrameCount);
if (framesProcessed == 0) {
// In place conversion not possible, use a buffer.
framesProcessed = mAudioConverter->Process(mOutputAudio, inputAudio,
inputAudioFrameCount);
processedAudio = mOutputAudio.Data();
} else {
processedAudio = inputAudio;
}
PacketizeAndSend(processedAudio, transmissionRate, outputChannels,
framesProcessed);
}
// This packetizes aAudioData in 10ms chunks and sends it.
// aAudioData is interleaved audio data at a rate and with a channel count
// that is appropriate to send with the conduit.
void PacketizeAndSend(const int16_t* aAudioData, uint32_t aRate,
uint32_t aChannels, uint32_t aFrameCount) {
MOZ_ASSERT(AppropriateSendingRateForInputRate(aRate) == aRate);
MOZ_ASSERT(aChannels == 1 || aChannels == 2);
MOZ_ASSERT(aAudioData);
uint32_t audio_10ms = aRate / 100;
if (!mPacketizer || mPacketizer->mPacketSize != audio_10ms ||
mPacketizer->mChannels != aChannels) {
// It's the right thing to drop the bit of audio still in the packetizer:
// we don't want to send to the conduit audio that has two different
// rates while telling it that it has a constante rate.
mPacketizer =
MakeUnique<AudioPacketizer<int16_t, int16_t>>(audio_10ms, aChannels);
mPacket = MakeUnique<int16_t[]>(audio_10ms * aChannels);
}
mPacketizer->Input(aAudioData, aFrameCount);
while (mPacketizer->PacketsAvailable()) {
mPacketizer->Output(mPacket.get());
auto frame = std::make_unique<webrtc::AudioFrame>();
// UpdateFrame makes a copy of the audio data.
frame->UpdateFrame(frame->timestamp_, mPacket.get(),
mPacketizer->mPacketSize, aRate, frame->speech_type_,
frame->vad_activity_, mPacketizer->mChannels);
mConduit->SendAudioFrame(std::move(frame));
}
}
void QueueAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
bool aEnabled) {
RefPtr<AudioProxyThread> self = this;
nsresult rv = mTaskQueue->Dispatch(NS_NewRunnableFunction(
"AudioProxyThread::QueueAudioChunk", [self, aRate, aChunk, aEnabled]() {
self->InternalProcessAudioChunk(aRate, aChunk, aEnabled);
}));
MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv));
Unused << rv;
}
protected:
virtual ~AudioProxyThread() { MOZ_COUNT_DTOR(AudioProxyThread); }
const RefPtr<AudioSessionConduit> mConduit;
const RefPtr<TaskQueue> mTaskQueue;
// Only accessed on mTaskQueue
UniquePtr<AudioPacketizer<int16_t, int16_t>> mPacketizer;
// A buffer to hold a single packet of audio.
UniquePtr<int16_t[]> mPacket;
nsTArray<int16_t> mInterleavedAudio;
AlignedShortBuffer mOutputAudio;
UniquePtr<AudioConverter> mAudioConverter;
};
#define INIT_MIRROR(name, val) \
name(AbstractThread::MainThread(), val, "MediaPipeline::" #name " (Mirror)")
MediaPipeline::MediaPipeline(const std::string& aPc,
RefPtr<MediaTransportHandler> aTransportHandler,
DirectionType aDirection,
RefPtr<AbstractThread> aCallThread,
RefPtr<nsISerialEventTarget> aStsThread,
RefPtr<MediaSessionConduit> aConduit)
: mConduit(std::move(aConduit)),
mDirection(aDirection),
mCallThread(std::move(aCallThread)),
mStsThread(std::move(aStsThread)),
INIT_MIRROR(mActive, false),
mLevel(0),
mTransportHandler(std::move(aTransportHandler)),
mRtpPacketsSent(0),
mRtcpPacketsSent(0),
mRtpPacketsReceived(0),
mRtpBytesSent(0),
mRtpBytesReceived(0),
mPc(aPc),
mRtpHeaderExtensionMap(new webrtc::RtpHeaderExtensionMap()),
mPacketDumper(PacketDumper::GetPacketDumper(mPc)) {}
#undef INIT_MIRROR
MediaPipeline::~MediaPipeline() {
MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
("Destroying MediaPipeline: %s", mDescription.c_str()));
}
void MediaPipeline::Shutdown() {
MOZ_ASSERT(NS_IsMainThread());
mActive.DisconnectIfConnected();
RUN_ON_THREAD(mStsThread,
WrapRunnable(RefPtr<MediaPipeline>(this),
&MediaPipeline::DetachTransport_s),
NS_DISPATCH_NORMAL);
}
void MediaPipeline::DetachTransport_s() {
ASSERT_ON_THREAD(mStsThread);
MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
("%s in %s", mDescription.c_str(), __FUNCTION__));
disconnect_all();
mRtpState = TransportLayer::TS_NONE;
mRtcpState = TransportLayer::TS_NONE;
mTransportId.clear();
mConduit->SetTransportActive(false);
mRtpSendEventListener.DisconnectIfExists();
mSenderRtcpSendEventListener.DisconnectIfExists();
mReceiverRtcpSendEventListener.DisconnectIfExists();
}
void MediaPipeline::UpdateTransport_m(
const std::string& aTransportId, UniquePtr<MediaPipelineFilter>&& aFilter) {
mStsThread->Dispatch(NS_NewRunnableFunction(
__func__, [aTransportId, filter = std::move(aFilter),
self = RefPtr<MediaPipeline>(this)]() mutable {
self->UpdateTransport_s(aTransportId, std::move(filter));
}));
}
void MediaPipeline::UpdateTransport_s(
const std::string& aTransportId, UniquePtr<MediaPipelineFilter>&& aFilter) {
ASSERT_ON_THREAD(mStsThread);
if (!mSignalsConnected) {
mTransportHandler->SignalStateChange.connect(
this, &MediaPipeline::RtpStateChange);
mTransportHandler->SignalRtcpStateChange.connect(
this, &MediaPipeline::RtcpStateChange);
mTransportHandler->SignalEncryptedSending.connect(
this, &MediaPipeline::EncryptedPacketSending);
mTransportHandler->SignalPacketReceived.connect(
this, &MediaPipeline::PacketReceived);
mTransportHandler->SignalAlpnNegotiated.connect(
this, &MediaPipeline::AlpnNegotiated);
mSignalsConnected = true;
}
if (aTransportId != mTransportId) {
mTransportId = aTransportId;
mRtpState = mTransportHandler->GetState(mTransportId, false);
mRtcpState = mTransportHandler->GetState(mTransportId, true);
CheckTransportStates();
}
if (mFilter) {
for (const auto& extension : mFilter->GetExtmap()) {
mRtpHeaderExtensionMap->Deregister(extension.uri);
}
}
if (mFilter && aFilter) {
// Use the new filter, but don't forget any remote SSRCs that we've learned
// by receiving traffic.
mFilter->Update(*aFilter);
} else {
mFilter = std::move(aFilter);
}
if (mFilter) {
for (const auto& extension : mFilter->GetExtmap()) {
mRtpHeaderExtensionMap->RegisterByUri(extension.id, extension.uri);
}
}
}
void MediaPipeline::GetContributingSourceStats(
const nsString& aInboundRtpStreamId,
FallibleTArray<dom::RTCRTPContributingSourceStats>& aArr) const {
ASSERT_ON_THREAD(mStsThread);
// Get the expiry from now
DOMHighResTimeStamp expiry =
RtpCSRCStats::GetExpiryFromTime(GetTimestampMaker().GetNow().ToDom());
for (auto info : mCsrcStats) {
if (!info.second.Expired(expiry)) {
RTCRTPContributingSourceStats stats;
info.second.GetWebidlInstance(stats, aInboundRtpStreamId);
if (!aArr.AppendElement(stats, fallible)) {
mozalloc_handle_oom(0);
}
}
}
}
void MediaPipeline::RtpStateChange(const std::string& aTransportId,
TransportLayer::State aState) {
if (mTransportId != aTransportId) {
return;
}
mRtpState = aState;
CheckTransportStates();
}
void MediaPipeline::RtcpStateChange(const std::string& aTransportId,
TransportLayer::State aState) {
if (mTransportId != aTransportId) {
return;
}
mRtcpState = aState;
CheckTransportStates();
}
void MediaPipeline::CheckTransportStates() {
ASSERT_ON_THREAD(mStsThread);
if (mRtpState == TransportLayer::TS_CLOSED ||
mRtpState == TransportLayer::TS_ERROR ||
mRtcpState == TransportLayer::TS_CLOSED ||
mRtcpState == TransportLayer::TS_ERROR) {
MOZ_LOG(gMediaPipelineLog, LogLevel::Warning,
("RTP Transport failed for pipeline %p flow %s", this,
mDescription.c_str()));
NS_WARNING(
"MediaPipeline Transport failed. This is not properly cleaned up yet");
// TODO(ekr@rtfm.com): SECURITY: Figure out how to clean up if the
// connection was good and now it is bad.
// TODO(ekr@rtfm.com): Report up so that the PC knows we
// have experienced an error.
mConduit->SetTransportActive(false);
mRtpSendEventListener.DisconnectIfExists();
mSenderRtcpSendEventListener.DisconnectIfExists();
mReceiverRtcpSendEventListener.DisconnectIfExists();
return;
}
if (mRtpState == TransportLayer::TS_OPEN) {
MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
("RTP Transport ready for pipeline %p flow %s", this,
mDescription.c_str()));
}
if (mRtcpState == TransportLayer::TS_OPEN) {
MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
("RTCP Transport ready for pipeline %p flow %s", this,
mDescription.c_str()));
}
if (mRtpState == TransportLayer::TS_OPEN && mRtcpState == mRtpState) {
if (mDirection == DirectionType::TRANSMIT) {
mRtpSendEventListener = mConduit->SenderRtpSendEvent().Connect(
mStsThread, this, &MediaPipeline::SendPacket);
mSenderRtcpSendEventListener = mConduit->SenderRtcpSendEvent().Connect(
mStsThread, this, &MediaPipeline::SendPacket);
} else {
mConduit->ConnectReceiverRtpEvent(mRtpReceiveEvent);
mReceiverRtcpSendEventListener =
mConduit->ReceiverRtcpSendEvent().Connect(mStsThread, this,
&MediaPipeline::SendPacket);
}
mConduit->SetTransportActive(true);
TransportReady_s();
}
}
void MediaPipeline::SendPacket(MediaPacket&& aPacket) {
ASSERT_ON_THREAD(mStsThread);
const bool isRtp = aPacket.type() == MediaPacket::RTP;
if (isRtp && mRtpState != TransportLayer::TS_OPEN) {
return;
}
if (!isRtp && mRtcpState != TransportLayer::TS_OPEN) {
return;
}
aPacket.sdp_level() = Some(Level());
if (RtpLogger::IsPacketLoggingOn()) {
RtpLogger::LogPacket(aPacket, false, mDescription);
}
if (isRtp) {
mPacketDumper->Dump(Level(), dom::mozPacketDumpType::Rtp, true,
aPacket.data(), aPacket.len());
IncrementRtpPacketsSent(aPacket);
} else {
mPacketDumper->Dump(Level(), dom::mozPacketDumpType::Rtcp, true,
aPacket.data(), aPacket.len());
IncrementRtcpPacketsSent();
}
MOZ_LOG(
gMediaPipelineLog, LogLevel::Debug,
("%s sending %s packet", mDescription.c_str(), (isRtp ? "RTP" : "RTCP")));
mTransportHandler->SendPacket(mTransportId, std::move(aPacket));
}
void MediaPipeline::IncrementRtpPacketsSent(const MediaPacket& aPacket) {
ASSERT_ON_THREAD(mStsThread);
++mRtpPacketsSent;
mRtpBytesSent += aPacket.len();
if (!(mRtpPacketsSent % 100)) {
MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
("RTP sent packet count for %s Pipeline %p: %u (%" PRId64 " bytes)",
mDescription.c_str(), this, mRtpPacketsSent, mRtpBytesSent));
}
}
void MediaPipeline::IncrementRtcpPacketsSent() {
ASSERT_ON_THREAD(mStsThread);
++mRtcpPacketsSent;
if (!(mRtcpPacketsSent % 100)) {
MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
("RTCP sent packet count for %s Pipeline %p: %u",
mDescription.c_str(), this, mRtcpPacketsSent));
}
}
void MediaPipeline::IncrementRtpPacketsReceived(int32_t aBytes) {
ASSERT_ON_THREAD(mStsThread);
++mRtpPacketsReceived;
mRtpBytesReceived += aBytes;
if (!(mRtpPacketsReceived % 100)) {
MOZ_LOG(
gMediaPipelineLog, LogLevel::Info,
("RTP received packet count for %s Pipeline %p: %u (%" PRId64 " bytes)",
mDescription.c_str(), this, mRtpPacketsReceived, mRtpBytesReceived));
}
}
void MediaPipeline::PacketReceived(const std::string& aTransportId,
const MediaPacket& packet) {
ASSERT_ON_THREAD(mStsThread);
if (mTransportId != aTransportId) {
return;
}
MOZ_ASSERT(mRtpState == TransportLayer::TS_OPEN);
if (packet.type() != MediaPacket::RTP) {
return;
}
if (mDirection == DirectionType::TRANSMIT) {
return;
}
if (!packet.len()) {
return;
}
webrtc::RTPHeader header;
rtc::CopyOnWriteBuffer packet_buffer(packet.data(), packet.len());
webrtc::RtpPacketReceived parsedPacket(mRtpHeaderExtensionMap.get());
if (!parsedPacket.Parse(packet_buffer)) {
return;
}
parsedPacket.GetHeader(&header);
if (mFilter && !mFilter->Filter(header)) {
return;
}
auto now = GetTimestampMaker().GetNow();
parsedPacket.set_arrival_time(now.ToRealtime());
if (IsVideo()) {
parsedPacket.set_payload_type_frequency(webrtc::kVideoPayloadTypeFrequency);
}
// Remove expired RtpCSRCStats
if (!mCsrcStats.empty()) {
auto expiry = RtpCSRCStats::GetExpiryFromTime(now.ToDom());
for (auto p = mCsrcStats.begin(); p != mCsrcStats.end();) {
if (p->second.Expired(expiry)) {
p = mCsrcStats.erase(p);
continue;
}
p++;
}
}
// Add new RtpCSRCStats
if (header.numCSRCs) {
for (auto i = 0; i < header.numCSRCs; i++) {
auto csrcInfo = mCsrcStats.find(header.arrOfCSRCs[i]);
if (csrcInfo == mCsrcStats.end()) {
mCsrcStats.insert(
std::make_pair(header.arrOfCSRCs[i],
RtpCSRCStats(header.arrOfCSRCs[i], now.ToDom())));
} else {
csrcInfo->second.SetTimestamp(now.ToDom());
}
}
}
MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
("%s received RTP packet.", mDescription.c_str()));
IncrementRtpPacketsReceived(packet.len());
RtpLogger::LogPacket(packet, true, mDescription);
// Might be nice to pass ownership of the buffer in this case, but it is a
// small optimization in a rare case.
mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtp, false,
packet.encrypted_data(), packet.encrypted_len());
mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtp, false, packet.data(),
packet.len());
mRtpReceiveEvent.Notify(std::move(parsedPacket), header);
}
void MediaPipeline::AlpnNegotiated(const std::string& aAlpn,
bool aPrivacyRequested) {
ASSERT_ON_THREAD(mStsThread);
if (aPrivacyRequested && Direction() == DirectionType::RECEIVE) {
// This will force the receive pipelines to drop data until they have
// received a private PrincipalHandle from RTCRtpReceiver (which takes a
// detour via main thread).
static_cast<MediaPipelineReceive*>(this)->OnPrivacyRequested_s();
}
}
void MediaPipeline::EncryptedPacketSending(const std::string& aTransportId,
const MediaPacket& aPacket) {
ASSERT_ON_THREAD(mStsThread);
if (mTransportId == aTransportId) {
dom::mozPacketDumpType type;
if (aPacket.type() == MediaPacket::SRTP) {
type = dom::mozPacketDumpType::Srtp;
} else if (aPacket.type() == MediaPacket::SRTCP) {
type = dom::mozPacketDumpType::Srtcp;
} else if (aPacket.type() == MediaPacket::DTLS) {
// TODO(bug 1497936): Implement packet dump for DTLS
return;
} else {
MOZ_ASSERT(false);
return;
}
mPacketDumper->Dump(Level(), type, true, aPacket.data(), aPacket.len());
}
}
class MediaPipelineTransmit::PipelineListener
: public DirectMediaTrackListener {
friend class MediaPipelineTransmit;
public:
explicit PipelineListener(RefPtr<MediaSessionConduit> aConduit)
: mConduit(std::move(aConduit)),
mActive(false),
mEnabled(false),
mDirectConnect(false) {}
~PipelineListener() {
if (mConverter) {
mConverter->Shutdown();
}
}
void SetActive(bool aActive) {
mActive = aActive;
if (mConverter) {
mConverter->SetActive(aActive);
}
}
void SetEnabled(bool aEnabled) { mEnabled = aEnabled; }
// These are needed since nested classes don't have access to any particular
// instance of the parent
void SetAudioProxy(RefPtr<AudioProxyThread> aProxy) {
mAudioProcessing = std::move(aProxy);
}
void SetVideoFrameConverter(RefPtr<VideoFrameConverter> aConverter) {
mConverter = std::move(aConverter);
}
void OnVideoFrameConverted(webrtc::VideoFrame aVideoFrame) {
MOZ_RELEASE_ASSERT(mConduit->type() == MediaSessionConduit::VIDEO);
static_cast<VideoSessionConduit*>(mConduit.get())
->SendVideoFrame(std::move(aVideoFrame));
}
// Implement MediaTrackListener
void NotifyQueuedChanges(MediaTrackGraph* aGraph, TrackTime aOffset,
const MediaSegment& aQueuedMedia) override;
void NotifyEnabledStateChanged(MediaTrackGraph* aGraph,
bool aEnabled) override;
// Implement DirectMediaTrackListener
void NotifyRealtimeTrackData(MediaTrackGraph* aGraph, TrackTime aOffset,
const MediaSegment& aMedia) override;
void NotifyDirectListenerInstalled(InstallationResult aResult) override;
void NotifyDirectListenerUninstalled() override;
private:
void NewData(const MediaSegment& aMedia, TrackRate aRate = 0);
const RefPtr<MediaSessionConduit> mConduit;
RefPtr<AudioProxyThread> mAudioProcessing;
RefPtr<VideoFrameConverter> mConverter;
// active is true if there is a transport to send on
mozilla::Atomic<bool> mActive;
// enabled is true if the media access control permits sending
// actual content; when false you get black/silence
mozilla::Atomic<bool> mEnabled;
// Written and read on the MediaTrackGraph thread
bool mDirectConnect;
};
MediaPipelineTransmit::MediaPipelineTransmit(
const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
bool aIsVideo, RefPtr<MediaSessionConduit> aConduit)
: MediaPipeline(aPc, std::move(aTransportHandler), DirectionType::TRANSMIT,
std::move(aCallThread), std::move(aStsThread),
std::move(aConduit)),
mWatchManager(this, AbstractThread::MainThread()),
mIsVideo(aIsVideo),
mListener(new PipelineListener(mConduit)),
mDomTrack(nullptr, "MediaPipelineTransmit::mDomTrack"),
mSendTrackOverride(nullptr, "MediaPipelineTransmit::mSendTrackOverride") {
if (!IsVideo()) {
mAudioProcessing =
MakeAndAddRef<AudioProxyThread>(*mConduit->AsAudioSessionConduit());
mListener->SetAudioProxy(mAudioProcessing);
}
mWatchManager.Watch(mActive, &MediaPipelineTransmit::UpdateSendState);
mWatchManager.Watch(mDomTrack, &MediaPipelineTransmit::UpdateSendState);
mWatchManager.Watch(mSendTrackOverride,
&MediaPipelineTransmit::UpdateSendState);
mDescription = GenerateDescription();
}
void MediaPipelineTransmit::RegisterListener() {
if (!IsVideo()) {
return;
}
mConverter = VideoFrameConverter::Create(GetTimestampMaker());
mConverter->SetIdleFrameDuplicationInterval(TimeDuration::FromSeconds(1));
mFrameListener = mConverter->VideoFrameConvertedEvent().Connect(
mConverter->mTaskQueue,
[listener = mListener](webrtc::VideoFrame aFrame) {
listener->OnVideoFrameConverted(std::move(aFrame));
});
mListener->SetVideoFrameConverter(mConverter);
}
already_AddRefed<MediaPipelineTransmit> MediaPipelineTransmit::Create(
const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
bool aIsVideo, RefPtr<MediaSessionConduit> aConduit) {
RefPtr<MediaPipelineTransmit> transmit = new MediaPipelineTransmit(
aPc, std::move(aTransportHandler), std::move(aCallThread),
std::move(aStsThread), aIsVideo, std::move(aConduit));
transmit->RegisterListener();
return transmit.forget();
}
MediaPipelineTransmit::~MediaPipelineTransmit() {
mFrameListener.DisconnectIfExists();
MOZ_ASSERT(!mTransmitting);
MOZ_ASSERT(!mDomTrack.Ref());
}
void MediaPipelineTransmit::InitControl(
MediaPipelineTransmitControlInterface* aControl) {
aControl->CanonicalTransmitting().ConnectMirror(&mActive);
}
void MediaPipelineTransmit::Shutdown() {
MediaPipeline::Shutdown();
mWatchManager.Shutdown();
if (mDomTrack.Ref()) {
mDomTrack.Ref()->RemovePrincipalChangeObserver(this);
mDomTrack = nullptr;
}
mUnsettingSendTrack = false;
UpdateSendState();
MOZ_ASSERT(!mTransmitting);
}
void MediaPipeline::SetDescription_s(const std::string& description) {
ASSERT_ON_THREAD(mStsThread);
mDescription = description;
}
std::string MediaPipelineTransmit::GenerateDescription() const {
MOZ_ASSERT(NS_IsMainThread());
std::stringstream description;
description << mPc << "| ";
description << (mIsVideo ? "Transmit video[" : "Transmit audio[");
if (mDomTrack.Ref()) {
nsString nsTrackId;
mDomTrack.Ref()->GetId(nsTrackId);
description << NS_ConvertUTF16toUTF8(nsTrackId).get();
} else if (mSendTrackOverride.Ref()) {
description << "override " << mSendTrackOverride.Ref().get();
} else {
description << "no track";
}
description << "]";
return description.str();
}
void MediaPipelineTransmit::UpdateSendState() {
MOZ_ASSERT(NS_IsMainThread());
// This runs because either mActive, mDomTrack or mSendTrackOverride changed,
// or because mSendTrack was unset async. Based on these inputs this method
// is responsible for hooking up mSendTrack to mListener in order to feed data
// to the conduit.
//
// If we are inactive, or if the send track does not match what we want to
// send (mDomTrack or mSendTrackOverride), we must stop feeding data to the
// conduit. NB that removing the listener from mSendTrack is async, and we
// must wait for it to resolve before adding mListener to another track.
// mUnsettingSendTrack gates us until the listener has been removed from
// mSendTrack.
//
// If we are active and the send track does match what we want to send, we
// make sure mListener is added to the send track. Either now, or if we're
// still waiting for another send track to be removed, during a future call to
// this method.
if (mUnsettingSendTrack) {
// We must wait for the send track to be unset before we can set it again,
// to avoid races. Once unset this function is triggered again.
return;
}
const bool wasTransmitting = mTransmitting;
const bool haveLiveSendTrack = mSendTrack && !mSendTrack->IsDestroyed();
const bool haveLiveDomTrack = mDomTrack.Ref() && !mDomTrack.Ref()->Ended();
const bool haveLiveOverrideTrack =
mSendTrackOverride.Ref() && !mSendTrackOverride.Ref()->IsDestroyed();
const bool mustRemoveSendTrack =
haveLiveSendTrack && !mSendTrackOverride.Ref() &&
(!haveLiveDomTrack || mDomTrack.Ref()->GetTrack() != mSendPortSource);
mTransmitting = mActive && (haveLiveDomTrack || haveLiveOverrideTrack) &&
!mustRemoveSendTrack;
MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
("MediaPipeline %p UpdateSendState wasTransmitting=%d, active=%d, "
"sendTrack=%p (%s), domTrack=%p (%s), "
"sendTrackOverride=%p (%s), mustRemove=%d, mTransmitting=%d",
this, wasTransmitting, mActive.Ref(), mSendTrack.get(),
haveLiveSendTrack ? "live" : "ended", mDomTrack.Ref().get(),
haveLiveDomTrack ? "live" : "ended", mSendTrackOverride.Ref().get(),
haveLiveOverrideTrack ? "live" : "ended", mustRemoveSendTrack,
mTransmitting));
if (!wasTransmitting && mTransmitting) {
MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
("Attaching pipeline %p to track %p conduit type=%s", this,
mDomTrack.Ref().get(), mIsVideo ? "video" : "audio"));
if (mDescriptionInvalidated) {
// Only update the description when we attach to a track, as detaching is
// always a longer async step than updating the description. Updating on
// detach would cause the wrong track id to be attributed in logs.
RUN_ON_THREAD(mStsThread,
WrapRunnable(RefPtr<MediaPipeline>(this),
&MediaPipelineTransmit::SetDescription_s,
GenerateDescription()),
NS_DISPATCH_NORMAL);
mDescriptionInvalidated = false;
}
if (mSendTrackOverride.Ref()) {
// Special path that allows unittests to avoid mDomTrack and the graph by
// manually calling SetSendTrack.
mSendTrack = mSendTrackOverride.Ref();
} else {
mSendTrack = mDomTrack.Ref()->Graph()->CreateForwardedInputTrack(
mDomTrack.Ref()->GetTrack()->mType);
mSendPortSource = mDomTrack.Ref()->GetTrack();
mSendPort = mSendTrack->AllocateInputPort(mSendPortSource.get());
}
if (mIsVideo) {
mConverter->SetTrackingId(mDomTrack.Ref()->GetSource().mTrackingId);
}
mSendTrack->QueueSetAutoend(false);
if (mIsVideo) {
mSendTrack->AddDirectListener(mListener);
}
mSendTrack->AddListener(mListener);
}
if (wasTransmitting && !mTransmitting) {
MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
("Detaching pipeline %p from track %p conduit type=%s", this,
mDomTrack.Ref().get(), mIsVideo ? "video" : "audio"));
mUnsettingSendTrack = true;
if (mIsVideo) {
mSendTrack->RemoveDirectListener(mListener);
}
mSendTrack->RemoveListener(mListener)->Then(
GetMainThreadSerialEventTarget(), __func__,
[this, self = RefPtr<MediaPipelineTransmit>(this)] {
mUnsettingSendTrack = false;
mSendTrack = nullptr;
if (!mWatchManager.IsShutdown()) {
mWatchManager.ManualNotify(&MediaPipelineTransmit::UpdateSendState);
}
});
if (!mSendTrackOverride.Ref()) {
// If an override is set it may be re-used.
mSendTrack->Destroy();
mSendPort->Destroy();
mSendPort = nullptr;
mSendPortSource = nullptr;
}
}
}
bool MediaPipelineTransmit::Transmitting() const {
MOZ_ASSERT(NS_IsMainThread());
return mActive;
}
bool MediaPipelineTransmit::IsVideo() const { return mIsVideo; }
void MediaPipelineTransmit::PrincipalChanged(dom::MediaStreamTrack* aTrack) {
MOZ_ASSERT(aTrack && aTrack == mDomTrack.Ref());
PeerConnectionWrapper pcw(mPc);
if (pcw.impl()) {
Document* doc = pcw.impl()->GetParentObject()->GetExtantDoc();
if (doc) {
UpdateSinkIdentity(doc->NodePrincipal(), pcw.impl()->GetPeerIdentity());
} else {
MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
("Can't update sink principal; document gone"));
}
}
}
void MediaPipelineTransmit::UpdateSinkIdentity(
nsIPrincipal* aPrincipal, const PeerIdentity* aSinkIdentity) {
MOZ_ASSERT(NS_IsMainThread());
if (!mDomTrack.Ref()) {
// Nothing to do here
return;
}
bool enableTrack = aPrincipal->Subsumes(mDomTrack.Ref()->GetPrincipal());
if (!enableTrack) {
// first try didn't work, but there's a chance that this is still available
// if our track is bound to a peerIdentity, and the peer connection (our
// sink) is bound to the same identity, then we can enable the track.
const PeerIdentity* trackIdentity = mDomTrack.Ref()->GetPeerIdentity();
if (aSinkIdentity && trackIdentity) {
enableTrack = (*aSinkIdentity == *trackIdentity);
}
}
mListener->SetEnabled(enableTrack);
}
void MediaPipelineTransmit::TransportReady_s() {
ASSERT_ON_THREAD(mStsThread);
// Call base ready function.
MediaPipeline::TransportReady_s();
mListener->SetActive(true);
}
nsresult MediaPipelineTransmit::SetTrack(
const RefPtr<MediaStreamTrack>& aDomTrack) {
MOZ_ASSERT(NS_IsMainThread());
if (mDomTrack.Ref()) {
mDomTrack.Ref()->RemovePrincipalChangeObserver(this);
}
if (aDomTrack) {
nsString nsTrackId;
aDomTrack->GetId(nsTrackId);
MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
("Reattaching pipeline to track %p track %s conduit type: %s",
aDomTrack.get(), NS_ConvertUTF16toUTF8(nsTrackId).get(),
mIsVideo ? "video" : "audio"));
}
mDescriptionInvalidated = true;
mDomTrack = aDomTrack;
if (mDomTrack.Ref()) {
mDomTrack.Ref()->AddPrincipalChangeObserver(this);
PrincipalChanged(mDomTrack.Ref());
}
return NS_OK;
}
RefPtr<dom::MediaStreamTrack> MediaPipelineTransmit::GetTrack() const {
MOZ_ASSERT(NS_IsMainThread());
return mDomTrack;
}
void MediaPipelineTransmit::SetSendTrackOverride(
const RefPtr<ProcessedMediaTrack>& aSendTrack) {
MOZ_ASSERT(NS_IsMainThread());
MOZ_RELEASE_ASSERT(!mSendTrack);
MOZ_RELEASE_ASSERT(!mSendPort);
MOZ_RELEASE_ASSERT(!mSendTrackOverride.Ref());
mDescriptionInvalidated = true;
mSendTrackOverride = aSendTrack;
}
// Called if we're attached with AddDirectListener()
void MediaPipelineTransmit::PipelineListener::NotifyRealtimeTrackData(
MediaTrackGraph* aGraph, TrackTime aOffset, const MediaSegment& aMedia) {
MOZ_LOG(
gMediaPipelineLog, LogLevel::Debug,
("MediaPipeline::NotifyRealtimeTrackData() listener=%p, offset=%" PRId64
", duration=%" PRId64,
this, aOffset, aMedia.GetDuration()));
TRACE_COMMENT(
"MediaPipelineTransmit::PipelineListener::NotifyRealtimeTrackData", "%s",
aMedia.GetType() == MediaSegment::VIDEO ? "Video" : "Audio");
NewData(aMedia, aGraph->GraphRate());
}
void MediaPipelineTransmit::PipelineListener::NotifyQueuedChanges(
MediaTrackGraph* aGraph, TrackTime aOffset,
const MediaSegment& aQueuedMedia) {
MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
("MediaPipeline::NotifyQueuedChanges()"));
if (aQueuedMedia.GetType() == MediaSegment::VIDEO) {
// We always get video from the direct listener.
return;
}
TRACE("MediaPipelineTransmit::PipelineListener::NotifyQueuedChanges (Audio)");
if (mDirectConnect) {
// ignore non-direct data if we're also getting direct data
return;
}
size_t rate;
if (aGraph) {
rate = aGraph->GraphRate();
} else {
// When running tests, graph may be null. In that case use a default.
rate = 16000;
}
NewData(aQueuedMedia, rate);
}
void MediaPipelineTransmit::PipelineListener::NotifyEnabledStateChanged(
MediaTrackGraph* aGraph, bool aEnabled) {
if (mConduit->type() != MediaSessionConduit::VIDEO) {
return;
}
MOZ_ASSERT(mConverter);
mConverter->SetTrackEnabled(aEnabled);
}
void MediaPipelineTransmit::PipelineListener::NotifyDirectListenerInstalled(
InstallationResult aResult) {
MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
("MediaPipeline::NotifyDirectListenerInstalled() listener=%p,"
" result=%d",
this, static_cast<int32_t>(aResult)));
mDirectConnect = InstallationResult::SUCCESS == aResult;
}
void MediaPipelineTransmit::PipelineListener::
NotifyDirectListenerUninstalled() {
MOZ_LOG(
gMediaPipelineLog, LogLevel::Info,
("MediaPipeline::NotifyDirectListenerUninstalled() listener=%p", this));
if (mConduit->type() == MediaSessionConduit::VIDEO) {
// Reset the converter's track-enabled state. If re-added to a new track
// later and that track is disabled, we will be signaled explicitly.
MOZ_ASSERT(mConverter);
mConverter->SetTrackEnabled(true);
}
mDirectConnect = false;
}
void MediaPipelineTransmit::PipelineListener::NewData(
const MediaSegment& aMedia, TrackRate aRate /* = 0 */) {
if (mConduit->type() != (aMedia.GetType() == MediaSegment::AUDIO
? MediaSessionConduit::AUDIO
: MediaSessionConduit::VIDEO)) {
MOZ_ASSERT(false,
"The media type should always be correct since the "
"listener is locked to a specific track");
return;
}
// TODO(ekr@rtfm.com): For now assume that we have only one
// track type and it's destined for us
// See bug 784517
if (aMedia.GetType() == MediaSegment::AUDIO) {
MOZ_RELEASE_ASSERT(aRate > 0);
if (!mActive) {
MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
("Discarding audio packets because transport not ready"));
return;
}
const AudioSegment* audio = static_cast<const AudioSegment*>(&aMedia);
for (AudioSegment::ConstChunkIterator iter(*audio); !iter.IsEnded();
iter.Next()) {
mAudioProcessing->QueueAudioChunk(aRate, *iter, mEnabled);
}
} else {
const VideoSegment* video = static_cast<const VideoSegment*>(&aMedia);
for (VideoSegment::ConstChunkIterator iter(*video); !iter.IsEnded();
iter.Next()) {
mConverter->QueueVideoChunk(*iter, !mEnabled);
}
}
}
class GenericReceiveListener : public MediaTrackListener {
public:
GenericReceiveListener(RefPtr<SourceMediaTrack> aSource,
TrackingId aTrackingId)
: mSource(std::move(aSource)),
mTrackingId(std::move(aTrackingId)),
mIsAudio(mSource->mType == MediaSegment::AUDIO),
mEnabled(false) {
MOZ_DIAGNOSTIC_ASSERT(NS_IsMainThread());
MOZ_DIAGNOSTIC_ASSERT(mSource, "Must be used with a SourceMediaTrack");
}
virtual ~GenericReceiveListener() = default;
void Init() { mSource->AddListener(this); }
void Shutdown() { mSource->RemoveListener(this); }
void SetEnabled(bool aEnabled) {
if (mEnabled == aEnabled) {
return;
}
mEnabled = aEnabled;
if (mIsAudio && !mSource->IsDestroyed()) {
mSource->SetPullingEnabled(mEnabled);
}
}
protected:
const RefPtr<SourceMediaTrack> mSource;
const TrackingId mTrackingId;
const bool mIsAudio;
// Main thread only.
bool mEnabled;
};
MediaPipelineReceive::MediaPipelineReceive(
const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
RefPtr<MediaSessionConduit> aConduit)
: MediaPipeline(aPc, std::move(aTransportHandler), DirectionType::RECEIVE,
std::move(aCallThread), std::move(aStsThread),
std::move(aConduit)),
mWatchManager(this, AbstractThread::MainThread()) {
mWatchManager.Watch(mActive, &MediaPipelineReceive::UpdateListener);
}
MediaPipelineReceive::~MediaPipelineReceive() = default;
void MediaPipelineReceive::InitControl(
MediaPipelineReceiveControlInterface* aControl) {
aControl->CanonicalReceiving().ConnectMirror(&mActive);
}
void MediaPipelineReceive::Shutdown() {
MOZ_ASSERT(NS_IsMainThread());
MediaPipeline::Shutdown();
mWatchManager.Shutdown();
}
class MediaPipelineReceiveAudio::PipelineListener
: public GenericReceiveListener {
public:
PipelineListener(RefPtr<SourceMediaTrack> aSource, TrackingId aTrackingId,
RefPtr<MediaSessionConduit> aConduit,
PrincipalHandle aPrincipalHandle, PrincipalPrivacy aPrivacy)
: GenericReceiveListener(std::move(aSource), std::move(aTrackingId)),
mConduit(std::move(aConduit)),
// AudioSession conduit only supports 16, 32, 44.1 and 48kHz
// This is an artificial limitation, it would however require more
// changes to support any rates. If the sampling rate is not-supported,
// we will use 48kHz instead.
mRate(static_cast<AudioSessionConduit*>(mConduit.get())
->IsSamplingFreqSupported(mSource->Graph()->GraphRate())
? mSource->Graph()->GraphRate()
: WEBRTC_MAX_SAMPLE_RATE),
mTaskQueue(TaskQueue::Create(
GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER),
"AudioPipelineListener")),
mPlayedTicks(0),
mAudioFrame(std::make_unique<webrtc::AudioFrame>()),
mPrincipalHandle(std::move(aPrincipalHandle)),
mPrivacy(aPrivacy),
mForceSilence(false) {}
void Init() {
GenericReceiveListener::Init();
mSource->SetAppendDataSourceRate(mRate);
}
// Implement MediaTrackListener
void NotifyPull(MediaTrackGraph* aGraph, TrackTime aEndOfAppendedData,
TrackTime aDesiredTime) override {
NotifyPullImpl(aDesiredTime);
}
void OnPrivacyRequested_s() {
if (mPrivacy == PrincipalPrivacy::Private) {
return;
}
mForceSilence = true;
}
void SetPrivatePrincipal(PrincipalHandle aHandle) {
MOZ_ASSERT(NS_IsMainThread());
if (mSource->IsDestroyed()) {
return;
}
mSource->QueueControlMessageWithNoShutdown(
[self = RefPtr{this}, this,
privatePrincipal = std::move(aHandle)]() mutable {
if (mPrivacy == PrincipalPrivacy::Private) {
return;
}
mPrincipalHandle = std::move(privatePrincipal);
mPrivacy = PrincipalPrivacy::Private;
mForceSilence = false;
});
}
private:
~PipelineListener() = default;
void NotifyPullImpl(TrackTime aDesiredTime) {
TRACE_COMMENT("PiplineListener::NotifyPullImpl", "PipelineListener %p",
this);
uint32_t samplesPer10ms = mRate / 100;
// mSource's rate is not necessarily the same as the graph rate, since there
// are sample-rate constraints on the inbound audio: only 16, 32, 44.1 and
// 48kHz are supported. The audio frames we get here is going to be
// resampled when inserted into the graph. aDesiredTime and mPlayedTicks are
// in the graph rate.
while (mPlayedTicks < aDesiredTime) {
// This fetches 10ms of data, either mono or stereo
MediaConduitErrorCode err =
static_cast<AudioSessionConduit*>(mConduit.get())
->GetAudioFrame(mRate, mAudioFrame.get());
if (err != kMediaConduitNoError) {
// Insert silence on conduit/GIPS failure (extremely unlikely)
MOZ_LOG(gMediaPipelineLog, LogLevel::Error,
("Audio conduit failed (%d) to return data @ %" PRId64
" (desired %" PRId64 " -> %f)",
err, mPlayedTicks, aDesiredTime,
mSource->TrackTimeToSeconds(aDesiredTime)));
constexpr size_t mono = 1;
mAudioFrame->UpdateFrame(
mAudioFrame->timestamp_, nullptr, samplesPer10ms, mRate,
mAudioFrame->speech_type_, mAudioFrame->vad_activity_,
std::max(mono, mAudioFrame->num_channels()));
}
MOZ_LOG(
gMediaPipelineLog, LogLevel::Debug,
("Audio conduit returned buffer for %zu channels, %zu frames",
mAudioFrame->num_channels(), mAudioFrame->samples_per_channel()));
AudioSegment segment;
if (mForceSilence || mAudioFrame->muted()) {
segment.AppendNullData(mAudioFrame->samples_per_channel());
} else {
CheckedInt<size_t> bufferSize(sizeof(uint16_t));
bufferSize *= mAudioFrame->samples_per_channel();
bufferSize *= mAudioFrame->num_channels();
RefPtr<SharedBuffer> samples = SharedBuffer::Create(bufferSize);
int16_t* samplesData = static_cast<int16_t*>(samples->Data());
AutoTArray<int16_t*, 2> channels;
AutoTArray<const int16_t*, 2> outputChannels;
channels.SetLength(mAudioFrame->num_channels());
size_t offset = 0;
for (size_t i = 0; i < mAudioFrame->num_channels(); i++) {
channels[i] = samplesData + offset;
offset += mAudioFrame->samples_per_channel();
}
DeinterleaveAndConvertBuffer(
mAudioFrame->data(), mAudioFrame->samples_per_channel(),
mAudioFrame->num_channels(), channels.Elements());
outputChannels.AppendElements(channels);
segment.AppendFrames(samples.forget(), outputChannels,
mAudioFrame->samples_per_channel(),
mPrincipalHandle);
}
// Handle track not actually added yet or removed/finished
if (TrackTime appended = mSource->AppendData(&segment)) {
mPlayedTicks += appended;
} else {
MOZ_LOG(gMediaPipelineLog, LogLevel::Error, ("AppendData failed"));
// we can't un-read the data, but that's ok since we don't want to
// buffer - but don't i-loop!
break;
}
}
}
const RefPtr<MediaSessionConduit> mConduit;
// This conduit's sampling rate. This is either 16, 32, 44.1 or 48kHz, and
// tries to be the same as the graph rate. If the graph rate is higher than
// 48kHz, mRate is capped to 48kHz. If mRate does not match the graph rate,
// audio is resampled to the graph rate.
const TrackRate mRate;
const RefPtr<TaskQueue> mTaskQueue;
// Number of frames of data that has been added to the SourceMediaTrack in
// the graph's rate. Graph thread only.
TrackTicks mPlayedTicks;
// Allocation of an audio frame used as a scratch buffer when reading data out
// of libwebrtc for forwarding into the graph. Graph thread only.
std::unique_ptr<webrtc::AudioFrame> mAudioFrame;
// Principal handle used when appending data to the SourceMediaTrack. Graph
// thread only.
PrincipalHandle mPrincipalHandle;
// Privacy of mPrincipalHandle. Graph thread only.
PrincipalPrivacy mPrivacy;
// Set to true on the sts thread if privacy is requested when ALPN was
// negotiated. Set to false again when mPrincipalHandle is private.
Atomic<bool> mForceSilence;
};
MediaPipelineReceiveAudio::MediaPipelineReceiveAudio(
const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
RefPtr<AudioSessionConduit> aConduit, RefPtr<SourceMediaTrack> aSource,
TrackingId aTrackingId, PrincipalHandle aPrincipalHandle,
PrincipalPrivacy aPrivacy)
: MediaPipelineReceive(aPc, std::move(aTransportHandler),
std::move(aCallThread), std::move(aStsThread),
std::move(aConduit)),
mListener(aSource ? new PipelineListener(
std::move(aSource), std::move(aTrackingId),
mConduit, std::move(aPrincipalHandle), aPrivacy)
: nullptr) {
mDescription = mPc + "| Receive audio";
if (mListener) {
mListener->Init();
}
}
void MediaPipelineReceiveAudio::Shutdown() {
MOZ_ASSERT(NS_IsMainThread());
MediaPipelineReceive::Shutdown();
if (mListener) {
mListener->Shutdown();
}
}
void MediaPipelineReceiveAudio::OnPrivacyRequested_s() {
ASSERT_ON_THREAD(mStsThread);
if (mListener) {
mListener->OnPrivacyRequested_s();
}
}
void MediaPipelineReceiveAudio::SetPrivatePrincipal(PrincipalHandle aHandle) {
MOZ_ASSERT(NS_IsMainThread());
if (mListener) {
mListener->SetPrivatePrincipal(std::move(aHandle));
}
}
void MediaPipelineReceiveAudio::UpdateListener() {
MOZ_ASSERT(NS_IsMainThread());
if (mListener) {
mListener->SetEnabled(mActive.Ref());
}
}
class MediaPipelineReceiveVideo::PipelineListener
: public GenericReceiveListener {
public:
PipelineListener(RefPtr<SourceMediaTrack> aSource, TrackingId aTrackingId,
PrincipalHandle aPrincipalHandle, PrincipalPrivacy aPrivacy)
: GenericReceiveListener(std::move(aSource), std::move(aTrackingId)),
mImageContainer(MakeAndAddRef<ImageContainer>(
ImageUsageType::Webrtc, ImageContainer::ASYNCHRONOUS)),
mMutex("MediaPipelineReceiveVideo::PipelineListener::mMutex"),
mPrincipalHandle(std::move(aPrincipalHandle)),
mPrivacy(aPrivacy) {}
void OnPrivacyRequested_s() {
MutexAutoLock lock(mMutex);
if (mPrivacy == PrincipalPrivacy::Private) {
return;
}
mForceDropFrames = true;
}
void SetPrivatePrincipal(PrincipalHandle aHandle) {
MutexAutoLock lock(mMutex);
if (mPrivacy == PrincipalPrivacy::Private) {
return;
}
mPrincipalHandle = std::move(aHandle);
mPrivacy = PrincipalPrivacy::Private;
mForceDropFrames = false;
}
void RenderVideoFrame(const webrtc::VideoFrame& aVideoFrame) {
PrincipalHandle principal;
{
MutexAutoLock lock(mMutex);
if (mForceDropFrames) {
return;
}
principal = mPrincipalHandle;
}
RefPtr<Image> image;
const webrtc::VideoFrameBuffer& buffer = *aVideoFrame.video_frame_buffer();
if (buffer.type() == webrtc::VideoFrameBuffer::Type::kNative) {
// We assume that only native handles are used with the
// WebrtcMediaDataCodec decoder.
const ImageBuffer* imageBuffer = static_cast<const ImageBuffer*>(&buffer);
image = imageBuffer->GetNativeImage();
} else {
MOZ_ASSERT(buffer.type() == webrtc::VideoFrameBuffer::Type::kI420);
rtc::scoped_refptr<const webrtc::I420BufferInterface> i420(
buffer.GetI420());
MOZ_ASSERT(i420->DataY());
// Create a video frame using |buffer|.
PerformanceRecorder<CopyVideoStage> rec(
"MediaPipelineReceiveVideo::CopyToImage"_ns, mTrackingId,
i420->width(), i420->height());
RefPtr<PlanarYCbCrImage> yuvImage =
mImageContainer->CreatePlanarYCbCrImage();
PlanarYCbCrData yuvData;
yuvData.mYChannel = const_cast<uint8_t*>(i420->DataY());
yuvData.mYStride = i420->StrideY();
MOZ_ASSERT(i420->StrideU() == i420->StrideV());
yuvData.mCbCrStride = i420->StrideU();
yuvData.mCbChannel = const_cast<uint8_t*>(i420->DataU());
yuvData.mCrChannel = const_cast<uint8_t*>(i420->DataV());
yuvData.mPictureRect = IntRect(0, 0, i420->width(), i420->height());
yuvData.mStereoMode = StereoMode::MONO;
// This isn't the best default.
yuvData.mYUVColorSpace = gfx::YUVColorSpace::BT601;
yuvData.mChromaSubsampling =
gfx::ChromaSubsampling::HALF_WIDTH_AND_HEIGHT;
if (NS_FAILED(yuvImage->CopyData(yuvData))) {
MOZ_ASSERT(false);
return;
}
rec.Record();
image = std::move(yuvImage);
}
Maybe<webrtc::Timestamp> receiveTime;
for (const auto& packet : aVideoFrame.packet_infos()) {
if (!receiveTime || *receiveTime < packet.receive_time()) {
receiveTime = Some(packet.receive_time());
}
}
VideoSegment segment;
auto size = image->GetSize();
auto processingDuration =
aVideoFrame.processing_time()
? media::TimeUnit::FromMicroseconds(
aVideoFrame.processing_time()->Elapsed().us())
: media::TimeUnit::Invalid();
segment.AppendWebrtcRemoteFrame(
image.forget(), size, principal,
/* aForceBlack */ false, TimeStamp::Now(), processingDuration,
aVideoFrame.rtp_timestamp(), aVideoFrame.ntp_time_ms(),
receiveTime ? receiveTime->us() : 0);
mSource->AppendData(&segment);
}
private:
RefPtr<layers::ImageContainer> mImageContainer;
Mutex mMutex;
PrincipalHandle mPrincipalHandle MOZ_GUARDED_BY(mMutex);
PrincipalPrivacy mPrivacy MOZ_GUARDED_BY(mMutex);
// Set to true on the sts thread if privacy is requested when ALPN was
// negotiated. Set to false again when mPrincipalHandle is private.
bool mForceDropFrames MOZ_GUARDED_BY(mMutex) = false;
};
class MediaPipelineReceiveVideo::PipelineRenderer
: public mozilla::VideoRenderer {
public:
explicit PipelineRenderer(MediaPipelineReceiveVideo* aPipeline)
: mPipeline(aPipeline) {}
void Detach() { mPipeline = nullptr; }
// Implement VideoRenderer
void FrameSizeChange(unsigned int aWidth, unsigned int aHeight) override {}
void RenderVideoFrame(const webrtc::VideoFrame& aVideoFrame) override {
mPipeline->mListener->RenderVideoFrame(aVideoFrame);
}
private:
MediaPipelineReceiveVideo* mPipeline; // Raw pointer to avoid cycles
};
MediaPipelineReceiveVideo::MediaPipelineReceiveVideo(
const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
RefPtr<VideoSessionConduit> aConduit, RefPtr<SourceMediaTrack> aSource,
TrackingId aTrackingId, PrincipalHandle aPrincipalHandle,
PrincipalPrivacy aPrivacy)
: MediaPipelineReceive(aPc, std::move(aTransportHandler),
std::move(aCallThread), std::move(aStsThread),
std::move(aConduit)),
mRenderer(new PipelineRenderer(this)),
mListener(aSource ? new PipelineListener(
std::move(aSource), std::move(aTrackingId),
std::move(aPrincipalHandle), aPrivacy)
: nullptr) {
mDescription = mPc + "| Receive video";
if (mListener) {
mListener->Init();
}
static_cast<VideoSessionConduit*>(mConduit.get())->AttachRenderer(mRenderer);
}
void MediaPipelineReceiveVideo::Shutdown() {
MOZ_ASSERT(NS_IsMainThread());
MediaPipelineReceive::Shutdown();
if (mListener) {
mListener->Shutdown();
}
// stop generating video and thus stop invoking the PipelineRenderer
// and PipelineListener - the renderer has a raw ptr to the Pipeline to
// avoid cycles, and the render callbacks are invoked from a different
// thread so simple null-checks would cause TSAN bugs without locks.
static_cast<VideoSessionConduit*>(mConduit.get())->DetachRenderer();
}
void MediaPipelineReceiveVideo::OnPrivacyRequested_s() {
ASSERT_ON_THREAD(mStsThread);
if (mListener) {
mListener->OnPrivacyRequested_s();
}
}
void MediaPipelineReceiveVideo::SetPrivatePrincipal(PrincipalHandle aHandle) {
MOZ_ASSERT(NS_IsMainThread());
if (mListener) {
mListener->SetPrivatePrincipal(std::move(aHandle));
}
}
void MediaPipelineReceiveVideo::UpdateListener() {
MOZ_ASSERT(NS_IsMainThread());
if (mListener) {
mListener->SetEnabled(mActive.Ref());
}
}
const dom::RTCStatsTimestampMaker& MediaPipeline::GetTimestampMaker() const {
return mConduit->GetTimestampMaker();
}
DOMHighResTimeStamp MediaPipeline::RtpCSRCStats::GetExpiryFromTime(
const DOMHighResTimeStamp aTime) {
// DOMHighResTimeStamp is a unit measured in ms
return aTime + EXPIRY_TIME_MILLISECONDS;
}
MediaPipeline::RtpCSRCStats::RtpCSRCStats(const uint32_t aCsrc,
const DOMHighResTimeStamp aTime)
: mCsrc(aCsrc), mTimestamp(aTime) {}
void MediaPipeline::RtpCSRCStats::GetWebidlInstance(
dom::RTCRTPContributingSourceStats& aWebidlObj,
const nsString& aInboundRtpStreamId) const {
nsString statId = u"csrc_"_ns + aInboundRtpStreamId;
statId.AppendLiteral("_");
statId.AppendInt(mCsrc);
aWebidlObj.mId.Construct(statId);
aWebidlObj.mType.Construct(RTCStatsType::Csrc);
aWebidlObj.mTimestamp.Construct(mTimestamp);
aWebidlObj.mContributorSsrc.Construct(mCsrc);
aWebidlObj.mInboundRtpStreamId.Construct(aInboundRtpStreamId);
}
} // namespace mozilla