Skip to content
Open
91 changes: 91 additions & 0 deletions src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,30 @@ impl Client {
pub fn try_acquire(&self) -> io::Result<Option<Acquired>> {
let mut buf = [0];

// On Linux, we can use preadv2 to do non-blocking read,
// even if `O_NONBLOCK` is not set.
#[cfg(any(target_os = "linux", target_os = "android"))]
{
let read = self.read().as_raw_fd();
loop {
match linux::non_blocking_read(read, &mut buf) {
Ok(1) => return Ok(Some(Acquired { byte: buf[0] })),
Ok(_) => {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"early EOF on jobserver pipe",
))
}

Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) if e.kind() == io::ErrorKind::Unsupported => break,

Err(err) => return Err(err),
}
}
}

let (mut fifo, is_non_blocking) = match self {
Self::Fifo {
file,
Expand Down Expand Up @@ -368,6 +392,73 @@ impl Client {
}
}

#[cfg(any(target_os = "linux", target_os = "android"))]
mod linux {
use super::*;

use libc::{iovec, off_t, ssize_t, syscall, SYS_preadv2};

// TODO: Replace this with libc::RWF_NOWAIT once they have it for musl
// targets
const RWF_NOWAIT: c_int = 0x00000008;

fn cvt_ssize(t: ssize_t) -> io::Result<ssize_t> {
if t == -1 {
Err(io::Error::last_os_error())
} else {
Ok(t)
}
}

fn preadv2(fd: c_int, iov: &iovec) -> ssize_t {
let iovcnt: c_int = 1;
let offset: off_t = -1;

if cfg!(all(target_arch = "x86_64", target_pointer_width = "64")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The branches here could use some comment. It's a bit surprising that 86-64 needs two special-cases but all other platforms (including powerpc, aarch64 and even i686) all go in the same bucket.

unsafe { syscall(SYS_preadv2, fd, iov, iovcnt, offset, 0 as off_t, RWF_NOWAIT) }
} else if cfg!(all(target_arch = "x86_64", target_pointer_width = "32")) {
unsafe { syscall(SYS_preadv2, fd, iov, iovcnt, offset, RWF_NOWAIT) }
} else {
unsafe {
syscall(
SYS_preadv2,
fd,
iov,
iovcnt,
offset as libc::c_long,
((offset as u64) >> 32) as libc::c_long,
RWF_NOWAIT,
)
}
}
.try_into()
.unwrap()
}

pub fn non_blocking_read(fd: c_int, buf: &mut [u8]) -> io::Result<usize> {
static IS_NONBLOCKING_READ_UNSUPPORTED: AtomicBool = AtomicBool::new(false);

if IS_NONBLOCKING_READ_UNSUPPORTED.load(Ordering::Relaxed) {
return Err(io::ErrorKind::Unsupported.into());
}

match cvt_ssize(preadv2(
fd,
&iovec {
iov_base: buf.as_ptr() as *mut _,
iov_len: buf.len(),
},
)) {
Ok(cnt) => Ok(cnt.try_into().unwrap()),
Err(err) if matches!(err.raw_os_error(), Some(libc::EOPNOTSUPP | libc::ENOSYS)) => {
IS_NONBLOCKING_READ_UNSUPPORTED.store(true, Ordering::Relaxed);
Err(io::ErrorKind::Unsupported.into())
}
Err(err) => Err(err),
}
}
}

#[derive(Debug)]
pub struct Helper {
thread: JoinHandle<()>,
Expand Down