Revision control

Copy as Markdown

Other Tools

#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
use tokio::{runtime, task, time};
use tokio_test::assert_ok;
use std::thread;
use std::time::Duration;
mod support {
pub(crate) mod mpsc_stream;
}
#[tokio::test]
async fn basic_blocking() {
// Run a few times
for _ in 0..100 {
let out = assert_ok!(
tokio::spawn(async {
assert_ok!(
task::spawn_blocking(|| {
thread::sleep(Duration::from_millis(5));
"hello"
})
.await
)
})
.await
);
assert_eq!(out, "hello");
}
}
#[tokio::test(flavor = "multi_thread")]
async fn block_in_blocking() {
// Run a few times
for _ in 0..100 {
let out = assert_ok!(
tokio::spawn(async {
assert_ok!(
task::spawn_blocking(|| {
task::block_in_place(|| {
thread::sleep(Duration::from_millis(5));
});
"hello"
})
.await
)
})
.await
);
assert_eq!(out, "hello");
}
}
#[tokio::test(flavor = "multi_thread")]
async fn block_in_block() {
// Run a few times
for _ in 0..100 {
let out = assert_ok!(
tokio::spawn(async {
task::block_in_place(|| {
task::block_in_place(|| {
thread::sleep(Duration::from_millis(5));
});
"hello"
})
})
.await
);
assert_eq!(out, "hello");
}
}
#[tokio::test(flavor = "current_thread")]
#[should_panic]
async fn no_block_in_current_thread_scheduler() {
task::block_in_place(|| {});
}
#[test]
fn yes_block_in_threaded_block_on() {
let rt = runtime::Runtime::new().unwrap();
rt.block_on(async {
task::block_in_place(|| {});
});
}
#[test]
#[should_panic]
fn no_block_in_current_thread_block_on() {
let rt = runtime::Builder::new_current_thread().build().unwrap();
rt.block_on(async {
task::block_in_place(|| {});
});
}
#[test]
fn can_enter_current_thread_rt_from_within_block_in_place() {
let outer = tokio::runtime::Runtime::new().unwrap();
outer.block_on(async {
tokio::task::block_in_place(|| {
let inner = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
inner.block_on(async {})
})
});
}
#[test]
#[cfg(panic = "unwind")]
fn useful_panic_message_when_dropping_rt_in_rt() {
use std::panic::{catch_unwind, AssertUnwindSafe};
let outer = tokio::runtime::Runtime::new().unwrap();
let result = catch_unwind(AssertUnwindSafe(|| {
outer.block_on(async {
let _ = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
});
}));
assert!(result.is_err());
let err = result.unwrap_err();
let err: &'static str = err.downcast_ref::<&'static str>().unwrap();
assert!(
err.contains("Cannot drop a runtime"),
"Wrong panic message: {:?}",
err
);
}
#[test]
fn can_shutdown_with_zero_timeout_in_runtime() {
let outer = tokio::runtime::Runtime::new().unwrap();
outer.block_on(async {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.shutdown_timeout(Duration::from_nanos(0));
});
}
#[test]
fn can_shutdown_now_in_runtime() {
let outer = tokio::runtime::Runtime::new().unwrap();
outer.block_on(async {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.shutdown_background();
});
}
#[test]
fn coop_disabled_in_block_in_place() {
let outer = tokio::runtime::Builder::new_multi_thread()
.enable_time()
.build()
.unwrap();
let (tx, rx) = support::mpsc_stream::unbounded_channel_stream();
for i in 0..200 {
tx.send(i).unwrap();
}
drop(tx);
outer.block_on(async move {
let jh = tokio::spawn(async move {
tokio::task::block_in_place(move || {
futures::executor::block_on(async move {
use tokio_stream::StreamExt;
assert_eq!(rx.fold(0, |n, _| n + 1).await, 200);
})
})
});
tokio::time::timeout(Duration::from_secs(1), jh)
.await
.expect("timed out (probably hanging)")
.unwrap()
});
}
#[test]
fn coop_disabled_in_block_in_place_in_block_on() {
let (done_tx, done_rx) = std::sync::mpsc::channel();
let done = done_tx.clone();
thread::spawn(move || {
let outer = tokio::runtime::Runtime::new().unwrap();
let (tx, rx) = support::mpsc_stream::unbounded_channel_stream();
for i in 0..200 {
tx.send(i).unwrap();
}
drop(tx);
outer.block_on(async move {
tokio::task::block_in_place(move || {
futures::executor::block_on(async move {
use tokio_stream::StreamExt;
assert_eq!(rx.fold(0, |n, _| n + 1).await, 200);
})
})
});
let _ = done.send(Ok(()));
});
thread::spawn(move || {
thread::sleep(Duration::from_secs(1));
let _ = done_tx.send(Err("timed out (probably hanging)"));
});
done_rx.recv().unwrap().unwrap();
}
#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn blocking_when_paused() {
// Do not auto-advance time when we have started a blocking task that has
// not yet finished.
time::timeout(
Duration::from_secs(3),
task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))),
)
.await
.expect("timeout should not trigger")
.expect("blocking task should finish");
// Really: Do not auto-advance time, even if the timeout is short and the
// blocking task runs for longer than that. It doesn't matter: Tokio time
// is paused; system time is not.
time::timeout(
Duration::from_millis(1),
task::spawn_blocking(|| thread::sleep(Duration::from_millis(50))),
)
.await
.expect("timeout should not trigger")
.expect("blocking task should finish");
}
#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn blocking_task_wakes_paused_runtime() {
let t0 = std::time::Instant::now();
time::timeout(
Duration::from_secs(15),
task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))),
)
.await
.expect("timeout should not trigger")
.expect("blocking task should finish");
assert!(
t0.elapsed() < Duration::from_secs(10),
"completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
);
}
#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn unawaited_blocking_task_wakes_paused_runtime() {
let t0 = std::time::Instant::now();
// When this task finishes, time should auto-advance, even though the
// JoinHandle has not been awaited yet.
let a = task::spawn_blocking(|| {
thread::sleep(Duration::from_millis(1));
});
crate::time::sleep(Duration::from_secs(15)).await;
a.await.expect("blocking task should finish");
assert!(
t0.elapsed() < Duration::from_secs(10),
"completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
);
}
#[cfg(panic = "unwind")]
#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn panicking_blocking_task_wakes_paused_runtime() {
let t0 = std::time::Instant::now();
let result = time::timeout(
Duration::from_secs(15),
task::spawn_blocking(|| {
thread::sleep(Duration::from_millis(1));
panic!("blocking task panicked");
}),
)
.await
.expect("timeout should not trigger");
assert!(result.is_err(), "blocking task should have panicked");
assert!(
t0.elapsed() < Duration::from_secs(10),
"completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
);
}