// Tests for the send, receive, peek, and readiness APIs of `tokio::net::UdpSocket`.

#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support bind or UDP
use futures::future::poll_fn;
use std::io;
use std::sync::Arc;
use tokio::{io::ReadBuf, net::UdpSocket};
use tokio_test::assert_ok;
// Payload sent by most of the tests in this file.
const MSG: &[u8] = b"hello";
// Length of `MSG` in bytes, used to check reported send/receive sizes.
const MSG_LEN: usize = MSG.len();
// Round-trips `MSG` between two mutually connected sockets using the
// async `send` / `recv` methods.
#[tokio::test]
async fn send_recv() -> std::io::Result<()> {
    let tx_sock = UdpSocket::bind("127.0.0.1:0").await?;
    let rx_sock = UdpSocket::bind("127.0.0.1:0").await?;

    // Connect each socket to the other's address so the address-less
    // `send` / `recv` calls can be used.
    let rx_addr = rx_sock.local_addr()?;
    tx_sock.connect(rx_addr).await?;
    let tx_addr = tx_sock.local_addr()?;
    rx_sock.connect(tx_addr).await?;

    tx_sock.send(MSG).await?;

    let mut buf = [0u8; 32];
    let n = rx_sock.recv(&mut buf).await?;
    assert_eq!(&buf[..n], MSG);

    Ok(())
}
// Round-trips `MSG` between two mutually connected sockets using
// `poll_send` / `poll_recv` driven through `poll_fn`.
#[tokio::test]
async fn send_recv_poll() -> std::io::Result<()> {
    let tx_sock = UdpSocket::bind("127.0.0.1:0").await?;
    let rx_sock = UdpSocket::bind("127.0.0.1:0").await?;

    let rx_addr = rx_sock.local_addr()?;
    tx_sock.connect(rx_addr).await?;
    let tx_addr = tx_sock.local_addr()?;
    rx_sock.connect(tx_addr).await?;

    poll_fn(|cx| tx_sock.poll_send(cx, MSG)).await?;

    // `poll_recv` writes into a `ReadBuf` that wraps the stack buffer.
    let mut storage = [0u8; 32];
    let mut incoming = ReadBuf::new(&mut storage);
    poll_fn(|cx| rx_sock.poll_recv(cx, &mut incoming)).await?;
    assert_eq!(incoming.filled(), MSG);

    Ok(())
}
// Sends `MSG` between two unconnected sockets with `send_to` and checks
// both the payload and the source address reported by `recv_from`.
#[tokio::test]
async fn send_to_recv_from() -> std::io::Result<()> {
    let tx_sock = UdpSocket::bind("127.0.0.1:0").await?;
    let rx_sock = UdpSocket::bind("127.0.0.1:0").await?;

    let dest = rx_sock.local_addr()?;
    tx_sock.send_to(MSG, dest).await?;

    let mut buf = [0u8; 32];
    let (n, origin) = rx_sock.recv_from(&mut buf).await?;
    assert_eq!(&buf[..n], MSG);
    assert_eq!(origin, tx_sock.local_addr()?);

    Ok(())
}
// Sends `MSG` between two unconnected sockets using `poll_send_to` /
// `poll_recv_from` and checks the payload and the reported source address.
#[tokio::test]
async fn send_to_recv_from_poll() -> std::io::Result<()> {
    let tx_sock = UdpSocket::bind("127.0.0.1:0").await?;
    let rx_sock = UdpSocket::bind("127.0.0.1:0").await?;

    let dest = rx_sock.local_addr()?;
    poll_fn(|cx| tx_sock.poll_send_to(cx, MSG, dest)).await?;

    let mut storage = [0u8; 32];
    let mut incoming = ReadBuf::new(&mut storage);
    let origin = poll_fn(|cx| rx_sock.poll_recv_from(cx, &mut incoming)).await?;
    assert_eq!(incoming.filled(), MSG);
    assert_eq!(origin, tx_sock.local_addr()?);

    Ok(())
}
// Checks that `peek_from` can observe the same datagram repeatedly and
// that a following `recv_from` still receives it.
#[tokio::test]
async fn send_to_peek_from() -> std::io::Result<()> {
    let tx_sock = UdpSocket::bind("127.0.0.1:0").await?;
    let rx_sock = UdpSocket::bind("127.0.0.1:0").await?;

    let dest = rx_sock.local_addr()?;
    poll_fn(|cx| tx_sock.poll_send_to(cx, MSG, dest)).await?;

    // Peek twice, each time into a fresh buffer.
    for _ in 0..2 {
        let mut peek_buf = [0u8; 32];
        let (n, origin) = rx_sock.peek_from(&mut peek_buf).await?;
        assert_eq!(&peek_buf[..n], MSG);
        assert_eq!(origin, tx_sock.local_addr()?);
    }

    // The actual receive must still see the datagram.
    let mut buf = [0u8; 32];
    let (n, origin) = rx_sock.recv_from(&mut buf).await?;
    assert_eq!(&buf[..n], MSG);
    assert_eq!(origin, tx_sock.local_addr()?);

    Ok(())
}
// Peeks a datagram first with the non-blocking `try_peek_from`, then with
// `peek_from`, and finally receives it with `recv_from`.
#[tokio::test]
async fn send_to_try_peek_from() -> std::io::Result<()> {
    let tx_sock = UdpSocket::bind("127.0.0.1:0").await?;
    let rx_sock = UdpSocket::bind("127.0.0.1:0").await?;

    let dest = rx_sock.local_addr()?;
    poll_fn(|cx| tx_sock.poll_send_to(cx, MSG, dest)).await?;

    // Non-blocking peek: wait for readiness and retry on `WouldBlock`.
    let mut try_buf = [0u8; 32];
    let (n, origin) = loop {
        match rx_sock.try_peek_from(&mut try_buf) {
            Ok(peeked) => break peeked,
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => rx_sock.readable().await?,
            Err(e) => return Err(e),
        }
    };
    assert_eq!(&try_buf[..n], MSG);
    assert_eq!(origin, tx_sock.local_addr()?);

    // Async peek.
    let mut peek_buf = [0u8; 32];
    let (n, origin) = rx_sock.peek_from(&mut peek_buf).await?;
    assert_eq!(&peek_buf[..n], MSG);
    assert_eq!(origin, tx_sock.local_addr()?);

    // The actual receive must still see the datagram.
    let mut buf = [0u8; 32];
    let (n, origin) = rx_sock.recv_from(&mut buf).await?;
    assert_eq!(&buf[..n], MSG);
    assert_eq!(origin, tx_sock.local_addr()?);

    Ok(())
}
// Peeks the same datagram twice with `poll_peek_from` and then receives it
// with `poll_recv_from`, checking the payload at every step.
#[tokio::test]
async fn send_to_peek_from_poll() -> std::io::Result<()> {
    let tx_sock = UdpSocket::bind("127.0.0.1:0").await?;
    let rx_sock = UdpSocket::bind("127.0.0.1:0").await?;

    let dest = rx_sock.local_addr()?;
    poll_fn(|cx| tx_sock.poll_send_to(cx, MSG, dest)).await?;

    // First peek: payload and source address are both checked.
    {
        let mut storage = [0u8; 32];
        let mut peeked = ReadBuf::new(&mut storage);
        let origin = poll_fn(|cx| rx_sock.poll_peek_from(cx, &mut peeked)).await?;
        assert_eq!(peeked.filled(), MSG);
        assert_eq!(origin, tx_sock.local_addr()?);
    }

    // Second peek: only the payload is checked.
    {
        let mut storage = [0u8; 32];
        let mut peeked = ReadBuf::new(&mut storage);
        poll_fn(|cx| rx_sock.poll_peek_from(cx, &mut peeked)).await?;
        assert_eq!(peeked.filled(), MSG);
    }

    // The actual receive must still see the datagram.
    let mut storage = [0u8; 32];
    let mut incoming = ReadBuf::new(&mut storage);
    poll_fn(|cx| rx_sock.poll_recv_from(cx, &mut incoming)).await?;
    assert_eq!(incoming.filled(), MSG);

    Ok(())
}
// Checks that `peek_sender` reports the sender's address while leaving the
// datagram available for a later `recv_from`.
#[tokio::test]
async fn peek_sender() -> std::io::Result<()> {
    let tx_sock = UdpSocket::bind("127.0.0.1:0").await?;
    let rx_sock = UdpSocket::bind("127.0.0.1:0").await?;
    let tx_addr = tx_sock.local_addr()?;
    let rx_addr = rx_sock.local_addr()?;

    let msg = b"Hello, world!";
    tx_sock.send_to(msg, rx_addr).await?;

    let peeked = rx_sock.peek_sender().await?;
    assert_eq!(peeked, tx_addr);

    // The datagram must still be in the receive queue after the peek,
    // and it must come from the same address that was peeked.
    let mut buf = [0u8; 32];
    let (n, origin) = rx_sock.recv_from(&mut buf).await?;
    assert_eq!(&buf[..n], msg);
    assert_eq!(origin, peeked);

    Ok(())
}
// Checks that `poll_peek_sender` reports the sender's address while leaving
// the datagram available for a later `poll_recv_from`.
#[tokio::test]
async fn poll_peek_sender() -> std::io::Result<()> {
    let tx_sock = UdpSocket::bind("127.0.0.1:0").await?;
    let rx_sock = UdpSocket::bind("127.0.0.1:0").await?;
    let tx_addr = tx_sock.local_addr()?;
    let rx_addr = rx_sock.local_addr()?;

    let msg = b"Hello, world!";
    poll_fn(|cx| tx_sock.poll_send_to(cx, msg, rx_addr)).await?;

    let peeked = poll_fn(|cx| rx_sock.poll_peek_sender(cx)).await?;
    assert_eq!(peeked, tx_addr);

    // The datagram must still be in the receive queue after the peek,
    // and it must come from the same address that was peeked.
    let mut storage = [0u8; 32];
    let mut incoming = ReadBuf::new(&mut storage);
    let origin = poll_fn(|cx| rx_sock.poll_recv_from(cx, &mut incoming)).await?;
    assert_eq!(incoming.filled(), msg);
    assert_eq!(origin, peeked);

    Ok(())
}
#[tokio::test]
async fn try_peek_sender() -> std::io::Result<()> {
let sender = UdpSocket::bind("127.0.0.1:0").await?;
let receiver = UdpSocket::bind("127.0.0.1:0").await?;
let sender_addr = sender.local_addr()?;
let receiver_addr = receiver.local_addr()?;
let msg = b"Hello, world!";
sender.send_to(msg, receiver_addr).await?;
let peeked_sender = loop {
match receiver.try_peek_sender() {
Ok(peeked_sender) => break peeked_sender,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
receiver.readable().await?;
}
Err(e) => return Err(e),
}
};
assert_eq!(peeked_sender, sender_addr);
// Assert that `try_peek_sender()` returns the right sender but
// didn't remove from the receive queue.
let mut recv_buf = [0u8; 32];
// We already peeked the sender so there must be data in the receive queue.
let (read, received_sender) = receiver.try_recv_from(&mut recv_buf).unwrap();
assert_eq!(&recv_buf[..read], msg);
assert_eq!(received_sender, peeked_sender);
Ok(())
}
#[tokio::test]
async fn split() -> std::io::Result<()> {
let socket = UdpSocket::bind("127.0.0.1:0").await?;
let s = Arc::new(socket);
let r = s.clone();
let addr = s.local_addr()?;
tokio::spawn(async move {
s.send_to(MSG, &addr).await.unwrap();
});
let mut recv_buf = [0u8; 32];
let (len, _) = r.recv_from(&mut recv_buf[..]).await?;
assert_eq!(&recv_buf[..len], MSG);
Ok(())
}
#[tokio::test]
async fn split_chan() -> std::io::Result<()> {
// setup UdpSocket that will echo all sent items
let socket = UdpSocket::bind("127.0.0.1:0").await?;
let addr = socket.local_addr().unwrap();
let s = Arc::new(socket);
let r = s.clone();
let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
tokio::spawn(async move {
while let Some((bytes, addr)) = rx.recv().await {
s.send_to(&bytes, &addr).await.unwrap();
}
});
tokio::spawn(async move {
let mut buf = [0u8; 32];
loop {
let (len, addr) = r.recv_from(&mut buf).await.unwrap();
tx.send((buf[..len].to_vec(), addr)).await.unwrap();
}
});
// test that we can send a value and get back some response
let sender = UdpSocket::bind("127.0.0.1:0").await?;
sender.send_to(MSG, addr).await?;
let mut recv_buf = [0u8; 32];
let (len, _) = sender.recv_from(&mut recv_buf).await?;
assert_eq!(&recv_buf[..len], MSG);
Ok(())
}
// Builds an echo server out of one `Arc`-shared socket, a reader task, a
// writer task and an mpsc channel between them, using only the `poll_*`
// socket API driven through `poll_fn`, then checks that a client gets its
// datagram echoed back.
#[tokio::test]
async fn split_chan_poll() -> std::io::Result<()> {
    // setup UdpSocket that will echo all sent items
    let socket = UdpSocket::bind("127.0.0.1:0").await?;
    let addr = socket.local_addr().unwrap();
    let s = Arc::new(socket);
    let r = s.clone();

    let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);

    // Writer task: sends every queued payload back to the peer it came from.
    tokio::spawn(async move {
        while let Some((bytes, addr)) = rx.recv().await {
            poll_fn(|cx| s.poll_send_to(cx, &bytes, addr))
                .await
                .unwrap();
        }
    });

    // Reader task: queues every received datagram together with its source.
    tokio::spawn(async move {
        let mut recv_buf = [0u8; 32];
        let mut read = ReadBuf::new(&mut recv_buf);
        loop {
            let addr = poll_fn(|cx| r.poll_recv_from(cx, &mut read)).await.unwrap();
            tx.send((read.filled().to_vec(), addr)).await.unwrap();
            // `poll_recv_from` appends to the filled region of `read`. Reset it so
            // the next datagram is echoed on its own rather than concatenated with
            // earlier ones, and so the buffer does not run out of space.
            read.clear();
        }
    });

    // test that we can send a value and get back some response
    let sender = UdpSocket::bind("127.0.0.1:0").await?;
    poll_fn(|cx| sender.poll_send_to(cx, MSG, addr)).await?;

    let mut recv_buf = [0u8; 32];
    let mut read = ReadBuf::new(&mut recv_buf);
    let _ = poll_fn(|cx| sender.poll_recv_from(cx, &mut read)).await?;
    assert_eq!(read.filled(), MSG);

    Ok(())
}
// # Note
//
// This test is purposely written such that each time `sender` sends data on
// the socket, `receiver` awaits the data. On Unix, it would be okay waiting
// until the end of the test to receive all the data. On Windows, this would
// **not** be okay because its resources are completion-based (via IOCP).
// If data is sent and not yet received, attempting to send more data will
// result in `ErrorKind::WouldBlock` until the first operation completes.
// Exercises `try_send_to` and `try_send`, including one `try_send` issued
// from a separately spawned OS thread. Every send is followed by a receive
// before the next send is attempted.
#[tokio::test]
async fn try_send_spawn() {
    const MSG2: &[u8] = b"world!";
    const MSG2_LEN: usize = MSG2.len();

    let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap();
    let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap();
    let sender_addr = sender.local_addr().unwrap();
    let receiver_addr = receiver.local_addr().unwrap();

    receiver.connect(sender_addr).await.unwrap();

    let mut buf = [0u8; 32];

    // 1) Unconnected sender: `try_send_to` with an explicit destination.
    sender.writable().await.unwrap();
    let first_sent = sender.try_send_to(MSG, receiver_addr).unwrap();
    assert_eq!(first_sent, MSG_LEN);
    let first_len = receiver.recv(&mut buf).await.unwrap();

    // 2) Connected sender: plain `try_send`.
    sender.connect(receiver_addr).await.unwrap();
    let second_sent = sender.try_send(MSG2).unwrap();
    assert_eq!(second_sent, MSG2_LEN);
    let second_len = receiver.recv(&mut buf).await.unwrap();

    // 3) `try_send` from another thread; the socket is moved into it and the
    //    thread is joined before receiving.
    let worker = std::thread::spawn(move || {
        let third_sent = sender.try_send(MSG).unwrap();
        assert_eq!(third_sent, MSG_LEN);
    });
    worker.join().unwrap();
    let third_len = receiver.recv(&mut buf).await.unwrap();

    assert_eq!(first_len + second_len + third_len, MSG_LEN * 2 + MSG2_LEN);
}
#[tokio::test]
async fn try_send_recv() {
// Create listener
let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
// Create socket pair
let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
// Connect the two
client.connect(server.local_addr().unwrap()).await.unwrap();
server.connect(client.local_addr().unwrap()).await.unwrap();
for _ in 0..5 {
loop {
client.writable().await.unwrap();
match client.try_send(b"hello world") {
Ok(n) => {
assert_eq!(n, 11);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}
loop {
server.readable().await.unwrap();
let mut buf = [0; 512];
match server.try_recv(&mut buf) {
Ok(n) => {
assert_eq!(n, 11);
assert_eq!(&buf[0..11], &b"hello world"[..]);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}
}
}
#[tokio::test]
async fn try_send_to_recv_from() {
// Create listener
let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let saddr = server.local_addr().unwrap();
// Create socket pair
let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let caddr = client.local_addr().unwrap();
for _ in 0..5 {
loop {
client.writable().await.unwrap();
match client.try_send_to(b"hello world", saddr) {
Ok(n) => {
assert_eq!(n, 11);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}
loop {
server.readable().await.unwrap();
let mut buf = [0; 512];
match server.try_recv_from(&mut buf) {
Ok((n, addr)) => {
assert_eq!(n, 11);
assert_eq!(addr, caddr);
assert_eq!(&buf[0..11], &b"hello world"[..]);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}
}
}
#[tokio::test]
async fn try_recv_buf() {
// Create listener
let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
// Create socket pair
let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
// Connect the two
client.connect(server.local_addr().unwrap()).await.unwrap();
server.connect(client.local_addr().unwrap()).await.unwrap();
for _ in 0..5 {
loop {
client.writable().await.unwrap();
match client.try_send(b"hello world") {
Ok(n) => {
assert_eq!(n, 11);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}
loop {
server.readable().await.unwrap();
let mut buf = Vec::with_capacity(512);
match server.try_recv_buf(&mut buf) {
Ok(n) => {
assert_eq!(n, 11);
assert_eq!(&buf[0..11], &b"hello world"[..]);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}
}
}
// Round-trips `MSG` between two mutually connected sockets, receiving it
// with `recv_buf` into a `Vec`.
#[tokio::test]
async fn recv_buf() -> std::io::Result<()> {
    let tx_sock = UdpSocket::bind("127.0.0.1:0").await?;
    let rx_sock = UdpSocket::bind("127.0.0.1:0").await?;

    let rx_addr = rx_sock.local_addr()?;
    tx_sock.connect(rx_addr).await?;
    let tx_addr = tx_sock.local_addr()?;
    rx_sock.connect(tx_addr).await?;

    tx_sock.send(MSG).await?;

    let mut incoming: Vec<u8> = Vec::with_capacity(32);
    let n = rx_sock.recv_buf(&mut incoming).await?;
    assert_eq!(n, MSG_LEN);
    assert_eq!(&incoming[..n], MSG);

    Ok(())
}
#[tokio::test]
async fn try_recv_buf_from() {
// Create listener
let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let saddr = server.local_addr().unwrap();
// Create socket pair
let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let caddr = client.local_addr().unwrap();
for _ in 0..5 {
loop {
client.writable().await.unwrap();
match client.try_send_to(b"hello world", saddr) {
Ok(n) => {
assert_eq!(n, 11);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}
loop {
server.readable().await.unwrap();
let mut buf = Vec::with_capacity(512);
match server.try_recv_buf_from(&mut buf) {
Ok((n, addr)) => {
assert_eq!(n, 11);
assert_eq!(addr, caddr);
assert_eq!(&buf[0..11], &b"hello world"[..]);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}
}
}
// Sends `MSG` from a connected sender to an unconnected receiver and
// receives it with `recv_buf_from`, checking length, payload and source
// address.
#[tokio::test]
async fn recv_buf_from() -> std::io::Result<()> {
    let tx_sock = UdpSocket::bind("127.0.0.1:0").await?;
    let rx_sock = UdpSocket::bind("127.0.0.1:0").await?;

    // Only the sending side is connected.
    let rx_addr = rx_sock.local_addr()?;
    tx_sock.connect(rx_addr).await?;
    tx_sock.send(MSG).await?;

    let mut incoming: Vec<u8> = Vec::with_capacity(32);
    let (n, origin) = rx_sock.recv_buf_from(&mut incoming).await?;
    assert_eq!(n, MSG_LEN);
    assert_eq!(&incoming[..n], MSG);
    assert_eq!(origin, tx_sock.local_addr()?);

    Ok(())
}
#[tokio::test]
async fn poll_ready() {
// Create listener
let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let saddr = server.local_addr().unwrap();
// Create socket pair
let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let caddr = client.local_addr().unwrap();
for _ in 0..5 {
loop {
assert_ok!(poll_fn(|cx| client.poll_send_ready(cx)).await);
match client.try_send_to(b"hello world", saddr) {
Ok(n) => {
assert_eq!(n, 11);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}
loop {
assert_ok!(poll_fn(|cx| server.poll_recv_ready(cx)).await);
let mut buf = Vec::with_capacity(512);
match server.try_recv_buf_from(&mut buf) {
Ok((n, addr)) => {
assert_eq!(n, 11);
assert_eq!(addr, caddr);
assert_eq!(&buf[0..11], &b"hello world"[..]);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}
}
}