Source code
Revision control
Copy as Markdown
Other Tools
/* ex: set tabstop=2 shiftwidth=2 expandtab:
* Copyright © 2019 Jan Kelling
*
* This program is made available under an ISC-style license. See the
* accompanying file LICENSE for details.
*/
#include "cubeb-internal.h"
#include "cubeb/cubeb.h"
#include "cubeb_android.h"
#include "cubeb_log.h"
#include "cubeb_resampler.h"
#include "cubeb_triple_buffer.h"
#include <aaudio/AAudio.h>
#include <android/api-level.h>
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstring>
#include <dlfcn.h>
#include <inttypes.h>
#include <limits>
#include <memory>
#include <mutex>
#include <thread>
#include <variant>
#include <vector>
using namespace std;
#ifdef DISABLE_LIBAAUDIO_DLOPEN
#define WRAP(x) x
#else
#define WRAP(x) (*cubeb_##x)
#define LIBAAUDIO_API_VISIT(X) \
X(AAudio_convertResultToText) \
X(AAudio_convertStreamStateToText) \
X(AAudio_createStreamBuilder) \
X(AAudioStreamBuilder_openStream) \
X(AAudioStreamBuilder_setChannelCount) \
X(AAudioStreamBuilder_setBufferCapacityInFrames) \
X(AAudioStreamBuilder_setDirection) \
X(AAudioStreamBuilder_setFormat) \
X(AAudioStreamBuilder_setSharingMode) \
X(AAudioStreamBuilder_setPerformanceMode) \
X(AAudioStreamBuilder_setSampleRate) \
X(AAudioStreamBuilder_delete) \
X(AAudioStreamBuilder_setDataCallback) \
X(AAudioStreamBuilder_setErrorCallback) \
X(AAudioStream_close) \
X(AAudioStream_read) \
X(AAudioStream_requestStart) \
X(AAudioStream_requestPause) \
X(AAudioStream_getTimestamp) \
X(AAudioStream_requestFlush) \
X(AAudioStream_requestStop) \
X(AAudioStream_getPerformanceMode) \
X(AAudioStream_getSharingMode) \
X(AAudioStream_getBufferSizeInFrames) \
X(AAudioStream_getBufferCapacityInFrames) \
X(AAudioStream_getSampleRate) \
X(AAudioStream_waitForStateChange) \
X(AAudioStream_getFramesRead) \
X(AAudioStream_getState) \
X(AAudioStream_getFramesWritten) \
X(AAudioStream_getFramesPerBurst) \
X(AAudioStreamBuilder_setInputPreset) \
X(AAudioStreamBuilder_setUsage) \
X(AAudioStreamBuilder_setFramesPerDataCallback)
// not needed or added later on \
// X(AAudioStreamBuilder_setDeviceId) \
// X(AAudioStreamBuilder_setSamplesPerFrame) \
// X(AAudioStream_getSamplesPerFrame) \
// X(AAudioStream_getDeviceId) \
// X(AAudioStream_write) \
// X(AAudioStream_getChannelCount) \
// X(AAudioStream_getFormat) \
// X(AAudioStream_getXRunCount) \
// X(AAudioStream_isMMapUsed) \
// X(AAudioStreamBuilder_setContentType) \
// X(AAudioStreamBuilder_setSessionId) \
// X(AAudioStream_getUsage) \
// X(AAudioStream_getContentType) \
// X(AAudioStream_getInputPreset) \
// X(AAudioStream_getSessionId) \
// X(AAudioStream_setBufferSizeInFrames) \
// END: not needed or added later on
#define MAKE_TYPEDEF(x) static decltype(x) * cubeb_##x;
LIBAAUDIO_API_VISIT(MAKE_TYPEDEF)
#undef MAKE_TYPEDEF
#endif
const uint8_t MAX_STREAMS = 16;
const int64_t NS_PER_S = static_cast<int64_t>(1e9);
static void
aaudio_stream_destroy(cubeb_stream * stm);
static int
aaudio_stream_start(cubeb_stream * stm);
static int
aaudio_stream_stop(cubeb_stream * stm);
static int
aaudio_stream_init_impl(cubeb_stream * stm, lock_guard<mutex> & lock);
static int
aaudio_stream_stop_locked(cubeb_stream * stm, lock_guard<mutex> & lock);
static void
aaudio_stream_destroy_locked(cubeb_stream * stm, lock_guard<mutex> & lock);
static int
aaudio_stream_start_locked(cubeb_stream * stm, lock_guard<mutex> & lock);
static void
reinitialize_stream(cubeb_stream * stm);
enum class stream_state {
INIT = 0,
STOPPED,
STOPPING,
STARTED,
STARTING,
DRAINING,
ERROR,
SHUTDOWN,
};
struct AAudioTimingInfo {
// The timestamp at which the audio engine last called the calback.
uint64_t tstamp;
// The number of output frames sent to the engine.
uint64_t output_frame_index;
// The current output latency in frames. 0 if there is no output stream.
uint32_t output_latency;
// The current input latency in frames. 0 if there is no input stream.
uint32_t input_latency;
};
/* To guess the current position of the stream when it's playing, the elapsed
* time between the last callback and now is used. However, when the stream was
* stopped and there was no new callback after playing restarted yet, the time
* spent in stopped state should be excluded. It's also necessary to track the
* number of audio frames written to stream before reinitialization so it can be
* used to offset the position later, because
* `AAudioTimingInfo.output_frame_index` will restart from zero after
* reinitializing.
* This class defines an internal state machine that takes the stream state
* changes and callback emissions as events to changes it own states and
* estimates played time accordingly.
*
* A simplified |stream_state| transitions of playing looks like:
* INIT -> [STARTING/STARTED -> callback* -> STOPPING/STOPPED]* -> SHUTDOWN|INIT
*
* Internal states:
* - None: the initial state.
* - Play: stream is playing.
* - Pause: stream is not playing. Holds stop timestamp.
* - Resume: stream is playing after stopping and no callback emitted yet. Holds
* time elapsed in the previous Pause state.
* Transitions:
* - None -(STARTING)-> Play
* - Play -(STOPPING)-> Pause
* - Pause -(STARTING)-> Resume
* - Resume -(callback)-> Play
* - Resume -(STARTING)-> Resume
* - Pause -(INIT)-> None
*/
class position_estimate {
public:
// Called with the current time when stopping the stream.
void stop(uint64_t timestamp)
{
assert(in_state<Play>() || in_state<Resume>());
// Change to Pause and save the current time in it. Timestamp offset by the
// elapsed time in previous Pause if stream stops again before any callback
// clears it.
set_pause_timestamp(in_state<Play>() ? timestamp
: timestamp - get_pause_time());
}
// Called with the current time when starting the stream.
void start(uint64_t timestamp)
{
assert(in_state<None>() || in_state<Pause>());
if (in_state<Pause>()) {
// Change to Resume and record elapsed time in it.
set_pause_time(timestamp - get_pause_timestamp());
} else {
set_state<Play>();
}
}
// Calculate how much time the stream bas been playing since last callback.
uint64_t elapsed_time_since_callback(uint64_t now,
uint64_t last_callback_timestamp)
{
if (in_state<Play>()) {
if (callback_timestamp != last_callback_timestamp) {
callback_timestamp = last_callback_timestamp;
}
return now - last_callback_timestamp;
} else if (in_state<Resume>()) {
if (callback_timestamp == last_callback_timestamp) {
// Stream was stopped and no callback emited yet: exclude elapsed time
// in Pause state.
return now - last_callback_timestamp - get_pause_time();
}
// Callback emitted: update callback timestamp and change to Play.
callback_timestamp = last_callback_timestamp;
set_state<Play>();
return now - last_callback_timestamp;
} else if (in_state<Pause>()) {
assert(callback_timestamp == last_callback_timestamp);
// Use recorded timestamps when Paused.
return get_pause_timestamp() - callback_timestamp;
} else {
assert(in_state<None>());
return 0;
}
}
// Called when reinitializing stream. The input parameter is how many frames
// have already been written to AAudio since the first initialization.
void reinit(uint64_t position)
{
init_position = position;
state = None{};
callback_timestamp = 0;
}
// Frame index when last reinitialized.
uint64_t initial_position() { return init_position; }
private:
template <typename T> void set_state() { state.emplace<T>(); }
template <typename T> bool in_state()
{
return std::holds_alternative<T>(state);
}
void set_pause_time(uint64_t time) { state.emplace<Resume>(time); }
uint64_t get_pause_time()
{
assert(in_state<Resume>());
return std::get<Resume>(state).pause_time;
}
void set_pause_timestamp(uint64_t timestamp)
{
state.emplace<Pause>(timestamp);
}
uint64_t get_pause_timestamp()
{
assert(in_state<Pause>());
return std::get<Pause>(state).timestamp;
}
struct None {};
struct Play {};
struct Pause {
Pause() = delete;
explicit Pause(uint64_t timestamp) : timestamp(timestamp) {}
uint64_t timestamp; // The time when stopping stream.
};
struct Resume {
Resume() = delete;
explicit Resume(uint64_t time) : pause_time(time) {}
uint64_t pause_time; // Elapsed time from stopping to starting stream.
};
std::variant<None, Play, Pause, Resume> state;
// Track input callback timestamp to detect callback emission.
uint64_t callback_timestamp{0};
// Track number of written frames to adjust position after reinitialization.
uint64_t init_position{0};
};
struct cubeb_stream {
/* Note: Must match cubeb_stream layout in cubeb.c. */
cubeb * context{};
void * user_ptr{};
std::atomic<bool> in_use{false};
std::atomic<bool> latency_metrics_available{false};
std::atomic<stream_state> state{stream_state::INIT};
std::atomic<bool> in_data_callback{false};
triple_buffer<AAudioTimingInfo> timing_info;
AAudioStream * ostream{};
AAudioStream * istream{};
cubeb_data_callback data_callback{};
cubeb_state_callback state_callback{};
cubeb_resampler * resampler{};
// mutex synchronizes access to the stream from the state thread
// and user-called functions. Everything that is accessed in the
// aaudio data (or error) callback is synchronized only via atomics.
// This lock is acquired for the entirety of the reinitialization period, when
// changing device.
std::mutex mutex;
std::vector<uint8_t> in_buf;
unsigned in_frame_size{}; // size of one input frame
unique_ptr<cubeb_stream_params> output_stream_params;
unique_ptr<cubeb_stream_params> input_stream_params;
uint32_t latency_frames{};
cubeb_sample_format out_format{};
uint32_t sample_rate{};
std::atomic<float> volume{1.f};
unsigned out_channels{};
unsigned out_frame_size{};
bool voice_input{};
bool voice_output{};
uint64_t previous_clock{};
position_estimate pos_estimate;
};
struct cubeb {
struct cubeb_ops const * ops{};
void * libaaudio{};
struct {
// The state thread: it waits for state changes and stops
// drained streams.
std::thread thread;
std::thread notifier;
std::mutex mutex;
std::condition_variable cond;
std::atomic<bool> join{false};
std::atomic<bool> waiting{false};
} state;
// streams[i].in_use signals whether a stream is used
struct cubeb_stream streams[MAX_STREAMS];
};
struct AutoInCallback {
AutoInCallback(cubeb_stream * stm) : stm(stm)
{
stm->in_data_callback.store(true);
}
~AutoInCallback() { stm->in_data_callback.store(false); }
cubeb_stream * stm;
};
// Returns when aaudio_stream's state is equal to desired_state.
// poll_frequency_ns is the duration that is slept in between asking for
// state updates and getting the new state.
// When waiting for a stream to stop, it is best to pick a value similar
// to the callback time because STOPPED will happen after
// draining.
static int
wait_for_state_change(AAudioStream * aaudio_stream,
aaudio_stream_state_t desired_state,
int64_t poll_frequency_ns)
{
aaudio_stream_state_t new_state;
do {
aaudio_result_t res = WRAP(AAudioStream_waitForStateChange)(
aaudio_stream, AAUDIO_STREAM_STATE_UNKNOWN, &new_state,
poll_frequency_ns);
if (res != AAUDIO_OK) {
LOG("AAudioStream_waitForStateChanged: %s",
WRAP(AAudio_convertResultToText)(res));
return CUBEB_ERROR;
}
} while (new_state != desired_state);
LOG("wait_for_state_change: current state now: %s",
cubeb_AAudio_convertStreamStateToText(new_state));
return CUBEB_OK;
}
// Only allowed from state thread, while mutex on stm is locked
static void
shutdown_with_error(cubeb_stream * stm)
{
if (stm->istream) {
WRAP(AAudioStream_requestStop)(stm->istream);
}
if (stm->ostream) {
WRAP(AAudioStream_requestStop)(stm->ostream);
}
int64_t poll_frequency_ns = NS_PER_S * stm->out_frame_size / stm->sample_rate;
int rv;
if (stm->istream) {
rv = wait_for_state_change(stm->istream, AAUDIO_STREAM_STATE_STOPPED,
poll_frequency_ns);
if (rv != CUBEB_OK) {
LOG("Failure when waiting for stream change on the input side when "
"shutting down in error");
// Not much we can do, carry on
}
}
if (stm->ostream) {
rv = wait_for_state_change(stm->ostream, AAUDIO_STREAM_STATE_STOPPED,
poll_frequency_ns);
if (rv != CUBEB_OK) {
LOG("Failure when waiting for stream change on the output side when "
"shutting down in error");
// Not much we can do, carry on
}
}
assert(!stm->in_data_callback.load());
stm->state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR);
stm->state.store(stream_state::SHUTDOWN);
}
// Returns whether the given state is one in which we wait for
// an asynchronous change
static bool
waiting_state(stream_state state)
{
switch (state) {
case stream_state::DRAINING:
case stream_state::STARTING:
case stream_state::STOPPING:
return true;
default:
return false;
}
}
static void
update_state(cubeb_stream * stm)
{
// Fast path for streams that don't wait for state change or are invalid
enum stream_state old_state = stm->state.load();
if (old_state == stream_state::INIT || old_state == stream_state::STARTED ||
old_state == stream_state::STOPPED ||
old_state == stream_state::SHUTDOWN) {
return;
}
// If the main thread currently operates on this thread, we don't
// have to wait for it
unique_lock lock(stm->mutex, std::try_to_lock);
if (!lock.owns_lock()) {
return;
}
// check again: if this is true now, the stream was destroyed or
// changed between our fast path check and locking the mutex
old_state = stm->state.load();
if (old_state == stream_state::INIT || old_state == stream_state::STARTED ||
old_state == stream_state::STOPPED ||
old_state == stream_state::SHUTDOWN) {
return;
}
// We compute the new state the stream has and then compare_exchange it
// if it has changed. This way we will never just overwrite state
// changes that were set from the audio thread in the meantime,
// such as a DRAINING or error state.
enum stream_state new_state;
do {
if (old_state == stream_state::SHUTDOWN) {
return;
}
if (old_state == stream_state::ERROR) {
shutdown_with_error(stm);
return;
}
new_state = old_state;
aaudio_stream_state_t istate = 0;
aaudio_stream_state_t ostate = 0;
// We use waitForStateChange (with zero timeout) instead of just
// getState since only the former internally updates the state.
// See the docs of aaudio getState/waitForStateChange for details,
// why we are passing STATE_UNKNOWN.
aaudio_result_t res;
if (stm->istream) {
res = WRAP(AAudioStream_waitForStateChange)(
stm->istream, AAUDIO_STREAM_STATE_UNKNOWN, &istate, 0);
if (res != AAUDIO_OK) {
LOG("AAudioStream_waitForStateChanged: %s",
WRAP(AAudio_convertResultToText)(res));
return;
}
assert(istate);
}
if (stm->ostream) {
res = WRAP(AAudioStream_waitForStateChange)(
stm->ostream, AAUDIO_STREAM_STATE_UNKNOWN, &ostate, 0);
if (res != AAUDIO_OK) {
LOG("AAudioStream_waitForStateChanged: %s",
WRAP(AAudio_convertResultToText)(res));
return;
}
assert(ostate);
}
// handle invalid stream states
if (istate == AAUDIO_STREAM_STATE_FLUSHING ||
istate == AAUDIO_STREAM_STATE_FLUSHED ||
istate == AAUDIO_STREAM_STATE_UNKNOWN ||
istate == AAUDIO_STREAM_STATE_DISCONNECTED) {
LOG("Unexpected android input stream state %s",
WRAP(AAudio_convertStreamStateToText)(istate));
shutdown_with_error(stm);
return;
}
if (ostate == AAUDIO_STREAM_STATE_FLUSHING ||
ostate == AAUDIO_STREAM_STATE_FLUSHED ||
ostate == AAUDIO_STREAM_STATE_UNKNOWN ||
ostate == AAUDIO_STREAM_STATE_DISCONNECTED) {
LOG("Unexpected android output stream state %s",
WRAP(AAudio_convertStreamStateToText)(istate));
shutdown_with_error(stm);
return;
}
switch (old_state) {
case stream_state::STARTING:
if ((!istate || istate == AAUDIO_STREAM_STATE_STARTED) &&
(!ostate || ostate == AAUDIO_STREAM_STATE_STARTED)) {
stm->state_callback(stm, stm->user_ptr, CUBEB_STATE_STARTED);
new_state = stream_state::STARTED;
}
break;
case stream_state::DRAINING:
// The DRAINING state means that we want to stop the streams but
// may not have done so yet.
// The aaudio docs state that returning STOP from the callback isn't
// enough, the stream has to be stopped from another thread
// afterwards.
// No callbacks are triggered anymore when requestStop returns.
// That is important as we otherwise might read from a closed istream
// for a duplex stream.
// Therefor it is important to close ostream first.
if (ostate && ostate != AAUDIO_STREAM_STATE_STOPPING &&
ostate != AAUDIO_STREAM_STATE_STOPPED) {
res = WRAP(AAudioStream_requestStop)(stm->ostream);
if (res != AAUDIO_OK) {
LOG("AAudioStream_requestStop: %s",
WRAP(AAudio_convertResultToText)(res));
return;
}
}
if (istate && istate != AAUDIO_STREAM_STATE_STOPPING &&
istate != AAUDIO_STREAM_STATE_STOPPED) {
res = WRAP(AAudioStream_requestStop)(stm->istream);
if (res != AAUDIO_OK) {
LOG("AAudioStream_requestStop: %s",
WRAP(AAudio_convertResultToText)(res));
return;
}
}
// we always wait until both streams are stopped until we
// send CUBEB_STATE_DRAINED. Then we can directly transition
// our logical state to STOPPED, not triggering
// an additional CUBEB_STATE_STOPPED callback (which might
// be unexpected for the user).
if ((!ostate || ostate == AAUDIO_STREAM_STATE_STOPPED) &&
(!istate || istate == AAUDIO_STREAM_STATE_STOPPED)) {
new_state = stream_state::STOPPED;
stm->state_callback(stm, stm->user_ptr, CUBEB_STATE_DRAINED);
}
break;
case stream_state::STOPPING:
assert(!istate || istate == AAUDIO_STREAM_STATE_PAUSING ||
istate == AAUDIO_STREAM_STATE_PAUSED);
assert(!ostate || ostate == AAUDIO_STREAM_STATE_PAUSING ||
ostate == AAUDIO_STREAM_STATE_PAUSED);
if ((!istate || istate == AAUDIO_STREAM_STATE_PAUSED) &&
(!ostate || ostate == AAUDIO_STREAM_STATE_PAUSED)) {
stm->state_callback(stm, stm->user_ptr, CUBEB_STATE_STOPPED);
new_state = stream_state::STOPPED;
}
break;
default:
assert(false && "Unreachable: invalid state");
}
} while (old_state != new_state &&
!stm->state.compare_exchange_strong(old_state, new_state));
}
// why this is needed. The audio thread notifies the state thread about
// state changes and must not block. The state thread on the other hand should
// sleep until there is work to be done. So we need a lockfree producer
// and blocking producer. This can only be achieved safely with a new thread
// that only serves as notifier backup (in case the notification happens
// right between the state thread checking and going to sleep in which case
// this thread will kick in and signal it right again).
static void
notifier_thread(cubeb * ctx)
{
unique_lock lock(ctx->state.mutex);
while (!ctx->state.join.load()) {
ctx->state.cond.wait(lock);
if (ctx->state.waiting.load()) {
// This must signal our state thread since there is no other
// thread currently waiting on the condition variable.
// The state change thread is guaranteed to be waiting since
// we hold the mutex it locks when awake.
ctx->state.cond.notify_one();
}
}
// make sure other thread joins as well
ctx->state.cond.notify_one();
LOG("Exiting notifier thread");
}
static void
state_thread(cubeb * ctx)
{
unique_lock lock(ctx->state.mutex);
bool waiting = false;
while (!ctx->state.join.load()) {
waiting |= ctx->state.waiting.load();
if (waiting) {
ctx->state.waiting.store(false);
waiting = false;
for (auto & stream : ctx->streams) {
cubeb_stream * stm = &stream;
update_state(stm);
waiting |= waiting_state(atomic_load(&stm->state));
}
// state changed from another thread, update again immediately
if (ctx->state.waiting.load()) {
waiting = true;
continue;
}
// Not waiting for any change anymore: we can wait on the
// condition variable without timeout
if (!waiting) {
continue;
}
// while any stream is waiting for state change we sleep with regular
// timeouts. But we wake up immediately if signaled.
// This might seem like a poor man's implementation of state change
// waiting but (as of october 2020), the implementation of
// AAudioStream_waitForStateChange is just sleeping with regular
// timeouts as well:
auto dur = std::chrono::milliseconds(5);
ctx->state.cond.wait_for(lock, dur);
} else {
ctx->state.cond.wait(lock);
}
}
// make sure other thread joins as well
ctx->state.cond.notify_one();
LOG("Exiting state thread");
}
static char const *
aaudio_get_backend_id(cubeb * /* ctx */)
{
return "aaudio";
}
static int
aaudio_get_max_channel_count(cubeb * ctx, uint32_t * max_channels)
{
assert(ctx && max_channels);
// NOTE: we might get more, AAudio docs don't specify anything.
*max_channels = 2;
return CUBEB_OK;
}
static void
aaudio_destroy(cubeb * ctx)
{
assert(ctx);
#ifndef NDEBUG
// make sure all streams were destroyed
for (auto & stream : ctx->streams) {
assert(!stream.in_use.load());
}
#endif
// broadcast joining to both threads
// they will additionally signal each other before joining
ctx->state.join.store(true);
ctx->state.cond.notify_all();
if (ctx->state.thread.joinable()) {
ctx->state.thread.join();
}
if (ctx->state.notifier.joinable()) {
ctx->state.notifier.join();
}
#ifndef DISABLE_LIBAAUDIO_DLOPEN
if (ctx->libaaudio) {
dlclose(ctx->libaaudio);
}
#endif
delete ctx;
}
static void
apply_volume(cubeb_stream * stm, void * audio_data, uint32_t num_frames)
{
float volume = stm->volume.load();
// optimization: we don't have to change anything in this case
if (volume == 1.f) {
return;
}
switch (stm->out_format) {
case CUBEB_SAMPLE_S16NE: {
int16_t * integer_data = static_cast<int16_t *>(audio_data);
for (uint32_t i = 0u; i < num_frames * stm->out_channels; ++i) {
integer_data[i] =
static_cast<int16_t>(static_cast<float>(integer_data[i]) * volume);
}
break;
}
case CUBEB_SAMPLE_FLOAT32NE:
for (uint32_t i = 0u; i < num_frames * stm->out_channels; ++i) {
(static_cast<float *>(audio_data))[i] *= volume;
}
break;
default:
assert(false && "Unreachable: invalid stream out_format");
}
}
uint64_t
now_ns()
{
using namespace std::chrono;
return duration_cast<nanoseconds>(steady_clock::now().time_since_epoch())
.count();
}
// To be called from the real-time audio callback
uint64_t
aaudio_get_latency(cubeb_stream * stm, aaudio_direction_t direction,
uint64_t tstamp_ns)
{
bool is_output = direction == AAUDIO_DIRECTION_OUTPUT;
int64_t hw_frame_index;
int64_t hw_tstamp;
AAudioStream * stream = is_output ? stm->ostream : stm->istream;
// For an output stream (resp. input stream), get the number of frames
// written to (resp read from) the hardware.
int64_t app_frame_index = is_output
? WRAP(AAudioStream_getFramesWritten)(stream)
: WRAP(AAudioStream_getFramesRead)(stream);
assert(tstamp_ns < std::numeric_limits<uint64_t>::max());
int64_t signed_tstamp_ns = static_cast<int64_t>(tstamp_ns);
// Get a timestamp for a particular frame index written to or read from the
// hardware.
auto result = WRAP(AAudioStream_getTimestamp)(stream, CLOCK_MONOTONIC,
&hw_frame_index, &hw_tstamp);
if (result != AAUDIO_OK) {
LOG("AAudioStream_getTimestamp failure for %s: %s",
is_output ? "output" : "input",
WRAP(AAudio_convertResultToText)(result));
return 0;
}
// Compute the difference between the app and the hardware indices.
int64_t frame_index_delta = app_frame_index - hw_frame_index;
// Convert to ns
int64_t frame_time_delta = (frame_index_delta * NS_PER_S) / stm->sample_rate;
// Extrapolate from the known timestamp for a particular frame presented.
int64_t app_frame_hw_time = hw_tstamp + frame_time_delta;
// For an output stream, the latency is positive, for an input stream, it's
// negative. It can happen in some instances, e.g. around start of the stream
// that the latency for output is negative, return 0 in this case.
int64_t latency_ns = is_output
? std::max(static_cast<int64_t>(0),
app_frame_hw_time - signed_tstamp_ns)
: signed_tstamp_ns - app_frame_hw_time;
int64_t latency_frames = stm->sample_rate * latency_ns / NS_PER_S;
LOGV("Latency in frames (%s): %d (%dms)", is_output ? "output" : "input",
latency_frames, latency_ns / 1e6);
return latency_frames;
}
void
compute_and_report_latency_metrics(cubeb_stream * stm)
{
AAudioTimingInfo info = {};
info.tstamp = now_ns();
if (stm->ostream) {
uint64_t latency_frames =
aaudio_get_latency(stm, AAUDIO_DIRECTION_OUTPUT, info.tstamp);
if (latency_frames) {
info.output_latency = latency_frames;
info.output_frame_index =
WRAP(AAudioStream_getFramesWritten)(stm->ostream);
}
}
if (stm->istream) {
uint64_t latency_frames =
aaudio_get_latency(stm, AAUDIO_DIRECTION_INPUT, info.tstamp);
if (latency_frames) {
info.input_latency = latency_frames;
}
}
if (info.output_latency || info.input_latency) {
stm->latency_metrics_available = true;
stm->timing_info.write(info);
}
}
// Returning AAUDIO_CALLBACK_RESULT_STOP seems to put the stream in
// an invalid state. Seems like an AAudio bug/bad documentation.
// We therefore only return it on error.
static aaudio_data_callback_result_t
aaudio_duplex_data_cb(AAudioStream * astream, void * user_data,
void * audio_data, int32_t num_frames)
{
cubeb_stream * stm = (cubeb_stream *)user_data;
AutoInCallback aic(stm);
assert(stm->ostream == astream);
assert(stm->istream);
assert(num_frames >= 0);
stream_state state = atomic_load(&stm->state);
int istate = WRAP(AAudioStream_getState)(stm->istream);
int ostate = WRAP(AAudioStream_getState)(stm->ostream);
// all other states may happen since the callback might be called
// from within requestStart
assert(state != stream_state::SHUTDOWN);
// This might happen when we started draining but not yet actually
// stopped the stream from the state thread.
if (state == stream_state::DRAINING) {
LOG("Draining in duplex callback");
std::memset(audio_data, 0x0, num_frames * stm->out_frame_size);
return AAUDIO_CALLBACK_RESULT_CONTINUE;
}
if (num_frames * stm->in_frame_size > stm->in_buf.size()) {
LOG("Resizing input buffer in duplex callback");
stm->in_buf.resize(num_frames * stm->in_frame_size);
}
// The aaudio docs state that AAudioStream_read must not be called on
// the stream associated with a callback. But we call it on the input stream
// while this callback is for the output stream so this is ok.
// We also pass timeout 0, giving us strong non-blocking guarantees.
// This is exactly how it's done in the aaudio duplex example code snippet.
long in_num_frames =
WRAP(AAudioStream_read)(stm->istream, stm->in_buf.data(), num_frames, 0);
if (in_num_frames < 0) { // error
if (in_num_frames == AAUDIO_STREAM_STATE_DISCONNECTED) {
LOG("AAudioStream_read: %s (reinitializing)",
WRAP(AAudio_convertResultToText)(in_num_frames));
reinitialize_stream(stm);
} else {
stm->state.store(stream_state::ERROR);
}
LOG("AAudioStream_read: %s",
WRAP(AAudio_convertResultToText)(in_num_frames));
return AAUDIO_CALLBACK_RESULT_STOP;
}
ALOGV("aaudio duplex data cb on stream %p: state %ld (in: %d, out: %d), "
"num_frames: %ld, read: %ld",
(void *)stm, state, istate, ostate, num_frames, in_num_frames);
compute_and_report_latency_metrics(stm);
// This can happen shortly after starting the stream. AAudio might immediately
// begin to buffer output but not have any input ready yet. We could
// block AAudioStream_read (passing a timeout > 0) but that leads to issues
// since blocking in this callback is a bad idea in general and it might break
// the stream when it is stopped by another thread shortly after being
// started. We therefore simply send silent input to the application, as shown
// in the AAudio duplex stream code example.
if (in_num_frames < num_frames) {
// LOG("AAudioStream_read returned not enough frames: %ld instead of %d",
// in_num_frames, num_frames);
unsigned left = num_frames - in_num_frames;
uint8_t * buf = stm->in_buf.data() + in_num_frames * stm->in_frame_size;
std::memset(buf, 0x0, left * stm->in_frame_size);
in_num_frames = num_frames;
}
long done_frames =
cubeb_resampler_fill(stm->resampler, stm->in_buf.data(), &in_num_frames,
audio_data, num_frames);
if (done_frames < 0 || done_frames > num_frames) {
LOG("Error in data callback or resampler: %ld", done_frames);
stm->state.store(stream_state::ERROR);
return AAUDIO_CALLBACK_RESULT_STOP;
}
if (done_frames < num_frames) {
stm->state.store(stream_state::DRAINING);
stm->context->state.waiting.store(true);
stm->context->state.cond.notify_one();
char * begin =
static_cast<char *>(audio_data) + done_frames * stm->out_frame_size;
std::memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size);
}
apply_volume(stm, audio_data, done_frames);
return AAUDIO_CALLBACK_RESULT_CONTINUE;
}
static aaudio_data_callback_result_t
aaudio_output_data_cb(AAudioStream * astream, void * user_data,
void * audio_data, int32_t num_frames)
{
cubeb_stream * stm = (cubeb_stream *)user_data;
AutoInCallback aic(stm);
assert(stm->ostream == astream);
assert(!stm->istream);
assert(num_frames >= 0);
stream_state state = stm->state.load();
int ostate = WRAP(AAudioStream_getState)(stm->ostream);
ALOGV("aaudio output data cb on stream %p: state %ld (%d), num_frames: %ld",
stm, state, ostate, num_frames);
// all other states may happen since the callback might be called
// from within requestStart
assert(state != stream_state::SHUTDOWN);
// This might happen when we started draining but not yet actually
// stopped the stream from the state thread.
if (state == stream_state::DRAINING) {
std::memset(audio_data, 0x0, num_frames * stm->out_frame_size);
return AAUDIO_CALLBACK_RESULT_CONTINUE;
}
compute_and_report_latency_metrics(stm);
long done_frames = cubeb_resampler_fill(stm->resampler, nullptr, nullptr,
audio_data, num_frames);
if (done_frames < 0 || done_frames > num_frames) {
LOG("Error in data callback or resampler: %ld", done_frames);
stm->state.store(stream_state::ERROR);
return AAUDIO_CALLBACK_RESULT_STOP;
}
if (done_frames < num_frames) {
stm->state.store(stream_state::DRAINING);
stm->context->state.waiting.store(true);
stm->context->state.cond.notify_one();
char * begin =
static_cast<char *>(audio_data) + done_frames * stm->out_frame_size;
std::memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size);
}
apply_volume(stm, audio_data, done_frames);
return AAUDIO_CALLBACK_RESULT_CONTINUE;
}
static aaudio_data_callback_result_t
aaudio_input_data_cb(AAudioStream * astream, void * user_data,
void * audio_data, int32_t num_frames)
{
cubeb_stream * stm = (cubeb_stream *)user_data;
AutoInCallback aic(stm);
assert(stm->istream == astream);
assert(!stm->ostream);
assert(num_frames >= 0);
stream_state state = stm->state.load();
int istate = WRAP(AAudioStream_getState)(stm->istream);
ALOGV("aaudio input data cb on stream %p: state %ld (%d), num_frames: %ld",
stm, state, istate, num_frames);
// all other states may happen since the callback might be called
// from within requestStart
assert(state != stream_state::SHUTDOWN);
// This might happen when we started draining but not yet actually
// STOPPED the stream from the state thread.
if (state == stream_state::DRAINING) {
return AAUDIO_CALLBACK_RESULT_CONTINUE;
}
compute_and_report_latency_metrics(stm);
long input_frame_count = num_frames;
long done_frames = cubeb_resampler_fill(stm->resampler, audio_data,
&input_frame_count, nullptr, 0);
if (done_frames < 0 || done_frames > num_frames) {
LOG("Error in data callback or resampler: %ld", done_frames);
stm->state.store(stream_state::ERROR);
return AAUDIO_CALLBACK_RESULT_STOP;
}
if (done_frames < input_frame_count) {
// we don't really drain an input stream, just have to
// stop it from the state thread. That is signaled via the
// DRAINING state.
stm->state.store(stream_state::DRAINING);
stm->context->state.waiting.store(true);
stm->context->state.cond.notify_one();
}
return AAUDIO_CALLBACK_RESULT_CONTINUE;
}
static void
reinitialize_stream(cubeb_stream * stm)
{
// This cannot be done from within the error callback, bounce to another
// thread.
// In this situation, the lock is acquired for the entire duration of the
// function, so that this reinitialization period is atomic.
std::thread([stm] {
lock_guard lock(stm->mutex);
stream_state state = stm->state.load();
bool was_playing = state == stream_state::STARTED ||
state == stream_state::STARTING ||
state == stream_state::DRAINING;
int err = aaudio_stream_stop_locked(stm, lock);
// get total number of written frames before destroying the stream.
uint64_t total_frames = stm->pos_estimate.initial_position() +
WRAP(AAudioStream_getFramesWritten)(stm->ostream);
// error ignored.
aaudio_stream_destroy_locked(stm, lock);
err = aaudio_stream_init_impl(stm, lock);
assert(stm->in_use.load());
// set the new initial position.
stm->pos_estimate.reinit(total_frames);
if (err != CUBEB_OK) {
aaudio_stream_destroy_locked(stm, lock);
LOG("aaudio_stream_init_impl error while reiniting: %s",
WRAP(AAudio_convertResultToText)(err));
stm->state.store(stream_state::ERROR);
return;
}
if (was_playing) {
err = aaudio_stream_start_locked(stm, lock);
if (err != CUBEB_OK) {
aaudio_stream_destroy_locked(stm, lock);
LOG("aaudio_stream_start error while reiniting: %s",
WRAP(AAudio_convertResultToText)(err));
stm->state.store(stream_state::ERROR);
return;
}
}
}).detach();
}
static void
aaudio_error_cb(AAudioStream * astream, void * user_data, aaudio_result_t error)
{
cubeb_stream * stm = static_cast<cubeb_stream *>(user_data);
assert(stm->ostream == astream || stm->istream == astream);
// Device change -- reinitialize on the new default device.
if (error == AAUDIO_ERROR_DISCONNECTED || error == AAUDIO_ERROR_TIMEOUT) {
LOG("Audio device change, reinitializing stream");
reinitialize_stream(stm);
return;
}
LOG("AAudio error callback: %s", WRAP(AAudio_convertResultToText)(error));
stm->state.store(stream_state::ERROR);
}
static int
realize_stream(AAudioStreamBuilder * sb, const cubeb_stream_params * params,
AAudioStream ** stream, unsigned * frame_size)
{
aaudio_result_t res;
assert(params->rate);
assert(params->channels);
WRAP(AAudioStreamBuilder_setSampleRate)
(sb, static_cast<int32_t>(params->rate));
WRAP(AAudioStreamBuilder_setChannelCount)
(sb, static_cast<int32_t>(params->channels));
aaudio_format_t fmt;
switch (params->format) {
case CUBEB_SAMPLE_S16NE:
fmt = AAUDIO_FORMAT_PCM_I16;
*frame_size = sizeof(int16_t) * params->channels;
break;
case CUBEB_SAMPLE_FLOAT32NE:
fmt = AAUDIO_FORMAT_PCM_FLOAT;
*frame_size = sizeof(float) * params->channels;
break;
default:
return CUBEB_ERROR_INVALID_FORMAT;
}
WRAP(AAudioStreamBuilder_setFormat)(sb, fmt);
res = WRAP(AAudioStreamBuilder_openStream)(sb, stream);
if (res == AAUDIO_ERROR_INVALID_FORMAT) {
LOG("AAudio device doesn't support output format %d", fmt);
return CUBEB_ERROR_INVALID_FORMAT;
}
if (params->rate && res == AAUDIO_ERROR_INVALID_RATE) {
// The requested rate is not supported.
// Just try again with default rate, we create a resampler anyways
WRAP(AAudioStreamBuilder_setSampleRate)(sb, AAUDIO_UNSPECIFIED);
res = WRAP(AAudioStreamBuilder_openStream)(sb, stream);
LOG("Requested rate of %u is not supported, inserting resampler",
params->rate);
}
// When the app has no permission to record audio
// (android.permission.RECORD_AUDIO) but requested and input stream, this will
// return INVALID_ARGUMENT.
if (res != AAUDIO_OK) {
LOG("AAudioStreamBuilder_openStream: %s",
WRAP(AAudio_convertResultToText)(res));
return CUBEB_ERROR;
}
return CUBEB_OK;
}
static void
aaudio_stream_destroy(cubeb_stream * stm)
{
lock_guard lock(stm->mutex);
stm->in_use.store(false);
aaudio_stream_destroy_locked(stm, lock);
}
static void
aaudio_stream_destroy_locked(cubeb_stream * stm, lock_guard<mutex> & lock)
{
assert(stm->state == stream_state::STOPPED ||
stm->state == stream_state::STOPPING ||
stm->state == stream_state::INIT ||
stm->state == stream_state::DRAINING ||
stm->state == stream_state::ERROR ||
stm->state == stream_state::SHUTDOWN);
aaudio_result_t res;
// No callbacks are triggered anymore when requestStop returns.
// That is important as we otherwise might read from a closed istream
// for a duplex stream.
if (stm->ostream) {
if (stm->state != stream_state::STOPPED &&
stm->state != stream_state::STOPPING &&
stm->state != stream_state::SHUTDOWN) {
res = WRAP(AAudioStream_requestStop)(stm->ostream);
if (res != AAUDIO_OK) {
LOG("AAudioStreamBuilder_requestStop: %s",
WRAP(AAudio_convertResultToText)(res));
}
}
WRAP(AAudioStream_close)(stm->ostream);
stm->ostream = nullptr;
}
if (stm->istream) {
if (stm->state != stream_state::STOPPED &&
stm->state != stream_state::STOPPING &&
stm->state != stream_state::SHUTDOWN) {
res = WRAP(AAudioStream_requestStop)(stm->istream);
if (res != AAUDIO_OK) {
LOG("AAudioStreamBuilder_requestStop: %s",
WRAP(AAudio_convertResultToText)(res));
}
}
WRAP(AAudioStream_close)(stm->istream);
stm->istream = nullptr;
}
stm->timing_info.invalidate();
stm->previous_clock = 0;
stm->pos_estimate = {};
if (stm->resampler) {
cubeb_resampler_destroy(stm->resampler);
stm->resampler = nullptr;
}
stm->in_buf = {};
stm->in_frame_size = {};
stm->out_format = {};
stm->out_channels = {};
stm->out_frame_size = {};
stm->state.store(stream_state::INIT);
}
static int
aaudio_stream_init_impl(cubeb_stream * stm, lock_guard<mutex> & lock)
{
assert(stm->state.load() == stream_state::INIT);
cubeb_async_log_reset_threads();
aaudio_result_t res;
AAudioStreamBuilder * sb;
res = WRAP(AAudio_createStreamBuilder)(&sb);
if (res != AAUDIO_OK) {
LOG("AAudio_createStreamBuilder: %s",
WRAP(AAudio_convertResultToText)(res));
return CUBEB_ERROR;
}
// make sure the builder is always destroyed
struct StreamBuilderDestructor {
void operator()(AAudioStreamBuilder * sb)
{
WRAP(AAudioStreamBuilder_delete)(sb);
}
};
std::unique_ptr<AAudioStreamBuilder, StreamBuilderDestructor> sbPtr(sb);
WRAP(AAudioStreamBuilder_setErrorCallback)(sb, aaudio_error_cb, stm);
// Capacity should be at least twice the frames-per-callback to allow double
// buffering.
WRAP(AAudioStreamBuilder_setBufferCapacityInFrames)
(sb, static_cast<int32_t>(3 * stm->latency_frames));
AAudioStream_dataCallback in_data_callback{};
AAudioStream_dataCallback out_data_callback{};
if (stm->output_stream_params && stm->input_stream_params) {
out_data_callback = aaudio_duplex_data_cb;
in_data_callback = nullptr;
} else if (stm->input_stream_params) {
in_data_callback = aaudio_input_data_cb;
} else if (stm->output_stream_params) {
out_data_callback = aaudio_output_data_cb;
} else {
LOG("Tried to open stream without input or output parameters");
return CUBEB_ERROR;
}
#ifdef CUBEB_AAUDIO_EXCLUSIVE_STREAM
LOG("AAudio setting exclusive share mode for stream");
WRAP(AAudioStreamBuilder_setSharingMode)(sb, AAUDIO_SHARING_MODE_EXCLUSIVE);
#endif
if (stm->latency_frames <= POWERSAVE_LATENCY_FRAMES_THRESHOLD) {
LOG("AAudio setting low latency mode for stream");
WRAP(AAudioStreamBuilder_setPerformanceMode)
(sb, AAUDIO_PERFORMANCE_MODE_LOW_LATENCY);
} else {
LOG("AAudio setting power saving mode for stream");
WRAP(AAudioStreamBuilder_setPerformanceMode)
(sb, AAUDIO_PERFORMANCE_MODE_POWER_SAVING);
}
unsigned frame_size;
// initialize streams
// output
cubeb_stream_params out_params;
if (stm->output_stream_params) {
int output_preset = stm->voice_output ? AAUDIO_USAGE_VOICE_COMMUNICATION
: AAUDIO_USAGE_MEDIA;
WRAP(AAudioStreamBuilder_setUsage)(sb, output_preset);
WRAP(AAudioStreamBuilder_setDirection)(sb, AAUDIO_DIRECTION_OUTPUT);
WRAP(AAudioStreamBuilder_setDataCallback)(sb, out_data_callback, stm);
assert(stm->latency_frames < std::numeric_limits<int32_t>::max());
LOG("Frames per callback set to %d for output", stm->latency_frames);
WRAP(AAudioStreamBuilder_setFramesPerDataCallback)
(sb, static_cast<int32_t>(stm->latency_frames));
int res_err = realize_stream(sb, stm->output_stream_params.get(),
&stm->ostream, &frame_size);
if (res_err) {
return res_err;
}
int rate = WRAP(AAudioStream_getSampleRate)(stm->ostream);
LOG("AAudio output stream sharing mode: %d",
WRAP(AAudioStream_getSharingMode)(stm->ostream));
LOG("AAudio output stream performance mode: %d",
WRAP(AAudioStream_getPerformanceMode)(stm->ostream));
LOG("AAudio output stream buffer capacity: %d",
WRAP(AAudioStream_getBufferCapacityInFrames)(stm->ostream));
LOG("AAudio output stream buffer size: %d",
WRAP(AAudioStream_getBufferSizeInFrames)(stm->ostream));
LOG("AAudio output stream sample-rate: %d", rate);
stm->sample_rate = stm->output_stream_params->rate;
out_params = *stm->output_stream_params;
out_params.rate = rate;
stm->out_channels = stm->output_stream_params->channels;
stm->out_format = stm->output_stream_params->format;
stm->out_frame_size = frame_size;
stm->volume.store(1.f);
}
// input
cubeb_stream_params in_params;
if (stm->input_stream_params) {
// Match what the OpenSL backend does for now, we could use UNPROCESSED and
// VOICE_COMMUNICATION here, but we'd need to make it clear that
// application-level AEC and other voice processing should be disabled
// there.
int input_preset = stm->voice_input ? AAUDIO_INPUT_PRESET_VOICE_RECOGNITION
: AAUDIO_INPUT_PRESET_CAMCORDER;
WRAP(AAudioStreamBuilder_setInputPreset)(sb, input_preset);
WRAP(AAudioStreamBuilder_setDirection)(sb, AAUDIO_DIRECTION_INPUT);
WRAP(AAudioStreamBuilder_setDataCallback)(sb, in_data_callback, stm);
assert(stm->latency_frames < std::numeric_limits<int32_t>::max());
LOG("Frames per callback set to %d for input", stm->latency_frames);
WRAP(AAudioStreamBuilder_setFramesPerDataCallback)
(sb, static_cast<int32_t>(stm->latency_frames));
int res_err = realize_stream(sb, stm->input_stream_params.get(),
&stm->istream, &frame_size);
if (res_err) {
return res_err;
}
int bcap = WRAP(AAudioStream_getBufferCapacityInFrames)(stm->istream);
int rate = WRAP(AAudioStream_getSampleRate)(stm->istream);
LOG("AAudio input stream sharing mode: %d",
WRAP(AAudioStream_getSharingMode)(stm->istream));
LOG("AAudio input stream performance mode: %d",
WRAP(AAudioStream_getPerformanceMode)(stm->istream));
LOG("AAudio input stream buffer capacity: %d", bcap);
LOG("AAudio input stream buffer size: %d",
WRAP(AAudioStream_getBufferSizeInFrames)(stm->istream));
LOG("AAudio input stream buffer rate: %d", rate);
stm->in_buf.resize(bcap * frame_size);
assert(!stm->sample_rate ||
stm->sample_rate == stm->input_stream_params->rate);
stm->sample_rate = stm->input_stream_params->rate;
in_params = *stm->input_stream_params;
in_params.rate = rate;
stm->in_frame_size = frame_size;
}
// initialize resampler
stm->resampler = cubeb_resampler_create(
stm, stm->input_stream_params ? &in_params : nullptr,
stm->output_stream_params ? &out_params : nullptr, stm->sample_rate,
stm->data_callback, stm->user_ptr, CUBEB_RESAMPLER_QUALITY_DEFAULT,
CUBEB_RESAMPLER_RECLOCK_NONE);
if (!stm->resampler) {
LOG("Failed to create resampler");
return CUBEB_ERROR;
}
// the stream isn't started initially. We don't need to differentiate
// between a stream that was just initialized and one that played
// already but was stopped.
stm->state.store(stream_state::STOPPED);
LOG("Cubeb stream (%p) INIT success", (void *)stm);
return CUBEB_OK;
}
static int
aaudio_stream_init(cubeb * ctx, cubeb_stream ** stream,
char const * /* stream_name */, cubeb_devid input_device,
cubeb_stream_params * input_stream_params,
cubeb_devid output_device,
cubeb_stream_params * output_stream_params,
unsigned int latency_frames,
cubeb_data_callback data_callback,
cubeb_state_callback state_callback, void * user_ptr)
{
assert(!input_device);
assert(!output_device);
// atomically find a free stream.
cubeb_stream * stm = nullptr;
unique_lock<mutex> lock;
for (auto & stream : ctx->streams) {
// This check is only an optimization, we don't strictly need it
// since we check again after locking the mutex.
if (stream.in_use.load()) {
continue;
}
// if this fails, another thread initialized this stream
// between our check of in_use and this.
lock = unique_lock(stream.mutex, std::try_to_lock);
if (!lock.owns_lock()) {
continue;
}
if (stream.in_use.load()) {
lock = {};
continue;
}
stm = &stream;
break;
}
if (!stm) {
LOG("Error: maximum number of streams reached");
return CUBEB_ERROR;
}
stm->in_use.store(true);
stm->context = ctx;
stm->user_ptr = user_ptr;
stm->data_callback = data_callback;
stm->state_callback = state_callback;
stm->voice_input = input_stream_params &&
!!(input_stream_params->prefs & CUBEB_STREAM_PREF_VOICE);
stm->voice_output = output_stream_params &&
!!(output_stream_params->prefs & CUBEB_STREAM_PREF_VOICE);
stm->previous_clock = 0;
stm->latency_frames = latency_frames;
if (output_stream_params) {
stm->output_stream_params = std::make_unique<cubeb_stream_params>();
*(stm->output_stream_params) = *output_stream_params;
} else {
stm->output_stream_params = nullptr;
}
if (input_stream_params) {
stm->input_stream_params = std::make_unique<cubeb_stream_params>();
*(stm->input_stream_params) = *input_stream_params;
} else {
stm->input_stream_params = nullptr;
}
LOG("cubeb stream prefs: voice_input: %s voice_output: %s",
stm->voice_input ? "true" : "false",
stm->voice_output ? "true" : "false");
// This is ok: the thread is marked as being in use
lock.unlock();
int err;
{
lock_guard guard(stm->mutex);
err = aaudio_stream_init_impl(stm, guard);
}
if (err != CUBEB_OK) {
aaudio_stream_destroy(stm);
return err;
}
*stream = stm;
return CUBEB_OK;
}
static int
aaudio_stream_start(cubeb_stream * stm)
{
lock_guard lock(stm->mutex);
return aaudio_stream_start_locked(stm, lock);
}
static int
aaudio_stream_start_locked(cubeb_stream * stm, lock_guard<mutex> & lock)
{
assert(stm && stm->in_use.load());
stream_state state = stm->state.load();
int istate = stm->istream ? WRAP(AAudioStream_getState)(stm->istream) : 0;
int ostate = stm->ostream ? WRAP(AAudioStream_getState)(stm->ostream) : 0;
LOGV("STARTING stream %p: %d (%d %d)", (void *)stm, state, istate, ostate);
switch (state) {
case stream_state::STARTED:
case stream_state::STARTING:
LOG("cubeb stream %p already STARTING/STARTED", (void *)stm);
return CUBEB_OK;
case stream_state::ERROR:
case stream_state::SHUTDOWN:
return CUBEB_ERROR;
case stream_state::INIT:
assert(false && "Invalid stream");
return CUBEB_ERROR;
case stream_state::STOPPED:
case stream_state::STOPPING:
case stream_state::DRAINING:
break;
}
aaudio_result_t res;
// Important to start istream before ostream.
// As soon as we start ostream, the callbacks might be triggered an we
// might read from istream (on duplex). If istream wasn't started yet
// this is a problem.
if (stm->istream) {
res = WRAP(AAudioStream_requestStart)(stm->istream);
if (res != AAUDIO_OK) {
LOG("AAudioStream_requestStart (istream): %s",
WRAP(AAudio_convertResultToText)(res));
stm->state.store(stream_state::ERROR);
return CUBEB_ERROR;
}
}
if (stm->ostream) {
res = WRAP(AAudioStream_requestStart)(stm->ostream);
if (res != AAUDIO_OK) {
LOG("AAudioStream_requestStart (ostream): %s",
WRAP(AAudio_convertResultToText)(res));
stm->state.store(stream_state::ERROR);
return CUBEB_ERROR;
}
}
int ret = CUBEB_OK;
bool success;
while (!(success = stm->state.compare_exchange_strong(
state, stream_state::STARTING))) {
// we land here only if the state has changed in the meantime
switch (state) {
// If an error ocurred in the meantime, we can't change that.
// The stream will be stopped when shut down.
case stream_state::ERROR:
ret = CUBEB_ERROR;
break;
// The only situation in which the state could have switched to draining
// is if the callback was already fired and requested draining. Don't
// overwrite that. It's not an error either though.
case stream_state::DRAINING:
break;
// If the state switched [DRAINING -> STOPPING] or [DRAINING/STOPPING ->
// STOPPED] in the meantime, we can simply overwrite that since we
// restarted the stream.
case stream_state::STOPPING:
case stream_state::STOPPED:
continue;
// There is no situation in which the state could have been valid before
// but now in shutdown mode, since we hold the streams mutex.
// There is also no way that it switched *into* STARTING or
// STARTED mode.
default:
assert(false && "Invalid state change");
ret = CUBEB_ERROR;
break;
}
break;
}
if (success) {
stm->pos_estimate.start(now_ns());
stm->context->state.waiting.store(true);
stm->context->state.cond.notify_one();
}
return ret;
}
static int
aaudio_stream_stop(cubeb_stream * stm)
{
assert(stm && stm->in_use.load());
lock_guard lock(stm->mutex);
return aaudio_stream_stop_locked(stm, lock);
}
static int
aaudio_stream_stop_locked(cubeb_stream * stm, lock_guard<mutex> & lock)
{
assert(stm && stm->in_use.load());
stream_state state = stm->state.load();
int istate = stm->istream ? WRAP(AAudioStream_getState)(stm->istream) : 0;
int ostate = stm->ostream ? WRAP(AAudioStream_getState)(stm->ostream) : 0;
LOG("STOPPING stream %p: %d (%d %d)", (void *)stm, state, istate, ostate);
switch (state) {
case stream_state::STOPPED:
case stream_state::STOPPING:
case stream_state::DRAINING:
LOG("cubeb stream %p already STOPPING/STOPPED", (void *)stm);
return CUBEB_OK;
case stream_state::ERROR:
case stream_state::SHUTDOWN:
return CUBEB_ERROR;
case stream_state::INIT:
assert(false && "Invalid stream");
return CUBEB_ERROR;
case stream_state::STARTED:
case stream_state::STARTING:
break;
}
aaudio_result_t res;
// No callbacks are triggered anymore when requestPause returns.
// That is important as we otherwise might read from a closed istream
// for a duplex stream.
// Therefor it is important to close ostream first.
if (stm->ostream) {
// Could use pause + flush here as well, the public cubeb interface
// doesn't state behavior.
res = WRAP(AAudioStream_requestPause)(stm->ostream);
if (res != AAUDIO_OK) {
LOG("AAudioStream_requestPause (ostream): %s",
WRAP(AAudio_convertResultToText)(res));
stm->state.store(stream_state::ERROR);
return CUBEB_ERROR;
}
}
if (stm->istream) {
res = WRAP(AAudioStream_requestPause)(stm->istream);
if (res != AAUDIO_OK) {
LOG("AAudioStream_requestPause (istream): %s",
WRAP(AAudio_convertResultToText)(res));
stm->state.store(stream_state::ERROR);
return CUBEB_ERROR;
}
}
int ret = CUBEB_OK;
bool success;
while (!(success = atomic_compare_exchange_strong(&stm->state, &state,
stream_state::STOPPING))) {
// we land here only if the state has changed in the meantime
switch (state) {
// If an error ocurred in the meantime, we can't change that.
// The stream will be STOPPED when shut down.
case stream_state::ERROR:
ret = CUBEB_ERROR;
break;
// If it was switched to DRAINING in the meantime, it was or
// will be STOPPED soon anyways. We don't interfere with
// the DRAINING process, no matter in which state.
// Not an error
case stream_state::DRAINING:
case stream_state::STOPPING:
case stream_state::STOPPED:
break;
// If the state switched from STARTING to STARTED in the meantime
// we can simply overwrite that since we just STOPPED it.
case stream_state::STARTED:
continue;
// There is no situation in which the state could have been valid before
// but now in shutdown mode, since we hold the streams mutex.
// There is also no way that it switched *into* STARTING mode.
default:
assert(false && "Invalid state change");
ret = CUBEB_ERROR;
break;
}
break;
}
if (success) {
stm->pos_estimate.stop(now_ns());
stm->context->state.waiting.store(true);
stm->context->state.cond.notify_one();
}
return ret;
}
static int
aaudio_stream_get_position(cubeb_stream * stm, uint64_t * position)
{
assert(stm && stm->in_use.load());
lock_guard lock(stm->mutex);
stream_state state = stm->state.load();
uint64_t init_position = stm->pos_estimate.initial_position();
AAudioStream * stream = stm->ostream ? stm->ostream : stm->istream;
switch (state) {
case stream_state::ERROR:
case stream_state::SHUTDOWN:
return CUBEB_ERROR;
case stream_state::DRAINING:
case stream_state::STOPPED:
case stream_state::STOPPING:
// getTimestamp is only valid when the stream is playing.
// Simply return the number of frames passed to aaudio
*position = init_position + WRAP(AAudioStream_getFramesRead)(stream);
if (*position < stm->previous_clock) {
*position = stm->previous_clock;
} else {
stm->previous_clock = *position;
}
return CUBEB_OK;
case stream_state::INIT:
assert(false && "Invalid stream");
return CUBEB_ERROR;
case stream_state::STARTED:
case stream_state::STARTING:
break;
}
// No callback yet, the stream hasn't really started.
if (stm->previous_clock == 0 && !stm->timing_info.updated()) {
LOG("Not timing info yet");
*position = init_position;
return CUBEB_OK;
}
AAudioTimingInfo info = stm->timing_info.read();
LOGV("AAudioTimingInfo idx:%lu tstamp:%lu latency:%u",
info.output_frame_index, info.tstamp, info.output_latency);
// Interpolate client side since the last callback.
uint64_t interpolation =
(stm->sample_rate *
stm->pos_estimate.elapsed_time_since_callback(now_ns(), info.tstamp) /
NS_PER_S);
*position = init_position + info.output_frame_index + interpolation -
info.output_latency;
if (*position < stm->previous_clock) {
*position = stm->previous_clock;
} else {
stm->previous_clock = *position;
}
LOG("aaudio_stream_get_position: %" PRIu64 " frames", *position);
return CUBEB_OK;
}
static int
aaudio_stream_get_latency(cubeb_stream * stm, uint32_t * latency)
{
if (!stm->ostream) {
LOG("error: aaudio_stream_get_latency on input-only stream");
return CUBEB_ERROR;
}
if (!stm->latency_metrics_available) {
LOG("Not timing info yet (output)");
return CUBEB_OK;
}
AAudioTimingInfo info = stm->timing_info.read();
*latency = info.output_latency;
LOG("aaudio_stream_get_latency, %u frames", *latency);
return CUBEB_OK;
}
static int
aaudio_stream_get_input_latency(cubeb_stream * stm, uint32_t * latency)
{
if (!stm->istream) {
LOG("error: aaudio_stream_get_input_latency on an output-only stream");
return CUBEB_ERROR;
}
if (!stm->latency_metrics_available) {
LOG("Not timing info yet (input)");
return CUBEB_OK;
}
AAudioTimingInfo info = stm->timing_info.read();
*latency = info.input_latency;
LOG("aaudio_stream_get_latency, %u frames", *latency);
return CUBEB_OK;
}
static int
aaudio_stream_set_volume(cubeb_stream * stm, float volume)
{
assert(stm && stm->in_use.load() && stm->ostream);
stm->volume.store(volume);
return CUBEB_OK;
}
aaudio_data_callback_result_t
dummy_callback(AAudioStream * stream, void * userData, void * audioData,
int32_t numFrames)
{
return AAUDIO_CALLBACK_RESULT_STOP;
}
// Returns a dummy stream with all default settings
static AAudioStream *
init_dummy_stream()
{
AAudioStreamBuilder * streamBuilder;
aaudio_result_t res;
res = WRAP(AAudio_createStreamBuilder)(&streamBuilder);
if (res != AAUDIO_OK) {
LOG("init_dummy_stream: AAudio_createStreamBuilder: %s",
WRAP(AAudio_convertResultToText)(res));
return nullptr;
}
WRAP(AAudioStreamBuilder_setDataCallback)
(streamBuilder, dummy_callback, nullptr);
WRAP(AAudioStreamBuilder_setPerformanceMode)
(streamBuilder, AAUDIO_PERFORMANCE_MODE_LOW_LATENCY);
AAudioStream * stream;
res = WRAP(AAudioStreamBuilder_openStream)(streamBuilder, &stream);
if (res != AAUDIO_OK) {
LOG("init_dummy_stream: AAudioStreamBuilder_openStream %s",
WRAP(AAudio_convertResultToText)(res));
return nullptr;
}
WRAP(AAudioStreamBuilder_delete)(streamBuilder);
return stream;
}
static void
destroy_dummy_stream(AAudioStream * stream)
{
WRAP(AAudioStream_close)(stream);
}
static int
aaudio_get_min_latency(cubeb * ctx, cubeb_stream_params params,
uint32_t * latency_frames)
{
AAudioStream * stream = init_dummy_stream();
if (!stream) {
return CUBEB_ERROR;
}
*latency_frames = WRAP(AAudioStream_getFramesPerBurst)(stream);
LOG("aaudio_get_min_latency: %u frames", *latency_frames);
destroy_dummy_stream(stream);
return CUBEB_OK;
}
int
aaudio_get_preferred_sample_rate(cubeb * ctx, uint32_t * rate)
{
AAudioStream * stream = init_dummy_stream();
if (!stream) {
return CUBEB_ERROR;
}
*rate = WRAP(AAudioStream_getSampleRate)(stream);
LOG("aaudio_get_preferred_sample_rate %uHz", *rate);
destroy_dummy_stream(stream);
return CUBEB_OK;
}
extern "C" int
aaudio_init(cubeb ** context, char const * context_name);
const static struct cubeb_ops aaudio_ops = {
/*.init =*/aaudio_init,
/*.get_backend_id =*/aaudio_get_backend_id,
/*.get_max_channel_count =*/aaudio_get_max_channel_count,
/* .get_min_latency =*/aaudio_get_min_latency,
/*.get_preferred_sample_rate =*/aaudio_get_preferred_sample_rate,
/*.get_supported_input_processing_params =*/nullptr,
/*.enumerate_devices =*/nullptr,
/*.device_collection_destroy =*/nullptr,
/*.destroy =*/aaudio_destroy,
/*.stream_init =*/aaudio_stream_init,
/*.stream_destroy =*/aaudio_stream_destroy,
/*.stream_start =*/aaudio_stream_start,
/*.stream_stop =*/aaudio_stream_stop,
/*.stream_get_position =*/aaudio_stream_get_position,
/*.stream_get_latency =*/aaudio_stream_get_latency,
/*.stream_get_input_latency =*/aaudio_stream_get_input_latency,
/*.stream_set_volume =*/aaudio_stream_set_volume,
/*.stream_set_name =*/nullptr,
/*.stream_get_current_device =*/nullptr,
/*.stream_set_input_mute =*/nullptr,
/*.stream_set_input_processing_params =*/nullptr,
/*.stream_device_destroy =*/nullptr,
/*.stream_register_device_changed_callback =*/nullptr,
/*.register_device_collection_changed =*/nullptr};
extern "C" /*static*/ int
aaudio_init(cubeb ** context, char const * /* context_name */)
{
if (android_get_device_api_level() <= 30) {
return CUBEB_ERROR;
}
// load api
void * libaaudio = nullptr;
#ifndef DISABLE_LIBAAUDIO_DLOPEN
libaaudio = dlopen("libaaudio.so", RTLD_NOW);
if (!libaaudio) {
return CUBEB_ERROR;
}
#define LOAD(x) \
{ \
cubeb_##x = (decltype(x) *)(dlsym(libaaudio, #x)); \
if (!WRAP(x)) { \
LOG("AAudio: Failed to load %s", #x); \
dlclose(libaaudio); \
return CUBEB_ERROR; \
} \
}
LIBAAUDIO_API_VISIT(LOAD);
#undef LOAD
#endif
cubeb * ctx = new cubeb;
ctx->ops = &aaudio_ops;
ctx->libaaudio = libaaudio;
ctx->state.thread = std::thread(state_thread, ctx);
// NOTE: using platform-specific APIs we could set the priority of the
// notifier thread lower than the priority of the state thread.
// This way, it's more likely that the state thread will be woken up
// by the condition variable signal when both are currently waiting
ctx->state.notifier = std::thread(notifier_thread, ctx);
*context = ctx;
return CUBEB_OK;
}