Source code

Revision control

Copy as Markdown

Other Tools

use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::task::Poll;
use futures_test::task::noop_context;
use std::panic::{self, AssertUnwindSafe};
use std::sync::{Arc, Barrier};
use std::thread;
#[test]
fn basic_usage() {
block_on(future::lazy(move |cx| {
let mut queue = FuturesUnordered::new();
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();
queue.push(rx1);
queue.push(rx2);
queue.push(rx3);
assert!(!queue.poll_next_unpin(cx).is_ready());
tx2.send("hello").unwrap();
assert_eq!(Poll::Ready(Some(Ok("hello"))), queue.poll_next_unpin(cx));
assert!(!queue.poll_next_unpin(cx).is_ready());
tx1.send("world").unwrap();
tx3.send("world2").unwrap();
assert_eq!(Poll::Ready(Some(Ok("world"))), queue.poll_next_unpin(cx));
assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
}));
}
#[test]
fn resolving_errors() {
block_on(future::lazy(move |cx| {
let mut queue = FuturesUnordered::new();
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();
queue.push(rx1);
queue.push(rx2);
queue.push(rx3);
assert!(!queue.poll_next_unpin(cx).is_ready());
drop(tx2);
assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
assert!(!queue.poll_next_unpin(cx).is_ready());
drop(tx1);
tx3.send("world2").unwrap();
assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
}));
}
#[test]
fn dropping_ready_queue() {
block_on(future::lazy(move |_| {
let queue = FuturesUnordered::new();
let (mut tx1, rx1) = oneshot::channel::<()>();
let (mut tx2, rx2) = oneshot::channel::<()>();
let (mut tx3, rx3) = oneshot::channel::<()>();
queue.push(rx1);
queue.push(rx2);
queue.push(rx3);
{
let cx = &mut noop_context();
assert!(!tx1.poll_canceled(cx).is_ready());
assert!(!tx2.poll_canceled(cx).is_ready());
assert!(!tx3.poll_canceled(cx).is_ready());
drop(queue);
assert!(tx1.poll_canceled(cx).is_ready());
assert!(tx2.poll_canceled(cx).is_ready());
assert!(tx3.poll_canceled(cx).is_ready());
}
}));
}
#[test]
fn stress() {
const ITER: usize = if cfg!(miri) { 30 } else { 300 };
for i in 0..ITER {
let n = (i % 10) + 1;
let mut queue = FuturesUnordered::new();
for _ in 0..5 {
let barrier = Arc::new(Barrier::new(n + 1));
for num in 0..n {
let barrier = barrier.clone();
let (tx, rx) = oneshot::channel();
queue.push(rx);
thread::spawn(move || {
barrier.wait();
tx.send(num).unwrap();
});
}
barrier.wait();
let mut sync = block_on_stream(queue);
let mut rx: Vec<_> = (&mut sync).take(n).map(|res| res.unwrap()).collect();
assert_eq!(rx.len(), n);
rx.sort_unstable();
for (i, x) in rx.into_iter().enumerate() {
assert_eq!(i, x);
}
queue = sync.into_inner();
}
}
}
#[test]
fn panicking_future_dropped() {
block_on(future::lazy(move |cx| {
let mut queue = FuturesUnordered::new();
queue.push(future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() }));
let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next_unpin(cx)));
assert!(r.is_err());
assert!(queue.is_empty());
assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
}));
}