blob: b2c9ba34c0ff6c94f3aba3549e5f9a966b1212b5 [file] [log] [blame] [edit]
//! A single-producer, single-consumer (oneshot) channel.
//!
//! This is an experimental module, so the API will likely change.
use crate::sync::mpmc;
use crate::sync::mpsc::{RecvError, SendError};
use crate::time::{Duration, Instant};
use crate::{error, fmt};
/// Creates a new oneshot channel, returning the sender/receiver halves.
///
/// # Examples
///
/// ```
/// #![feature(oneshot_channel)]
/// use std::sync::oneshot;
/// use std::thread;
///
/// let (sender, receiver) = oneshot::channel();
///
/// // Spawn off an expensive computation.
/// thread::spawn(move || {
/// # fn expensive_computation() -> i32 { 42 }
/// sender.send(expensive_computation()).unwrap();
/// // `sender` is consumed by `send`, so we cannot use it anymore.
/// });
///
/// # fn do_other_work() -> i32 { 42 }
/// do_other_work();
///
/// // Let's see what that answer was...
/// println!("{:?}", receiver.recv().unwrap());
/// // `receiver` is consumed by `recv`, so we cannot use it anymore.
/// ```
#[must_use]
#[unstable(feature = "oneshot_channel", issue = "143674")]
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
// Using a `sync_channel` with capacity 1 means that the internal implementation will use the
// `Array`-flavored channel implementation.
let (sender, receiver) = mpmc::sync_channel(1);
(Sender { inner: sender }, Receiver { inner: receiver })
}
////////////////////////////////////////////////////////////////////////////////////////////////////
// Sender
////////////////////////////////////////////////////////////////////////////////////////////////////
/// The sending half of a oneshot channel.
///
/// # Examples
///
/// ```
/// #![feature(oneshot_channel)]
/// use std::sync::oneshot;
/// use std::thread;
///
/// let (sender, receiver) = oneshot::channel();
///
/// thread::spawn(move || {
/// sender.send("Hello from thread!").unwrap();
/// });
///
/// assert_eq!(receiver.recv().unwrap(), "Hello from thread!");
/// ```
///
/// `Sender` cannot be sent between threads if it is sending non-`Send` types.
///
/// ```compile_fail
/// #![feature(oneshot_channel)]
/// use std::sync::oneshot;
/// use std::thread;
/// use std::ptr;
///
/// let (sender, receiver) = oneshot::channel();
///
/// struct NotSend(*mut ());
/// thread::spawn(move || {
/// sender.send(NotSend(ptr::null_mut()));
/// });
///
/// let reply = receiver.try_recv().unwrap();
/// ```
#[unstable(feature = "oneshot_channel", issue = "143674")]
pub struct Sender<T> {
/// The `oneshot` channel is simply a wrapper around a `mpmc` channel.
inner: mpmc::Sender<T>,
}
// SAFETY: Since the only methods in which synchronization must occur take full ownership of the
// [`Sender`], it is perfectly safe to share a `&Sender` between threads (as it is effectively
// useless without ownership).
#[unstable(feature = "oneshot_channel", issue = "143674")]
unsafe impl<T> Sync for Sender<T> {}
impl<T> Sender<T> {
/// Attempts to send a value through this channel. This can only fail if the corresponding
/// [`Receiver<T>`] has been dropped.
///
/// This method is non-blocking (wait-free).
///
/// # Examples
///
/// ```
/// #![feature(oneshot_channel)]
/// use std::sync::oneshot;
/// use std::thread;
///
/// let (tx, rx) = oneshot::channel();
///
/// thread::spawn(move || {
/// // Perform some computation.
/// let result = 2 + 2;
/// tx.send(result).unwrap();
/// });
///
/// assert_eq!(rx.recv().unwrap(), 4);
/// ```
#[unstable(feature = "oneshot_channel", issue = "143674")]
pub fn send(self, t: T) -> Result<(), SendError<T>> {
self.inner.send(t)
}
}
#[unstable(feature = "oneshot_channel", issue = "143674")]
impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Sender").finish_non_exhaustive()
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////
// Receiver
////////////////////////////////////////////////////////////////////////////////////////////////////
/// The receiving half of a oneshot channel.
///
/// # Examples
///
/// ```
/// #![feature(oneshot_channel)]
/// use std::sync::oneshot;
/// use std::thread;
/// use std::time::Duration;
///
/// let (sender, receiver) = oneshot::channel();
///
/// thread::spawn(move || {
/// thread::sleep(Duration::from_millis(100));
/// sender.send("Hello after delay!").unwrap();
/// });
///
/// println!("Waiting for message...");
/// println!("{}", receiver.recv().unwrap());
/// ```
///
/// `Receiver` cannot be sent between threads if it is receiving non-`Send` types.
///
/// ```compile_fail
/// # #![feature(oneshot_channel)]
/// # use std::sync::oneshot;
/// # use std::thread;
/// # use std::ptr;
/// #
/// let (sender, receiver) = oneshot::channel();
///
/// struct NotSend(*mut ());
/// sender.send(NotSend(ptr::null_mut()));
///
/// thread::spawn(move || {
/// let reply = receiver.try_recv().unwrap();
/// });
/// ```
#[unstable(feature = "oneshot_channel", issue = "143674")]
pub struct Receiver<T> {
/// The `oneshot` channel is simply a wrapper around a `mpmc` channel.
inner: mpmc::Receiver<T>,
}
// SAFETY: Since the only methods in which synchronization must occur take full ownership of the
// [`Receiver`], it is perfectly safe to share a `&Receiver` between threads (as it is unable to
// receive any values without ownership).
#[unstable(feature = "oneshot_channel", issue = "143674")]
unsafe impl<T> Sync for Receiver<T> {}
impl<T> Receiver<T> {
/// Receives the value from the sending end, blocking the calling thread until it gets it.
///
/// Can only fail if the corresponding [`Sender<T>`] has been dropped.
///
/// # Examples
///
/// ```
/// #![feature(oneshot_channel)]
/// use std::sync::oneshot;
/// use std::thread;
/// use std::time::Duration;
///
/// let (tx, rx) = oneshot::channel();
///
/// thread::spawn(move || {
/// thread::sleep(Duration::from_millis(500));
/// tx.send("Done!").unwrap();
/// });
///
/// // This will block until the message arrives.
/// println!("{}", rx.recv().unwrap());
/// ```
#[unstable(feature = "oneshot_channel", issue = "143674")]
pub fn recv(self) -> Result<T, RecvError> {
self.inner.recv()
}
// Fallible methods.
/// Attempts to return a pending value on this receiver without blocking.
///
/// # Examples
///
/// ```
/// #![feature(oneshot_channel)]
/// use std::sync::oneshot;
/// use std::thread;
/// use std::time::Duration;
///
/// let (sender, mut receiver) = oneshot::channel();
///
/// thread::spawn(move || {
/// thread::sleep(Duration::from_millis(100));
/// sender.send(42).unwrap();
/// });
///
/// // Keep trying until we get the message, doing other work in the process.
/// loop {
/// match receiver.try_recv() {
/// Ok(value) => {
/// assert_eq!(value, 42);
/// break;
/// }
/// Err(oneshot::TryRecvError::Empty(rx)) => {
/// // Retake ownership of the receiver.
/// receiver = rx;
/// # fn do_other_work() { thread::sleep(Duration::from_millis(25)); }
/// do_other_work();
/// }
/// Err(oneshot::TryRecvError::Disconnected) => panic!("Sender disconnected"),
/// }
/// }
/// ```
#[unstable(feature = "oneshot_channel", issue = "143674")]
pub fn try_recv(self) -> Result<T, TryRecvError<T>> {
self.inner.try_recv().map_err(|err| match err {
mpmc::TryRecvError::Empty => TryRecvError::Empty(self),
mpmc::TryRecvError::Disconnected => TryRecvError::Disconnected,
})
}
/// Attempts to wait for a value on this receiver, returning an error if the corresponding
/// [`Sender`] half of this channel has been dropped, or if it waits more than `timeout`.
///
/// # Examples
///
/// ```
/// #![feature(oneshot_channel)]
/// use std::sync::oneshot;
/// use std::thread;
/// use std::time::Duration;
///
/// let (sender, receiver) = oneshot::channel();
///
/// thread::spawn(move || {
/// thread::sleep(Duration::from_millis(500));
/// sender.send("Success!").unwrap();
/// });
///
/// // Wait up to 1 second for the message
/// match receiver.recv_timeout(Duration::from_secs(1)) {
/// Ok(msg) => println!("Received: {}", msg),
/// Err(oneshot::RecvTimeoutError::Timeout(_)) => println!("Timed out!"),
/// Err(oneshot::RecvTimeoutError::Disconnected) => println!("Sender dropped!"),
/// }
/// ```
#[unstable(feature = "oneshot_channel", issue = "143674")]
pub fn recv_timeout(self, timeout: Duration) -> Result<T, RecvTimeoutError<T>> {
self.inner.recv_timeout(timeout).map_err(|err| match err {
mpmc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout(self),
mpmc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
})
}
/// Attempts to wait for a value on this receiver, returning an error if the corresponding
/// [`Sender`] half of this channel has been dropped, or if `deadline` is reached.
///
/// # Examples
///
/// ```
/// #![feature(oneshot_channel)]
/// use std::sync::oneshot;
/// use std::thread;
/// use std::time::{Duration, Instant};
///
/// let (sender, receiver) = oneshot::channel();
///
/// thread::spawn(move || {
/// thread::sleep(Duration::from_millis(100));
/// sender.send("Just in time!").unwrap();
/// });
///
/// let deadline = Instant::now() + Duration::from_millis(500);
/// match receiver.recv_deadline(deadline) {
/// Ok(msg) => println!("Received: {}", msg),
/// Err(oneshot::RecvTimeoutError::Timeout(_)) => println!("Missed deadline!"),
/// Err(oneshot::RecvTimeoutError::Disconnected) => println!("Sender dropped!"),
/// }
/// ```
#[unstable(feature = "oneshot_channel", issue = "143674")]
pub fn recv_deadline(self, deadline: Instant) -> Result<T, RecvTimeoutError<T>> {
self.inner.recv_deadline(deadline).map_err(|err| match err {
mpmc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout(self),
mpmc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
})
}
}
#[unstable(feature = "oneshot_channel", issue = "143674")]
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Receiver").finish_non_exhaustive()
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////
// Receiver Errors
////////////////////////////////////////////////////////////////////////////////////////////////////
/// An error returned from the [`try_recv`](Receiver::try_recv) method.
///
/// See the documentation for [`try_recv`] for more information on how to use this error.
///
/// [`try_recv`]: Receiver::try_recv
#[unstable(feature = "oneshot_channel", issue = "143674")]
pub enum TryRecvError<T> {
/// The [`Sender`] has not sent a message yet, but it might in the future (as it has not yet
/// disconnected). This variant contains the [`Receiver`] that [`try_recv`](Receiver::try_recv)
/// took ownership over.
Empty(Receiver<T>),
/// The corresponding [`Sender`] half of this channel has become disconnected, and there will
/// never be any more data sent over the channel.
Disconnected,
}
/// An error returned from the [`recv_timeout`](Receiver::recv_timeout) or
/// [`recv_deadline`](Receiver::recv_deadline) methods.
///
/// # Examples
///
/// Usage of this error is similar to [`TryRecvError`].
///
/// ```
/// #![feature(oneshot_channel)]
/// use std::sync::oneshot::{self, RecvTimeoutError};
/// use std::thread;
/// use std::time::Duration;
///
/// let (sender, receiver) = oneshot::channel();
///
/// let send_failure = thread::spawn(move || {
/// // Simulate a long computation that takes longer than our timeout.
/// thread::sleep(Duration::from_millis(250));
///
/// // This will likely fail to send because we drop the receiver in the main thread.
/// sender.send("Goodbye!".to_string()).unwrap();
/// });
///
/// // Try to receive the message with a short timeout.
/// match receiver.recv_timeout(Duration::from_millis(10)) {
/// Ok(msg) => println!("Received: {}", msg),
/// Err(RecvTimeoutError::Timeout(rx)) => {
/// println!("Timed out waiting for message!");
///
/// // Note that you can reuse the receiver without dropping it.
/// drop(rx);
/// },
/// Err(RecvTimeoutError::Disconnected) => println!("Sender dropped!"),
/// }
///
/// send_failure.join().unwrap_err();
/// ```
#[unstable(feature = "oneshot_channel", issue = "143674")]
pub enum RecvTimeoutError<T> {
/// The [`Sender`] has not sent a message yet, but it might in the future (as it has not yet
/// disconnected). This variant contains the [`Receiver`] that either
/// [`recv_timeout`](Receiver::recv_timeout) or [`recv_deadline`](Receiver::recv_deadline) took
/// ownership over.
Timeout(Receiver<T>),
/// The corresponding [`Sender`] half of this channel has become disconnected, and there will
/// never be any more data sent over the channel.
Disconnected,
}
#[unstable(feature = "oneshot_channel", issue = "143674")]
impl<T> fmt::Debug for TryRecvError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("TryRecvError").finish_non_exhaustive()
}
}
#[unstable(feature = "oneshot_channel", issue = "143674")]
impl<T> fmt::Display for TryRecvError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
TryRecvError::Empty(..) => "receiving on an empty oneshot channel".fmt(f),
TryRecvError::Disconnected => "receiving on a closed oneshot channel".fmt(f),
}
}
}
#[unstable(feature = "oneshot_channel", issue = "143674")]
impl<T> error::Error for TryRecvError<T> {}
#[unstable(feature = "oneshot_channel", issue = "143674")]
impl<T> From<RecvError> for TryRecvError<T> {
/// Converts a `RecvError` into a `TryRecvError`.
///
/// This conversion always returns `TryRecvError::Disconnected`.
///
/// No data is allocated on the heap.
fn from(err: RecvError) -> TryRecvError<T> {
match err {
RecvError => TryRecvError::Disconnected,
}
}
}
#[unstable(feature = "oneshot_channel", issue = "143674")]
impl<T> fmt::Debug for RecvTimeoutError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("RecvTimeoutError").finish_non_exhaustive()
}
}
#[unstable(feature = "oneshot_channel", issue = "143674")]
impl<T> fmt::Display for RecvTimeoutError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
RecvTimeoutError::Timeout(..) => "timed out waiting on oneshot channel".fmt(f),
RecvTimeoutError::Disconnected => "receiving on a closed oneshot channel".fmt(f),
}
}
}
#[unstable(feature = "oneshot_channel", issue = "143674")]
impl<T> error::Error for RecvTimeoutError<T> {}
#[unstable(feature = "oneshot_channel", issue = "143674")]
impl<T> From<RecvError> for RecvTimeoutError<T> {
/// Converts a `RecvError` into a `RecvTimeoutError`.
///
/// This conversion always returns `RecvTimeoutError::Disconnected`.
///
/// No data is allocated on the heap.
fn from(err: RecvError) -> RecvTimeoutError<T> {
match err {
RecvError => RecvTimeoutError::Disconnected,
}
}
}