blob: ba1e52cd39712f96945b5b6fabea57010ba338e6 [file] [edit]
use std::collections::BTreeMap;
use std::io;
use std::time::Duration;
use mio::event::Source;
use mio::{Events, Interest, Poll, Token};
use rustc_data_structures::fx::FxHashMap;
use crate::shims::{FdId, FileDescriptionRef};
use crate::*;
/// Capacity of the event queue which can be polled at a time.
/// Since we don't expect many simultaneous blocking I/O events
/// this value can be set rather low.
const IO_EVENT_CAPACITY: usize = 16;
/// Trait for values that contain a mio [`Source`].
pub trait WithSource {
/// Invoke `f` on the source inside `self`.
fn with_source(&self, f: &mut dyn FnMut(&mut dyn Source) -> io::Result<()>) -> io::Result<()>;
}
/// An interest receiver defines the action that should be taken when
/// the associated [`Interest`] is fulfilled.
#[derive(Debug, Hash, PartialEq, Clone, Copy, Eq, PartialOrd, Ord)]
pub enum InterestReceiver {
/// The specified thread should be unblocked.
UnblockThread(ThreadId),
}
/// Manager for managing blocking host I/O in a non-blocking manner.
/// We use [`Poll`] to poll for new I/O events from the OS for sources
/// registered using this manager.
///
/// Since blocking host I/O is inherently non-deterministic, no method on this
/// manager should be called when isolation is enabled. The only exception is
/// the [`BlockingIoManager::new`] function to create the manager. Everywhere else,
/// we assert that isolation is disabled!
pub struct BlockingIoManager {
/// Poll instance to monitor I/O events from the OS.
/// This is only [`None`] when Miri is run with isolation enabled.
poll: Option<Poll>,
/// Buffer used to store the ready I/O events when calling [`Poll::poll`].
/// This is not part of the state and only stored to avoid allocating a
/// new buffer for every poll.
events: Events,
/// Map from source ids to the actual sources and their registered receivers
/// together with their associated interests.
sources:
BTreeMap<FdId, (FileDescriptionRef<dyn WithSource>, FxHashMap<InterestReceiver, Interest>)>,
}
impl BlockingIoManager {
/// Create a new blocking I/O manager instance based on the availability
/// of communication with the host.
pub fn new(communicate: bool) -> Result<Self, io::Error> {
let manager = Self {
poll: communicate.then_some(Poll::new()?),
events: Events::with_capacity(IO_EVENT_CAPACITY),
sources: BTreeMap::default(),
};
Ok(manager)
}
/// Poll for new I/O events from the OS or wait until the timeout expired.
///
/// - If the timeout is [`Some`] and contains [`Duration::ZERO`], the poll doesn't block and just
/// reads all events since the last poll.
/// - If the timeout is [`Some`] and contains a non-zero duration, it blocks at most for the
/// specified duration.
/// - If the timeout is [`None`] the poll blocks indefinitely until an event occurs.
///
/// Returns the interest receivers for all file descriptions which received an I/O event together
/// with the file description they were registered for.
pub fn poll(
&mut self,
timeout: Option<Duration>,
) -> Result<Vec<(InterestReceiver, FileDescriptionRef<dyn WithSource>)>, io::Error> {
let poll =
self.poll.as_mut().expect("Blocking I/O should not be called with isolation enabled");
// Poll for new I/O events from OS and store them in the events buffer.
poll.poll(&mut self.events, timeout)?;
let ready = self
.events
.iter()
.flat_map(|event| {
let token = event.token();
// We know all tokens are valid `FdId`.
let fd_id = FdId::new_unchecked(token.0);
let (source, interests) =
self.sources.get(&fd_id).expect("Source should be registered");
assert_eq!(source.id(), fd_id);
// Because we allow spurious wake-ups, we mark all interests as ready even
// though some may not have been fulfilled.
interests.keys().map(move |receiver| (*receiver, source.clone()))
})
.collect::<Vec<_>>();
// Deregister all ready sources as we only want to receive one event per receiver.
ready.iter().for_each(|(receiver, source)| self.deregister(source.id(), *receiver));
Ok(ready)
}
/// Register an interest for a blocking I/O source.
///
/// As the OS can always produce spurious wake-ups, it's the callers responsibility to
/// verify the requested I/O interests are really ready and to register again if they're not.
///
/// It's assumed that no interest is already registered for this source with the same reason!
pub fn register(
&mut self,
source_fd: FileDescriptionRef<dyn WithSource>,
receiver: InterestReceiver,
interest: Interest,
) {
let poll =
self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
let id = source_fd.id();
let token = Token(id.to_usize());
let Some((_, current_interests)) = self.sources.get_mut(&id) else {
// The source is not yet registered.
// Treat errors from registering as fatal. On UNIX hosts this can only
// fail due to system resource errors (e.g. ENOMEM or ENOSPC).
source_fd
.with_source(&mut |source| poll.registry().register(source, token, interest))
.unwrap();
self.sources.insert(id, (source_fd, FxHashMap::from_iter([(receiver, interest)])));
return;
};
// The source is already registered. We need to check whether we need to
// reregister because the provided interest contains new interests for the source.
let old_interest =
interest_union(current_interests).expect("Source should contain at least one interest");
current_interests
.try_insert(receiver, interest)
.unwrap_or_else(|_| panic!("Receiver should be unique"));
let new_interest = old_interest.add(interest);
// Reregister the source since the overall interests might have changed.
// Treat errors from reregistering as fatal. On UNIX hosts this can only
// fail due to system resource errors (e.g. ENOMEM or ENOSPC).
source_fd
.with_source(&mut |source| poll.registry().reregister(source, token, new_interest))
.unwrap();
}
/// Deregister an interest from a blocking I/O source.
///
/// The receiver is assumed to be registered for the provided source!
pub fn deregister(&mut self, source_id: FdId, receiver: InterestReceiver) {
let poll =
self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
let token = Token(source_id.to_usize());
let (fd, current_interests) =
self.sources.get_mut(&source_id).expect("Source should be registered");
current_interests
.remove(&receiver)
.unwrap_or_else(|| panic!("Receiver should be registered for source"));
let Some(new_interest) = interest_union(current_interests) else {
// There are no longer any interests in this source.
// We can thus deregister the source from the poll.
// Treat errors from deregistering as fatal. On UNIX hosts this can only
// fail due to system resource errors (e.g. ENOMEM or ENOSPC).
fd.with_source(&mut |source| poll.registry().deregister(source)).unwrap();
self.sources.remove(&source_id);
return;
};
// Reregister the source since the overall interests might have changed.
// Treat errors from reregistering as fatal. On UNIX hosts this can only
// fail due to system resource errors (e.g. ENOMEM or ENOSPC).
fd.with_source(&mut |source| poll.registry().reregister(source, token, new_interest))
.unwrap();
}
}
/// Get the union of all interests for a source. Returns `None` if the map is empty.
fn interest_union(interests: &FxHashMap<InterestReceiver, Interest>) -> Option<Interest> {
interests
.values()
.copied()
.fold(None, |acc, interest| acc.map(|acc: Interest| acc.add(interest)).or(Some(interest)))
}
impl<'tcx> EvalContextExt<'tcx> for MiriInterpCx<'tcx> {}
pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
/// Block the current thread until some interests on an I/O source
/// are fulfilled or the optional timeout exceeded.
/// The callback will be invoked when the thread gets unblocked.
///
/// There can be spurious wake-ups by the OS and thus it's the callers
/// responsibility to verify that the requested I/O interests are
/// really ready and to block again if they're not.
#[inline]
fn block_thread_for_io(
&mut self,
source_fd: FileDescriptionRef<dyn WithSource>,
interests: Interest,
timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
callback: DynUnblockCallback<'tcx>,
) {
let this = self.eval_context_mut();
this.machine.blocking_io.register(
source_fd,
InterestReceiver::UnblockThread(this.machine.threads.active_thread()),
interests,
);
this.block_thread(BlockReason::IO, timeout, callback);
}
}