Revision control

Copy as Markdown

Other Tools

// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
#[cfg(target_os = "linux")]
use audio_thread_priority::{promote_thread_to_real_time, RtPriorityThreadInfo};
use audioipc::messages::SerializableHandle;
use audioipc::messages::{
CallbackReq, CallbackResp, ClientMessage, Device, DeviceCollectionReq, DeviceCollectionResp,
DeviceInfo, RegisterDeviceCollectionChanged, ServerMessage, StreamCreate, StreamCreateParams,
StreamInitParams, StreamParams,
};
use audioipc::shm::SharedMem;
use audioipc::{ipccore, rpccore, sys, PlatformHandle};
use cubeb::InputProcessingParams;
use cubeb_core as cubeb;
use cubeb_core::ffi;
use std::convert::TryInto;
use std::ffi::CStr;
use std::mem::size_of;
use std::os::raw::{c_long, c_void};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cell::RefCell, sync::Mutex};
use std::{panic, slice};
use crate::errors::*;
fn error(error: cubeb::Error) -> ClientMessage {
ClientMessage::Error(error.raw_code())
}
struct CubebDeviceCollectionManager {
servers: Mutex<Vec<(Rc<DeviceCollectionChangeCallback>, cubeb::DeviceType)>>,
}
impl CubebDeviceCollectionManager {
fn new() -> CubebDeviceCollectionManager {
CubebDeviceCollectionManager {
servers: Mutex::new(Vec::new()),
}
}
fn register(
&self,
context: &cubeb::Context,
server: &Rc<DeviceCollectionChangeCallback>,
devtype: cubeb::DeviceType,
) -> cubeb::Result<()> {
let mut servers = self.servers.lock().unwrap();
if servers.is_empty() {
self.internal_register(context, true)?;
}
servers.push((server.clone(), devtype));
Ok(())
}
fn unregister(
&self,
context: &cubeb::Context,
server: &Rc<DeviceCollectionChangeCallback>,
devtype: cubeb::DeviceType,
) -> cubeb::Result<()> {
let mut servers = self.servers.lock().unwrap();
servers.retain(|(s, d)| !Rc::ptr_eq(s, server) || d != &devtype);
if servers.is_empty() {
self.internal_register(context, false)?;
}
Ok(())
}
fn internal_register(&self, context: &cubeb::Context, enable: bool) -> cubeb::Result<()> {
for &(dir, cb) in &[
(
cubeb::DeviceType::INPUT,
device_collection_changed_input_cb_c as _,
),
(
cubeb::DeviceType::OUTPUT,
device_collection_changed_output_cb_c as _,
),
] {
unsafe {
context.register_device_collection_changed(
dir,
if enable { Some(cb) } else { None },
if enable {
self as *const CubebDeviceCollectionManager as *mut c_void
} else {
std::ptr::null_mut()
},
)?;
}
}
Ok(())
}
unsafe fn device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type) {
let servers = self.servers.lock().unwrap();
servers.iter().for_each(|(s, d)| {
if d.contains(cubeb::DeviceType::from_bits_truncate(device_type)) {
s.device_collection_changed_callback(device_type)
}
});
}
}
impl Drop for CubebDeviceCollectionManager {
fn drop(&mut self) {
assert!(self.servers.lock().unwrap().is_empty());
}
}
struct DevIdMap {
devices: Vec<usize>,
}
// A cubeb_devid is an opaque type which may be implemented with a stable
// pointer in a cubeb backend. cubeb_devids received remotely must be
// validated before use, so DevIdMap provides a simple 1:1 mapping between a
// cubeb_devid and an IPC-transportable value suitable for use as a unique
// handle.
impl DevIdMap {
fn new() -> DevIdMap {
let mut d = DevIdMap {
devices: Vec::with_capacity(32),
};
// A null cubeb_devid is used for selecting the default device.
// Pre-populate the mapping with 0 -> 0 to handle nulls.
d.devices.push(0);
d
}
// Given a cubeb_devid, return a unique stable value suitable for use
// over IPC.
fn make_handle(&mut self, devid: usize) -> usize {
if let Some(i) = self.devices.iter().position(|&d| d == devid) {
return i;
}
self.devices.push(devid);
self.devices.len() - 1
}
// Given a handle produced by `make_handle`, return the associated
// cubeb_devid. Invalid handles result in a panic.
fn handle_to_id(&self, handle: usize) -> usize {
self.devices[handle]
}
}
struct CubebContextState {
// `manager` must be dropped before the `context` is destroyed.
manager: CubebDeviceCollectionManager,
context: cubeb::Result<cubeb::Context>,
}
thread_local!(static CONTEXT_KEY: RefCell<Option<CubebContextState>> = const { RefCell::new(None) });
fn cubeb_init_from_context_params() -> cubeb::Result<cubeb::Context> {
let params = super::G_CUBEB_CONTEXT_PARAMS.lock().unwrap();
let context_name = Some(params.context_name.as_c_str());
let backend_name = params.backend_name.as_deref();
let r = cubeb::Context::init(context_name, backend_name);
r.inspect_err(|e| {
info!("cubeb::Context::init failed r={:?}", e);
})
}
fn with_local_context<T, F>(f: F) -> T
where
F: FnOnce(&cubeb::Result<cubeb::Context>, &mut CubebDeviceCollectionManager) -> T,
{
CONTEXT_KEY.with(|k| {
let mut state = k.borrow_mut();
if state.is_none() {
*state = Some(CubebContextState {
manager: CubebDeviceCollectionManager::new(),
context: cubeb_init_from_context_params(),
});
}
let CubebContextState { manager, context } = state.as_mut().unwrap();
// Always reattempt to initialize cubeb, OS config may have changed.
if context.is_err() {
*context = cubeb_init_from_context_params();
}
f(context, manager)
})
}
struct DeviceCollectionClient;
impl rpccore::Client for DeviceCollectionClient {
type ServerMessage = DeviceCollectionReq;
type ClientMessage = DeviceCollectionResp;
}
struct CallbackClient;
impl rpccore::Client for CallbackClient {
type ServerMessage = CallbackReq;
type ClientMessage = CallbackResp;
}
struct ServerStreamCallbacks {
/// Size of input frame in bytes
input_frame_size: u16,
/// Size of output frame in bytes
output_frame_size: u16,
/// Shared memory buffer for transporting audio data to/from client
shm: SharedMem,
/// RPC interface for data_callback (on OS audio thread) to server callback thread
data_callback_rpc: rpccore::Proxy<CallbackReq, CallbackResp>,
/// RPC interface for state_callback (on any thread) to server callback thread
state_callback_rpc: rpccore::Proxy<CallbackReq, CallbackResp>,
/// RPC interface for device_change_callback (on any thread) to server callback thread
device_change_callback_rpc: rpccore::Proxy<CallbackReq, CallbackResp>,
/// Indicates stream is connected to client side. Callbacks received before
/// the stream is in the connected state cannot be sent to the client side, so
/// are logged and otherwise ignored.
connected: AtomicBool,
}
impl ServerStreamCallbacks {
fn data_callback(&mut self, input: &[u8], output: &mut [u8], nframes: isize) -> isize {
trace!(
"Stream data callback: {} {} {}",
nframes,
input.len(),
output.len()
);
if !self.connected.load(Ordering::Acquire) {
warn!("Stream data callback triggered before stream connected");
return cubeb::ffi::CUBEB_ERROR.try_into().unwrap();
}
if self.input_frame_size != 0 {
if input.len() > self.shm.get_size() {
debug!(
"bad input size: input={} shm={}",
input.len(),
self.shm.get_size()
);
return cubeb::ffi::CUBEB_ERROR.try_into().unwrap();
}
unsafe {
self.shm
.get_mut_slice(input.len())
.unwrap()
.copy_from_slice(input);
}
}
if self.output_frame_size != 0 && output.len() > self.shm.get_size() {
debug!(
"bad output size: output={} shm={}",
output.len(),
self.shm.get_size()
);
return cubeb::ffi::CUBEB_ERROR.try_into().unwrap();
}
if nframes == 0 {
// Optimization: skip the RPC call when there are no frames.
return 0;
}
let r = self.data_callback_rpc.call(CallbackReq::Data {
nframes,
input_frame_size: self.input_frame_size as usize,
output_frame_size: self.output_frame_size as usize,
});
match r {
Ok(CallbackResp::Data(frames)) => {
if frames >= 0 && self.output_frame_size != 0 {
let nbytes = frames as usize * self.output_frame_size as usize;
unsafe {
output[..nbytes].copy_from_slice(self.shm.get_slice(nbytes).unwrap());
}
}
frames
}
_ => {
debug!("Unexpected message {:?} during data_callback", r);
cubeb::ffi::CUBEB_ERROR.try_into().unwrap()
}
}
}
fn state_callback(&self, state: cubeb::State) {
trace!("Stream state callback: {:?}", state);
if !self.connected.load(Ordering::Acquire) {
warn!("Stream state callback triggered before stream connected");
return;
}
let r = self
.state_callback_rpc
.call(CallbackReq::State(state.into()));
match r {
Ok(CallbackResp::State) => {}
_ => {
debug!("Unexpected message {:?} during state callback", r);
}
}
}
fn device_change_callback(&self) {
trace!("Stream device change callback");
if !self.connected.load(Ordering::Acquire) {
warn!("Stream device_change callback triggered before stream connected");
return;
}
let r = self
.device_change_callback_rpc
.call(CallbackReq::DeviceChange);
match r {
Ok(CallbackResp::DeviceChange) => {}
_ => {
debug!("Unexpected message {:?} during device change callback", r);
}
}
}
}
static SHM_ID: AtomicUsize = AtomicUsize::new(0);
// Generate a temporary shm_id fragment that is unique to the process. This
// path is used temporarily to create a shm segment, which is then
// immediately deleted from the filesystem while retaining handles to the
// shm to be shared between the server and client.
fn get_shm_id() -> String {
format!(
"cubeb-shm-{}-{}",
std::process::id(),
SHM_ID.fetch_add(1, Ordering::SeqCst)
)
}
struct ServerStream {
stream: Option<cubeb::Stream>,
cbs: Box<ServerStreamCallbacks>,
client_pipe: Option<PlatformHandle>,
}
impl Drop for ServerStream {
fn drop(&mut self) {
// `stream` *must* be dropped before `cbs`.
drop(self.stream.take());
}
}
struct DeviceCollectionChangeCallback {
rpc: rpccore::Proxy<DeviceCollectionReq, DeviceCollectionResp>,
}
impl DeviceCollectionChangeCallback {
fn device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type) {
// TODO: Assert device_type is in devtype.
debug!(
"Sending device collection ({:?}) changed event",
device_type
);
let _ = self
.rpc
.call(DeviceCollectionReq::DeviceChange(device_type));
}
}
pub struct CubebServer {
callback_thread: ipccore::EventLoopHandle,
device_collection_thread: ipccore::EventLoopHandle,
streams: slab::Slab<ServerStream>,
remote_pid: Option<u32>,
device_collection_change_callbacks: Option<Rc<DeviceCollectionChangeCallback>>,
devidmap: DevIdMap,
shm_area_size: usize,
}
impl Drop for CubebServer {
fn drop(&mut self) {
if let Some(device_collection_change_callbacks) = &self.device_collection_change_callbacks {
debug!("CubebServer: dropped with device_collection_change_callbacks registered");
CONTEXT_KEY.with(|k| {
let mut state = k.borrow_mut();
if let Some(CubebContextState {
manager,
context: Ok(context),
}) = state.as_mut()
{
for devtype in [cubeb::DeviceType::INPUT, cubeb::DeviceType::OUTPUT] {
let r = manager.unregister(
context,
device_collection_change_callbacks,
devtype,
);
if r.is_err() {
debug!("CubebServer: unregister failed: {:?}", r);
}
}
}
})
}
}
}
#[allow(unknown_lints)] // non_send_fields_in_send_ty is Nightly-only as of 2021-11-29.
#[allow(clippy::non_send_fields_in_send_ty)]
// XXX: required for server setup, verify this is safe.
unsafe impl Send for CubebServer {}
impl rpccore::Server for CubebServer {
type ServerMessage = ServerMessage;
type ClientMessage = ClientMessage;
fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage {
if let ServerMessage::ClientConnect(pid) = req {
self.remote_pid = Some(pid);
}
with_local_context(|context, manager| match *context {
Err(_) => error(cubeb::Error::error()),
Ok(ref context) => self.process_msg(context, manager, &req),
})
}
}
// Debugging for BMO 1594216/1612044.
macro_rules! try_stream {
($self:expr, $stm_tok:expr) => {
if $self.streams.contains($stm_tok) {
$self.streams[$stm_tok]
.stream
.as_mut()
.expect("uninitialized stream")
} else {
error!(
"{}:{}:{} - Stream({}): invalid token",
file!(),
line!(),
column!(),
$stm_tok
);
return error(cubeb::Error::invalid_parameter());
}
};
}
impl CubebServer {
pub fn new(
callback_thread: ipccore::EventLoopHandle,
device_collection_thread: ipccore::EventLoopHandle,
shm_area_size: usize,
) -> Self {
CubebServer {
callback_thread,
device_collection_thread,
streams: slab::Slab::<ServerStream>::new(),
remote_pid: None,
device_collection_change_callbacks: None,
devidmap: DevIdMap::new(),
shm_area_size,
}
}
// Process a request coming from the client.
fn process_msg(
&mut self,
context: &cubeb::Context,
manager: &mut CubebDeviceCollectionManager,
msg: &ServerMessage,
) -> ClientMessage {
let resp: ClientMessage = match *msg {
ServerMessage::ClientConnect(_) => {
// remote_pid is set before cubeb initialization, just verify here.
assert!(self.remote_pid.is_some());
ClientMessage::ClientConnected
}
ServerMessage::ClientDisconnect => {
// TODO:
//self.connection.client_disconnect();
ClientMessage::ClientDisconnected
}
ServerMessage::ContextGetBackendId => {
ClientMessage::ContextBackendId(context.backend_id().to_string())
}
ServerMessage::ContextGetMaxChannelCount => context
.max_channel_count()
.map(ClientMessage::ContextMaxChannelCount)
.unwrap_or_else(error),
ServerMessage::ContextGetMinLatency(ref params) => {
let format = cubeb::SampleFormat::from(params.format);
let layout = cubeb::ChannelLayout::from(params.layout);
let params = cubeb::StreamParamsBuilder::new()
.format(format)
.rate(params.rate)
.channels(params.channels)
.layout(layout)
.take();
context
.min_latency(&params)
.map(ClientMessage::ContextMinLatency)
.unwrap_or_else(error)
}
ServerMessage::ContextGetPreferredSampleRate => context
.preferred_sample_rate()
.map(ClientMessage::ContextPreferredSampleRate)
.unwrap_or_else(error),
ServerMessage::ContextGetSupportedInputProcessingParams => context
.supported_input_processing_params()
.map(|params| ClientMessage::ContextSupportedInputProcessingParams(params.bits()))
.unwrap_or_else(error),
ServerMessage::ContextGetDeviceEnumeration(device_type) => context
.enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
.map(|devices| {
let v: Vec<DeviceInfo> = devices
.iter()
.map(|i| {
let mut tmp: DeviceInfo = i.as_ref().into();
// Replace each cubeb_devid with a unique handle suitable for IPC.
tmp.devid = self.devidmap.make_handle(tmp.devid);
tmp
})
.collect();
ClientMessage::ContextEnumeratedDevices(v)
})
.unwrap_or_else(error),
ServerMessage::StreamCreate(ref params) => self
.process_stream_create(params)
.unwrap_or_else(|_| error(cubeb::Error::error())),
ServerMessage::StreamInit(stm_tok, ref params) => self
.process_stream_init(context, stm_tok, params)
.unwrap_or_else(|_| error(cubeb::Error::error())),
ServerMessage::StreamDestroy(stm_tok) => {
if self.streams.contains(stm_tok) {
debug!("Unregistering stream {:?}", stm_tok);
self.streams.remove(stm_tok);
} else {
// Debugging for BMO 1594216/1612044.
error!("StreamDestroy({}): invalid token", stm_tok);
return error(cubeb::Error::invalid_parameter());
}
ClientMessage::StreamDestroyed
}
ServerMessage::StreamStart(stm_tok) => try_stream!(self, stm_tok)
.start()
.map(|_| ClientMessage::StreamStarted)
.unwrap_or_else(error),
ServerMessage::StreamStop(stm_tok) => try_stream!(self, stm_tok)
.stop()
.map(|_| ClientMessage::StreamStopped)
.unwrap_or_else(error),
ServerMessage::StreamGetPosition(stm_tok) => try_stream!(self, stm_tok)
.position()
.map(ClientMessage::StreamPosition)
.unwrap_or_else(error),
ServerMessage::StreamGetLatency(stm_tok) => try_stream!(self, stm_tok)
.latency()
.map(ClientMessage::StreamLatency)
.unwrap_or_else(error),
ServerMessage::StreamGetInputLatency(stm_tok) => try_stream!(self, stm_tok)
.input_latency()
.map(ClientMessage::StreamInputLatency)
.unwrap_or_else(error),
ServerMessage::StreamSetVolume(stm_tok, volume) => try_stream!(self, stm_tok)
.set_volume(volume)
.map(|_| ClientMessage::StreamVolumeSet)
.unwrap_or_else(error),
ServerMessage::StreamSetName(stm_tok, ref name) => try_stream!(self, stm_tok)
.set_name(name)
.map(|_| ClientMessage::StreamNameSet)
.unwrap_or_else(error),
ServerMessage::StreamGetCurrentDevice(stm_tok) => try_stream!(self, stm_tok)
.current_device()
.map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
.unwrap_or_else(error),
ServerMessage::StreamSetInputMute(stm_tok, mute) => try_stream!(self, stm_tok)
.set_input_mute(mute)
.map(|_| ClientMessage::StreamInputMuteSet)
.unwrap_or_else(error),
ServerMessage::StreamSetInputProcessingParams(stm_tok, params) => {
try_stream!(self, stm_tok)
.set_input_processing_params(InputProcessingParams::from_bits_truncate(params))
.map(|_| ClientMessage::StreamInputProcessingParamsSet)
.unwrap_or_else(error)
}
ServerMessage::StreamRegisterDeviceChangeCallback(stm_tok, enable) => {
try_stream!(self, stm_tok)
.register_device_changed_callback(if enable {
Some(device_change_cb_c)
} else {
None
})
.map(|_| ClientMessage::StreamRegisterDeviceChangeCallback)
.unwrap_or_else(error)
}
ServerMessage::ContextSetupDeviceCollectionCallback => {
let (server_pipe, client_pipe) = match sys::make_pipe_pair() {
Ok((server_pipe, client_pipe)) => (server_pipe, client_pipe),
Err(e) => {
debug!(
"ContextSetupDeviceCollectionCallback - make_pipe_pair failed: {:?}",
e
);
return error(cubeb::Error::error());
}
};
// TODO: this should bind the client_pipe and send the server_pipe to the remote, but
// additional work is required as it's not possible to convert a Windows sys::Pipe into a raw handle.
// TODO: Use the rpc_thread instead of an extra device_collection_thread, but a reentrant bind_client
// is required to support that.
let rpc = match self
.device_collection_thread
.bind_client::<DeviceCollectionClient>(server_pipe)
{
Ok(rpc) => rpc,
Err(e) => {
debug!(
"ContextSetupDeviceCollectionCallback - bind_client: {:?}",
e
);
return error(cubeb::Error::error());
}
};
self.device_collection_change_callbacks =
Some(Rc::new(DeviceCollectionChangeCallback { rpc }));
let fd = RegisterDeviceCollectionChanged {
platform_handle: SerializableHandle::new(client_pipe, self.remote_pid.unwrap()),
};
ClientMessage::ContextSetupDeviceCollectionCallback(fd)
}
ServerMessage::ContextRegisterDeviceCollectionChanged(device_type, enable) => self
.process_register_device_collection_changed(
context,
manager,
cubeb::DeviceType::from_bits_truncate(device_type),
enable,
)
.unwrap_or_else(error),
#[cfg(target_os = "linux")]
ServerMessage::PromoteThreadToRealTime(thread_info) => {
let info = RtPriorityThreadInfo::deserialize(thread_info);
match promote_thread_to_real_time(info, 0, 48000) {
Ok(_) => {
info!("Promotion of content process thread to real-time OK");
}
Err(_) => {
warn!("Promotion of content process thread to real-time error");
}
}
ClientMessage::ThreadPromoted
}
};
trace!("process_msg: req={:?}, resp={:?}", msg, resp);
resp
}
fn process_register_device_collection_changed(
&mut self,
context: &cubeb::Context,
manager: &mut CubebDeviceCollectionManager,
devtype: cubeb::DeviceType,
enable: bool,
) -> cubeb::Result<ClientMessage> {
if devtype == cubeb::DeviceType::UNKNOWN {
return Err(cubeb::Error::invalid_parameter());
}
assert!(self.device_collection_change_callbacks.is_some());
let cbs = self.device_collection_change_callbacks.as_ref().unwrap();
if enable {
manager.register(context, cbs, devtype)
} else {
manager.unregister(context, cbs, devtype)
}
.map(|_| ClientMessage::ContextRegisteredDeviceCollectionChanged)
}
// Stream create is special, so it's been separated from process_msg.
fn process_stream_create(&mut self, params: &StreamCreateParams) -> Result<ClientMessage> {
fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 {
params
.map(|p| {
let format = p.format.into();
let sample_size = match format {
cubeb::SampleFormat::S16LE
| cubeb::SampleFormat::S16BE
| cubeb::SampleFormat::S16NE => 2,
cubeb::SampleFormat::Float32LE
| cubeb::SampleFormat::Float32BE
| cubeb::SampleFormat::Float32NE => 4,
};
let channel_count = p.channels as u16;
sample_size * channel_count
})
.unwrap_or(0u16)
}
// Create the callback handling struct which is attached the cubeb stream.
let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());
// Estimate a safe shmem size for this stream configuration. If the server was configured with a fixed
// shm_area_size override, use that instead.
// TODO: Add a new cubeb API to query the precise buffer size required for a given stream config.
let shm_area_size = if self.shm_area_size == 0 {
let frame_size = output_frame_size.max(input_frame_size) as u32;
let in_rate = params.input_stream_params.map(|p| p.rate).unwrap_or(0);
let out_rate = params.output_stream_params.map(|p| p.rate).unwrap_or(0);
let rate = out_rate.max(in_rate);
// 1s of audio, rounded up to the nearest 64kB.
// Stream latency is capped at 1s in process_stream_init.
(((rate * frame_size) + 0xffff) & !0xffff) as usize
} else {
self.shm_area_size
};
debug!("shm_area_size = {}", shm_area_size);
let shm = SharedMem::new(&get_shm_id(), shm_area_size)?;
let shm_handle = unsafe { shm.make_handle()? };
let (server_pipe, client_pipe) = sys::make_pipe_pair()?;
// TODO: this should bind the client_pipe and send the server_pipe to the remote, but
// additional work is required as it's not possible to convert a Windows sys::Pipe into a raw handle.
let rpc = self
.callback_thread
.bind_client::<CallbackClient>(server_pipe)?;
let cbs = Box::new(ServerStreamCallbacks {
input_frame_size,
output_frame_size,
shm,
state_callback_rpc: rpc.clone(),
device_change_callback_rpc: rpc.clone(),
data_callback_rpc: rpc,
connected: AtomicBool::new(false),
});
let entry = self.streams.vacant_entry();
let key = entry.key();
debug!("Registering stream {:?}", key);
entry.insert(ServerStream {
stream: None,
cbs,
client_pipe: Some(client_pipe),
});
Ok(ClientMessage::StreamCreated(StreamCreate {
token: key,
shm_handle: SerializableHandle::new(shm_handle, self.remote_pid.unwrap()),
shm_area_size,
}))
}
// Stream init is special, so it's been separated from process_msg.
fn process_stream_init(
&mut self,
context: &cubeb::Context,
stm_tok: usize,
params: &StreamInitParams,
) -> Result<ClientMessage> {
// Create cubeb stream from params
let stream_name = params
.stream_name
.as_ref()
.and_then(|name| CStr::from_bytes_with_nul(name).ok());
// Map IPC handle back to cubeb_devid.
let input_device = self.devidmap.handle_to_id(params.input_device) as *const _;
let input_stream_params = params.input_stream_params.as_ref().map(|isp| unsafe {
cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _)
});
// Map IPC handle back to cubeb_devid.
let output_device = self.devidmap.handle_to_id(params.output_device) as *const _;
let output_stream_params = params.output_stream_params.as_ref().map(|osp| unsafe {
cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _)
});
// TODO: Manage stream latency requests with respect to the RT deadlines the callback_thread was configured for.
fn round_up_pow2(v: u32) -> u32 {
debug_assert!(v >= 1);
1 << (32 - (v - 1).leading_zeros())
}
let rate = params
.output_stream_params
.map(|p| p.rate)
.unwrap_or_else(|| params.input_stream_params.map(|p| p.rate).unwrap());
// Note: minimum latency supported by AudioIPC is currently ~5ms. This restriction may be reduced by later IPC improvements.
let min_latency = round_up_pow2(5 * rate / 1000);
// Note: maximum latency is restricted by the SharedMem size.
let max_latency = rate;
let latency = params.latency_frames.clamp(min_latency, max_latency);
trace!(
"stream rate={} latency requested={} calculated={}",
rate,
params.latency_frames,
latency
);
let server_stream = &mut self.streams[stm_tok];
assert!(size_of::<Box<ServerStreamCallbacks>>() == size_of::<usize>());
let user_ptr = server_stream.cbs.as_ref() as *const ServerStreamCallbacks as *mut c_void;
let stream = unsafe {
let stream = context.stream_init(
stream_name,
input_device,
input_stream_params,
output_device,
output_stream_params,
latency,
Some(data_cb_c),
Some(state_cb_c),
user_ptr,
);
match stream {
Ok(stream) => stream,
Err(e) => {
debug!("Unregistering stream {:?} (stream error {:?})", stm_tok, e);
self.streams.remove(stm_tok);
return Err(e.into());
}
}
};
server_stream.stream = Some(stream);
let client_pipe = server_stream
.client_pipe
.take()
.expect("invalid state after StreamCreated");
server_stream.cbs.connected.store(true, Ordering::Release);
Ok(ClientMessage::StreamInitialized(SerializableHandle::new(
client_pipe,
self.remote_pid.unwrap(),
)))
}
}
// C callable callbacks
unsafe extern "C" fn data_cb_c(
_: *mut ffi::cubeb_stream,
user_ptr: *mut c_void,
input_buffer: *const c_void,
output_buffer: *mut c_void,
nframes: c_long,
) -> c_long {
let ok = panic::catch_unwind(|| {
let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
let input = if input_buffer.is_null() {
&[]
} else {
let nbytes = nframes * c_long::from(cbs.input_frame_size);
slice::from_raw_parts(input_buffer as *const u8, nbytes as usize)
};
let output: &mut [u8] = if output_buffer.is_null() {
&mut []
} else {
let nbytes = nframes * c_long::from(cbs.output_frame_size);
slice::from_raw_parts_mut(output_buffer as *mut u8, nbytes as usize)
};
cbs.data_callback(input, output, nframes as isize) as c_long
});
ok.unwrap_or(cubeb::ffi::CUBEB_ERROR as c_long)
}
unsafe extern "C" fn state_cb_c(
_: *mut ffi::cubeb_stream,
user_ptr: *mut c_void,
state: ffi::cubeb_state,
) {
let ok = panic::catch_unwind(|| {
let state = cubeb::State::from(state);
let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
cbs.state_callback(state);
});
ok.expect("State callback panicked");
}
unsafe extern "C" fn device_change_cb_c(user_ptr: *mut c_void) {
let ok = panic::catch_unwind(|| {
let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
cbs.device_change_callback();
});
ok.expect("Device change callback panicked");
}
unsafe extern "C" fn device_collection_changed_input_cb_c(
_: *mut ffi::cubeb,
user_ptr: *mut c_void,
) {
let ok = panic::catch_unwind(|| {
let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager);
manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_INPUT);
});
ok.expect("Collection changed (input) callback panicked");
}
unsafe extern "C" fn device_collection_changed_output_cb_c(
_: *mut ffi::cubeb,
user_ptr: *mut c_void,
) {
let ok = panic::catch_unwind(|| {
let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager);
manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_OUTPUT);
});
ok.expect("Collection changed (output) callback panicked");
}