blob: ffbcf633b987399ca775deeaba555ef5532090a8 [file] [log] [blame]
//@ignore-target: windows # No libc pipe on Windows
// test_race depends on a deterministic schedule.
//@compile-flags: -Zmiri-deterministic-concurrency
use std::thread;
#[path = "../../utils/libc.rs"]
mod libc_utils;
fn main() {
test_pipe();
test_pipe_threaded();
test_race();
test_pipe_array();
#[cfg(any(
target_os = "linux",
target_os = "illumos",
target_os = "freebsd",
target_os = "solaris"
))]
// `pipe2` only exists in some specific os.
test_pipe2();
test_pipe_setfl_getfl();
test_pipe_fcntl_threaded();
}
fn test_pipe() {
let mut fds = [-1, -1];
let res = unsafe { libc::pipe(fds.as_mut_ptr()) };
assert_eq!(res, 0);
// Read size == data available in buffer.
let data = "12345".as_bytes().as_ptr();
let res = unsafe { libc_utils::write_all(fds[1], data as *const libc::c_void, 5) };
assert_eq!(res, 5);
let mut buf3: [u8; 5] = [0; 5];
let res = unsafe {
libc_utils::read_all(fds[0], buf3.as_mut_ptr().cast(), buf3.len() as libc::size_t)
};
assert_eq!(res, 5);
assert_eq!(buf3, "12345".as_bytes());
// Read size > data available in buffer.
let data = "123".as_bytes();
let res = unsafe { libc_utils::write_all(fds[1], data.as_ptr() as *const libc::c_void, 3) };
assert_eq!(res, 3);
let mut buf4: [u8; 5] = [0; 5];
let res = unsafe { libc::read(fds[0], buf4.as_mut_ptr().cast(), buf4.len() as libc::size_t) };
assert!(res > 0 && res <= 3);
let res = res as usize;
assert_eq!(buf4[..res], data[..res]);
if res < 3 {
// Drain the rest from the read end.
let res = unsafe { libc_utils::read_all(fds[0], buf4[res..].as_mut_ptr().cast(), 3 - res) };
assert!(res > 0);
}
}
fn test_pipe_threaded() {
let mut fds = [-1, -1];
let res = unsafe { libc::pipe(fds.as_mut_ptr()) };
assert_eq!(res, 0);
let thread1 = thread::spawn(move || {
let mut buf: [u8; 5] = [0; 5];
let res: i64 = unsafe {
libc_utils::read_all(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t)
.try_into()
.unwrap()
};
assert_eq!(res, 5);
assert_eq!(buf, "abcde".as_bytes());
});
thread::yield_now();
let data = "abcde".as_bytes().as_ptr();
let res = unsafe { libc_utils::write_all(fds[1], data as *const libc::c_void, 5) };
assert_eq!(res, 5);
thread1.join().unwrap();
// Read and write from different direction
let thread2 = thread::spawn(move || {
thread::yield_now();
let data = "12345".as_bytes().as_ptr();
let res = unsafe { libc_utils::write_all(fds[1], data as *const libc::c_void, 5) };
assert_eq!(res, 5);
});
let mut buf: [u8; 5] = [0; 5];
let res =
unsafe { libc_utils::read_all(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
assert_eq!(res, 5);
assert_eq!(buf, "12345".as_bytes());
thread2.join().unwrap();
}
// FIXME(static_mut_refs): Do not allow `static_mut_refs` lint
#[allow(static_mut_refs)]
fn test_race() {
static mut VAL: u8 = 0;
let mut fds = [-1, -1];
let res = unsafe { libc::pipe(fds.as_mut_ptr()) };
assert_eq!(res, 0);
let thread1 = thread::spawn(move || {
let mut buf: [u8; 1] = [0; 1];
// write() from the main thread will occur before the read() here
// because preemption is disabled and the main thread yields after write().
let res: i32 = unsafe {
libc_utils::read_all(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t)
.try_into()
.unwrap()
};
assert_eq!(res, 1);
assert_eq!(buf, "a".as_bytes());
// The read above establishes a happens-before so it is now safe to access this global variable.
unsafe { assert_eq!(VAL, 1) };
});
unsafe { VAL = 1 };
let data = "a".as_bytes().as_ptr();
let res = unsafe { libc_utils::write_all(fds[1], data as *const libc::c_void, 1) };
assert_eq!(res, 1);
thread::yield_now();
thread1.join().unwrap();
}
fn test_pipe_array() {
// Declare `pipe` to take an array rather than a `*mut i32`.
extern "C" {
fn pipe(pipefd: &mut [i32; 2]) -> i32;
}
let mut fds: [i32; 2] = [0; 2];
assert_eq!(unsafe { pipe(&mut fds) }, 0);
}
/// Test if pipe2 (including the O_NONBLOCK flag) is supported.
#[cfg(any(
target_os = "linux",
target_os = "illumos",
target_os = "freebsd",
target_os = "solaris"
))]
fn test_pipe2() {
let mut fds = [-1, -1];
let res = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK) };
assert_eq!(res, 0);
}
/// Basic test for pipe fcntl's F_SETFL and F_GETFL flag.
fn test_pipe_setfl_getfl() {
// Initialise pipe fds.
let mut fds = [-1, -1];
let res = unsafe { libc::pipe(fds.as_mut_ptr()) };
assert_eq!(res, 0);
// Both sides should either have O_RONLY or O_WRONLY.
let res = unsafe { libc::fcntl(fds[0], libc::F_GETFL) };
assert_eq!(res, libc::O_RDONLY);
let res = unsafe { libc::fcntl(fds[1], libc::F_GETFL) };
assert_eq!(res, libc::O_WRONLY);
// Add the O_NONBLOCK flag with F_SETFL.
let res = unsafe { libc::fcntl(fds[0], libc::F_SETFL, libc::O_NONBLOCK) };
assert_eq!(res, 0);
// Test if the O_NONBLOCK flag is successfully added.
let res = unsafe { libc::fcntl(fds[0], libc::F_GETFL) };
assert_eq!(res, libc::O_RDONLY | libc::O_NONBLOCK);
// The other side remains unchanged.
let res = unsafe { libc::fcntl(fds[1], libc::F_GETFL) };
assert_eq!(res, libc::O_WRONLY);
// Test if O_NONBLOCK flag can be unset.
let res = unsafe { libc::fcntl(fds[0], libc::F_SETFL, 0) };
assert_eq!(res, 0);
let res = unsafe { libc::fcntl(fds[0], libc::F_GETFL) };
assert_eq!(res, libc::O_RDONLY);
}
/// Test the behaviour of F_SETFL/F_GETFL when a fd is blocking.
/// The expected execution is:
/// 1. Main thread blocks on fds[0] `read`.
/// 2. Thread 1 sets O_NONBLOCK flag on fds[0],
/// checks the value of F_GETFL,
/// then writes to fds[1] to unblock main thread's `read`.
fn test_pipe_fcntl_threaded() {
let mut fds = [-1, -1];
let res = unsafe { libc::pipe(fds.as_mut_ptr()) };
assert_eq!(res, 0);
let mut buf: [u8; 5] = [0; 5];
let thread1 = thread::spawn(move || {
// Add O_NONBLOCK flag while pipe is still blocked on read.
let res = unsafe { libc::fcntl(fds[0], libc::F_SETFL, libc::O_NONBLOCK) };
assert_eq!(res, 0);
// Check the new flag value while the main thread is still blocked on fds[0].
let res = unsafe { libc::fcntl(fds[0], libc::F_GETFL) };
assert_eq!(res, libc::O_NONBLOCK);
// The write below will unblock the `read` in main thread: even though
// the socket is now "non-blocking", the shim needs to deal correctly
// with threads that were blocked before the socket was made non-blocking.
let data = "abcde".as_bytes().as_ptr();
let res = unsafe { libc_utils::write_all(fds[1], data as *const libc::c_void, 5) };
assert_eq!(res, 5);
});
// The `read` below will block.
let res =
unsafe { libc_utils::read_all(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
thread1.join().unwrap();
assert_eq!(res, 5);
}