Source code
Revision control
Copy as Markdown
Other Tools
use std::{marker::PhantomData, mem::MaybeUninit, sync::Once};
use crate::Error;
pub(crate) struct JobToken(PhantomData<()>);
impl JobToken {
fn new() -> Self {
Self(PhantomData)
}
}
impl Drop for JobToken {
fn drop(&mut self) {
match JobTokenServer::new() {
JobTokenServer::Inherited(jobserver) => jobserver.release_token_raw(),
JobTokenServer::InProcess(jobserver) => jobserver.release_token_raw(),
}
}
}
enum JobTokenServer {
Inherited(inherited_jobserver::JobServer),
InProcess(inprocess_jobserver::JobServer),
}
impl JobTokenServer {
/// This function returns a static reference to the jobserver because
/// - creating a jobserver from env is a bit fd-unsafe (e.g. the fd might
/// be closed by other jobserver users in the process) and better do it
/// at the start of the program.
/// - in case a jobserver cannot be created from env (e.g. it's not
/// present), we will create a global in-process only jobserver
/// that has to be static so that it will be shared by all cc
/// compilation.
fn new() -> &'static Self {
static INIT: Once = Once::new();
static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit();
unsafe {
INIT.call_once(|| {
let server = inherited_jobserver::JobServer::from_env()
.map(Self::Inherited)
.unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new()));
JOBSERVER = MaybeUninit::new(server);
});
// TODO: Poor man's assume_init_ref, as that'd require a MSRV of 1.55.
&*JOBSERVER.as_ptr()
}
}
}
pub(crate) enum ActiveJobTokenServer {
Inherited(inherited_jobserver::ActiveJobServer<'static>),
InProcess(&'static inprocess_jobserver::JobServer),
}
impl ActiveJobTokenServer {
pub(crate) fn new() -> Result<Self, Error> {
match JobTokenServer::new() {
JobTokenServer::Inherited(inherited_jobserver) => {
inherited_jobserver.enter_active().map(Self::Inherited)
}
JobTokenServer::InProcess(inprocess_jobserver) => {
Ok(Self::InProcess(inprocess_jobserver))
}
}
}
pub(crate) async fn acquire(&self) -> Result<JobToken, Error> {
match &self {
Self::Inherited(jobserver) => jobserver.acquire().await,
Self::InProcess(jobserver) => Ok(jobserver.acquire().await),
}
}
}
mod inherited_jobserver {
use super::JobToken;
use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind};
use std::{
io, mem,
sync::{mpsc, Mutex, MutexGuard, PoisonError},
};
pub(super) struct JobServer {
/// Implicit token for this process which is obtained and will be
/// released in parent. Since JobTokens only give back what they got,
/// there should be at most one global implicit token in the wild.
///
/// Since Rust does not execute any `Drop` for global variables,
/// we can't just put it back to jobserver and then re-acquire it at
/// the end of the process.
///
/// Use `Mutex` to avoid race between acquire and release.
/// If an `AtomicBool` is used, then it's possible for:
/// - `release_token_raw`: Tries to set `global_implicit_token` to true, but it is already
/// set to `true`, continue to release it to jobserver
/// - `acquire` takes the global implicit token, set `global_implicit_token` to false
/// - `release_token_raw` now writes the token back into the jobserver, while
/// `global_implicit_token` is `false`
///
/// If the program exits here, then cc effectively increases parallelism by one, which is
/// incorrect, hence we use a `Mutex` here.
global_implicit_token: Mutex<bool>,
inner: jobserver::Client,
}
impl JobServer {
pub(super) unsafe fn from_env() -> Option<Self> {
jobserver::Client::from_env().map(|inner| Self {
inner,
global_implicit_token: Mutex::new(true),
})
}
fn get_global_implicit_token(&self) -> MutexGuard<'_, bool> {
self.global_implicit_token
.lock()
.unwrap_or_else(PoisonError::into_inner)
}
/// All tokens except for the global implicit token will be put back into the jobserver
/// immediately and they cannot be cached, since Rust does not call `Drop::drop` on
/// global variables.
pub(super) fn release_token_raw(&self) {
let mut global_implicit_token = self.get_global_implicit_token();
if *global_implicit_token {
// There's already a global implicit token, so this token must
// be released back into jobserver.
//
// `release_raw` should not block
let _ = self.inner.release_raw();
} else {
*global_implicit_token = true;
}
}
pub(super) fn enter_active(&self) -> Result<ActiveJobServer<'_>, Error> {
ActiveJobServer::new(self)
}
}
pub(crate) struct ActiveJobServer<'a> {
jobserver: &'a JobServer,
helper_thread: jobserver::HelperThread,
/// When rx is dropped, all the token stored within it will be dropped.
rx: mpsc::Receiver<io::Result<jobserver::Acquired>>,
}
impl<'a> ActiveJobServer<'a> {
fn new(jobserver: &'a JobServer) -> Result<Self, Error> {
let (tx, rx) = mpsc::channel();
Ok(Self {
rx,
helper_thread: jobserver.inner.clone().into_helper_thread(move |res| {
let _ = tx.send(res);
})?,
jobserver,
})
}
pub(super) async fn acquire(&self) -> Result<JobToken, Error> {
let mut has_requested_token = false;
loop {
// Fast path
if mem::replace(&mut *self.jobserver.get_global_implicit_token(), false) {
break Ok(JobToken::new());
}
// Cold path, no global implicit token, obtain one
match self.rx.try_recv() {
Ok(res) => {
let acquired = res?;
acquired.drop_without_releasing();
break Ok(JobToken::new());
}
Err(mpsc::TryRecvError::Disconnected) => {
break Err(Error::new(
ErrorKind::JobserverHelpThreadError,
"jobserver help thread has returned before ActiveJobServer is dropped",
))
}
Err(mpsc::TryRecvError::Empty) => {
if !has_requested_token {
self.helper_thread.request_token();
has_requested_token = true;
}
YieldOnce::default().await
}
}
}
}
}
}
mod inprocess_jobserver {
use super::JobToken;
use crate::parallel::async_executor::YieldOnce;
use std::{
env::var,
sync::atomic::{
AtomicU32,
Ordering::{AcqRel, Acquire},
},
};
pub(crate) struct JobServer(AtomicU32);
impl JobServer {
pub(super) fn new() -> Self {
// Use `NUM_JOBS` if set (it's configured by Cargo) and otherwise
// just fall back to a semi-reasonable number.
//
// Note that we could use `num_cpus` here but it's an extra
// dependency that will almost never be used, so
// it's generally not too worth it.
let mut parallelism = 4;
// TODO: Use std::thread::available_parallelism as an upper bound
// when MSRV is bumped.
if let Ok(amt) = var("NUM_JOBS") {
if let Ok(amt) = amt.parse() {
parallelism = amt;
}
}
Self(AtomicU32::new(parallelism))
}
pub(super) async fn acquire(&self) -> JobToken {
loop {
let res = self
.0
.fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1));
if res.is_ok() {
break JobToken::new();
}
YieldOnce::default().await
}
}
pub(super) fn release_token_raw(&self) {
self.0.fetch_add(1, AcqRel);
}
}
}