//! Efficient read-write locking without `pthread_rwlock_t`.
//!
//! The readers-writer lock provided by the `pthread` library has a number of problems which make it
//! a suboptimal choice for `std`:
//!
//! * It is non-movable, so it needs to be allocated (lazily, to make the constructor `const`).
//! * `pthread` is an external library, meaning the fast path of acquiring an uncontended lock
//! cannot be inlined.
//! * Some platforms (at least glibc before version 2.25) have buggy implementations that can easily
//! lead to undefined behaviour in safe Rust code when not properly guarded against.
//! * On some platforms (e.g. macOS), the lock is very slow.
//!
//! Therefore, we implement our own [`RwLock`]! Naively, one might reach for a spinlock, but those
//! can be quite [problematic] when the lock is contended.
//!
//! Instead, this [`RwLock`] copies its implementation strategy from the Windows [SRWLOCK] and the
//! [usync] library implementations.
//!
//! Spinning is still used for the fast path, but it is bounded: after spinning fails, threads will
//! locklessly add an information structure ([`Node`]) containing a [`Thread`] handle into a queue
//! of waiters associated with the lock. The lock owner, upon releasing the lock, will scan through
//! the queue and wake up threads as appropriate, and the newly-awoken threads will then try to
//! acquire the lock themselves.
//!
//! The resulting [`RwLock`] is:
//!
//! * adaptive, since it spins before doing any heavyweight parking operations
//! * allocation-free, modulo the per-thread [`Thread`] handle, which is allocated anyway when
//! using threads created by `std`
//! * writer-preferring, even if some readers may still slip through
//! * unfair, which reduces context-switching and thus drastically improves performance
//!
//! and also quite fast in most cases.
//!
//! [problematic]: https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html
//! [SRWLOCK]: https://learn.microsoft.com/en-us/windows/win32/sync/slim-reader-writer--srw--locks
//! [usync]: https://crates.io/crates/usync
//!
//! # Implementation
//!
//! ## State
//!
//! A single [`AtomicPtr`] is used as the state variable. The lowest four bits are used to indicate the
//! meaning of the remaining bits:
//!
//! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | [`DOWNGRADED`] | Remaining | Meaning |
//! |------------|:-----------|:-----------------|:---------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------|
//! | 0 | 0 | 0 | 0 | 0 | The lock is unlocked, no threads are waiting |
//! | 1 | 0 | 0 | 0 | 0 | The lock is write-locked, no threads waiting |
//! | 1 | 0 | 0 | 0 | n > 0 | The lock is read-locked with n readers |
//! | 0 | 1 | * | 0 | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock |
//! | 1 | 1 | * | * | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count |
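//!
//! For illustration, a state word could be decoded as in the following sketch (`describe` is a
//! hypothetical helper, not part of the implementation; the constants and the [`Node`] type are
//! defined further down in this file):
//!
//! ```ignore
//! fn describe(state: State) {
//!     if state.addr() & QUEUED == 0 {
//!         // No waiters: besides the flag bits, the state holds the reader count
//!         // (in multiples of `SINGLE`), which is zero while a writer holds the lock.
//!         let readers = state.addr() / SINGLE;
//!         match (state.addr() & LOCKED != 0, readers) {
//!             (false, _) => println!("unlocked"),
//!             (true, 0) => println!("write-locked"),
//!             (true, n) => println!("read-locked by {n} readers"),
//!         }
//!     } else {
//!         // Waiters are queued: besides the flag bits, the state holds a pointer
//!         // to the first (most recently enqueued) node of the waiter queue.
//!         let head: *mut Node = state.mask(NODE_MASK).cast();
//!         let locked = if state.addr() & LOCKED != 0 { "locked" } else { "unlocked" };
//!         println!("{locked} with queued waiters, head node at {head:p}");
//!     }
//! }
//! ```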
//!
//! ## Waiter Queue
//!
//! When threads are waiting on the lock (the `QUEUED` bit is set), the lock state points to a queue
//! of waiters, which is implemented as a linked list of nodes stored on the stack to avoid memory
//! allocation.
//!
//! To enable lock-free enqueuing, new nodes are pushed onto the front of the queue with a single
//! compare-and-swap of the state, so the list is only singly-linked (via `next`) upon creation.
//!
//! When the lock is read-locked, the lock count (number of readers) is stored in the last link of
//! the queue. Threads have to traverse the queue to find the last element upon releasing the lock.
//! To avoid having to traverse the entire list every time we want to access the reader count, a
//! pointer to the found tail is cached in the (current) first element of the queue.
//!
//! Also, while the lock is unfair for performance reasons, it is still best to wake the tail node
//! first (FIFO ordering). Since we always pop nodes off the tail of the queue, we must store
//! backlinks to previous nodes so that we can update the `tail` field of the (current) first
//! element of the queue. Adding backlinks is done at the same time as finding the tail (via the
//! function [`find_tail_and_add_backlinks`]), and thus encountering a set tail field on a node
//! indicates that all following nodes in the queue are initialized.
//!
//! TLDR: Here's a diagram of what the queue looks like:
//!
//! ```text
//!   state
//!     │
//!     ▼
//! ╭───────╮ next ╭───────╮ next ╭───────╮ next ╭───────╮
//! │       ├─────►│       ├─────►│       ├─────►│ count │
//! │       │      │       │      │       │      │       │
//! │       │      │       │◄─────┤       │◄─────┤       │
//! ╰───────╯      ╰───────╯ prev ╰───────╯ prev ╰───────╯
//!     │                                            ▲
//!     └────────────────────────────────────────────┘
//!                          tail
//! ```
//!
//! Invariants:
//! 1. At least one node must contain a non-null, current `tail` field.
//! 2. The first non-null `tail` field must be valid and current.
//! 3. All nodes preceding this node must have a correct, non-null `next` field.
//! 4. All nodes following this node must have a correct, non-null `prev` field.
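//!
//! As a simplified sketch of how these invariants are used (the real implementation is
//! [`find_tail_and_add_backlinks`] below), finding the tail amounts to following `next` links
//! until a node with a set `tail` field is found, filling in `prev` backlinks along the way:
//!
//! ```ignore
//! // `head` is a `NonNull<Node>` pointing into a valid, unmodified queue.
//! let mut current = head;
//! let tail = loop {
//!     // SAFETY: the traversal is justified by the invariants above.
//!     unsafe {
//!         if let Some(tail) = current.as_ref().tail.get() {
//!             // The first set `tail` field is valid and current (Invariants 1 & 2).
//!             break tail;
//!         }
//!         // All nodes before the first one with a set `tail` have a valid,
//!         // non-null `next` field (Invariant 3).
//!         let next = current.as_ref().next.get().unwrap();
//!         // Adding the backlink establishes Invariant 4 for the nodes visited so far.
//!         next.as_ref().prev.set(Some(current));
//!         current = next;
//!     }
//! };
//! ```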
//!
//! Access to the queue is controlled by the `QUEUE_LOCKED` bit. Threads will try to set this bit
//! in two cases: one is when a thread enqueues itself and eagerly adds backlinks to the queue
//! (which drastically improves performance), and the other is after a thread unlocks the lock to
//! wake up the next waiter(s).
//!
//! `QUEUE_LOCKED` is set atomically at the same time as the enqueuing/unlocking operations. The
//! thread releasing the `QUEUE_LOCKED` bit will check the state of the lock (in particular, whether
//! a downgrade was requested using the [`DOWNGRADED`] bit) and wake up waiters as appropriate. This
//! guarantees forward progress even if the unlocking thread could not acquire the queue lock.
//!
//! ## Memory Orderings
//!
//! To properly synchronize changes to the data protected by the lock, the lock is acquired and
//! released with [`Acquire`] and [`Release`] ordering, respectively. To propagate the
//! initialization of nodes, changes to the queue lock are also performed using these orderings.
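//!
//! For orientation, here is a sketch (not a doctest) of how the interface below is meant to be
//! used. The safe `RwLock` wrapper in `std` is responsible for pairing every lock operation with
//! the matching unlock and for upholding the safety contracts of the `unsafe` methods:
//!
//! ```ignore
//! static LOCK: RwLock = RwLock::new();
//!
//! // Shared access: any number of readers may hold the lock at the same time.
//! LOCK.read();
//! // ... read the protected data ...
//! // SAFETY: this thread acquired a read lock above and has not released it yet.
//! unsafe { LOCK.read_unlock() };
//!
//! // Exclusive access, later downgraded to shared access without releasing the lock.
//! LOCK.write();
//! // ... modify the protected data ...
//! // SAFETY: the lock is write-locked by this thread.
//! unsafe { LOCK.downgrade() };
//! // SAFETY: after the downgrade, this thread holds a read lock.
//! unsafe { LOCK.read_unlock() };
//! ```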
#![forbid(unsafe_op_in_unsafe_fn)]
use crate::cell::OnceCell;
use crate::hint::spin_loop;
use crate::mem;
use crate::ptr::{self, NonNull, null_mut, without_provenance_mut};
use crate::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use crate::sync::atomic::{Atomic, AtomicBool, AtomicPtr};
use crate::thread::{self, Thread};
/// The atomic lock state.
type AtomicState = Atomic<State>;
/// The inner lock state.
type State = *mut ();
const UNLOCKED: State = without_provenance_mut(0);
const LOCKED: usize = 1 << 0;
const QUEUED: usize = 1 << 1;
const QUEUE_LOCKED: usize = 1 << 2;
const DOWNGRADED: usize = 1 << 3;
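/// Each acquired read lock increments the state by `SINGLE`, so the reader count lives in the
/// bits above the four flag bits.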
const SINGLE: usize = 1 << 4;
const STATE: usize = DOWNGRADED | QUEUE_LOCKED | QUEUED | LOCKED;
const NODE_MASK: usize = !STATE;
/// Locking uses exponential backoff. `SPIN_COUNT` indicates how many times the locking operation
/// will be retried.
///
/// In other words, `spin_loop` will be called `2.pow(SPIN_COUNT) - 1` times.
const SPIN_COUNT: usize = 7;
/// Marks the state as write-locked, if possible.
#[inline]
fn write_lock(state: State) -> Option<State> {
if state.addr() & LOCKED == 0 { Some(state.map_addr(|addr| addr | LOCKED)) } else { None }
}
/// Marks the state as read-locked, if possible.
#[inline]
fn read_lock(state: State) -> Option<State> {
if state.addr() & QUEUED == 0 && state.addr() != LOCKED {
Some(without_provenance_mut(state.addr().checked_add(SINGLE)? | LOCKED))
} else {
None
}
}
/// Converts a `State` into a pointer to the `Node` it references by masking out the bottom bits,
/// assuming that the state points to a queue node.
///
/// # Safety
///
/// The state must contain a valid pointer to a queue node.
#[inline]
unsafe fn to_node(state: State) -> NonNull<Node> {
unsafe { NonNull::new_unchecked(state.mask(NODE_MASK)).cast() }
}
/// The representation of a thread waiting on the lock queue.
///
/// We initialize these `Node`s on thread execution stacks to avoid allocation.
///
/// Note that we need an alignment of 16 to ensure that the last 4 bits of any
/// pointers to `Node`s are always zeroed (for the bit flags described in the
/// module-level documentation).
#[repr(align(16))]
struct Node {
next: AtomicLink,
prev: AtomicLink,
tail: AtomicLink,
write: bool,
thread: OnceCell<Thread>,
completed: Atomic<bool>,
}
/// An atomic node pointer with relaxed operations.
struct AtomicLink(Atomic<*mut Node>);
impl AtomicLink {
fn new(v: Option<NonNull<Node>>) -> AtomicLink {
AtomicLink(AtomicPtr::new(v.map_or(null_mut(), NonNull::as_ptr)))
}
fn get(&self) -> Option<NonNull<Node>> {
NonNull::new(self.0.load(Relaxed))
}
fn set(&self, v: Option<NonNull<Node>>) {
self.0.store(v.map_or(null_mut(), NonNull::as_ptr), Relaxed);
}
}
impl Node {
/// Creates a new queue node.
fn new(write: bool) -> Node {
Node {
next: AtomicLink::new(None),
prev: AtomicLink::new(None),
tail: AtomicLink::new(None),
write,
thread: OnceCell::new(),
completed: AtomicBool::new(false),
}
}
/// Prepare this node for waiting.
fn prepare(&mut self) {
// Fall back to creating an unnamed `Thread` handle to allow locking in TLS destructors.
self.thread.get_or_init(thread::current_or_unnamed);
self.completed = AtomicBool::new(false);
}
/// Wait until this node is marked as [`complete`](Node::complete)d by another thread.
///
/// # Safety
///
/// May only be called from the thread that created the node.
unsafe fn wait(&self) {
while !self.completed.load(Acquire) {
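            // SAFETY: `wait` may only be called from the thread that created the node (see the
            // safety contract above), so this parks the current thread.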
unsafe {
self.thread.get().unwrap().park();
}
}
}
/// Atomically mark this node as completed.
///
/// # Safety
///
    /// `node` must point to a valid `Node`. The node may be destroyed by its owning thread as soon
    /// as it observes the completion, so it must not be accessed after this call.
unsafe fn complete(node: NonNull<Node>) {
// Since the node may be destroyed immediately after the completed flag is set, clone the
// thread handle before that.
let thread = unsafe { node.as_ref().thread.get().unwrap().clone() };
unsafe {
node.as_ref().completed.store(true, Release);
}
thread.unpark();
}
}
/// Traverse the queue and find the tail, adding backlinks to the queue while traversing.
///
/// This may be called from multiple threads at the same time as long as the queue is not being
/// modified (concurrent calls happen when multiple readers unlock the lock at the same time).
///
/// # Safety
///
/// * `head` must point to a node in a valid queue.
/// * `head` must be in front of the previous head node that was used to perform the last removal.
/// * The part of the queue starting with `head` must not be modified during this call.
unsafe fn find_tail_and_add_backlinks(head: NonNull<Node>) -> NonNull<Node> {
let mut current = head;
// Traverse the queue until we find a node that has a set `tail`.
let tail = loop {
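        // SAFETY: `head` points into a valid queue (see the safety contract) and `current` only
        // ever advances along valid `next` links, so it always points to a valid node.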
let c = unsafe { current.as_ref() };
if let Some(tail) = c.tail.get() {
break tail;
}
// SAFETY: All `next` fields before the first node with a set `tail` are non-null and valid
// (by Invariant 3).
unsafe {
let next = c.next.get().unwrap_unchecked();
next.as_ref().prev.set(Some(current));
current = next;
}
};
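    // Cache the found tail in `head` so that later traversals and reader-count lookups can find
    // it without walking the entire queue again.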
unsafe {
head.as_ref().tail.set(Some(tail));
tail
}
}
/// [`complete`](Node::complete)s all threads in the queue ending with `tail`.
///
/// # Safety
///
/// * `tail` must be a valid tail of a fully linked queue.
/// * The current thread must have exclusive access to that queue.
unsafe fn complete_all(tail: NonNull<Node>) {
let mut current = tail;
// Traverse backwards through the queue (FIFO) and `complete` all of the nodes.
loop {
let prev = unsafe { current.as_ref().prev.get() };
unsafe {
Node::complete(current);
}
match prev {
Some(prev) => current = prev,
None => return,
}
}
}
/// A guard against panics: if a panic would unwind a stack frame while a node located in that
/// frame is still linked into the queue, dropping this type aborts the process.
struct PanicGuard;
impl Drop for PanicGuard {
fn drop(&mut self) {
rtabort!("tried to drop node in intrusive list.");
}
}
/// The inner `RwLock` type wrapped by the public `std` lock.
pub struct RwLock {
state: AtomicState,
}
impl RwLock {
#[inline]
pub const fn new() -> RwLock {
RwLock { state: AtomicPtr::new(UNLOCKED) }
}
#[inline]
pub fn try_read(&self) -> bool {
self.state.fetch_update(Acquire, Relaxed, read_lock).is_ok()
}
#[inline]
pub fn read(&self) {
if !self.try_read() {
self.lock_contended(false)
}
}
#[inline]
pub fn try_write(&self) -> bool {
// Atomically set the `LOCKED` bit. This is lowered to a single atomic instruction on most
// modern processors (e.g. "lock bts" on x86 and "ldseta" on modern AArch64), and therefore
        // is more efficient than `fetch_update(write_lock)`, which can spuriously fail if a new
// node is appended to the queue.
self.state.fetch_or(LOCKED, Acquire).addr() & LOCKED == 0
}
#[inline]
pub fn write(&self) {
if !self.try_write() {
self.lock_contended(true)
}
}
#[cold]
fn lock_contended(&self, write: bool) {
let mut node = Node::new(write);
let mut state = self.state.load(Relaxed);
let mut count = 0;
let update_fn = if write { write_lock } else { read_lock };
loop {
// Optimistically update the state.
if let Some(next) = update_fn(state) {
// The lock is available, try locking it.
match self.state.compare_exchange_weak(state, next, Acquire, Relaxed) {
Ok(_) => return,
Err(new) => state = new,
}
continue;
} else if state.addr() & QUEUED == 0 && count < SPIN_COUNT {
// If the lock is not available and no threads are queued, optimistically spin for a
// while, using exponential backoff to decrease cache contention.
for _ in 0..(1 << count) {
spin_loop();
}
state = self.state.load(Relaxed);
count += 1;
continue;
}
// The optimistic paths did not succeed, so fall back to parking the thread.
// First, prepare the node.
node.prepare();
// If there are threads queued, this will set the `next` field to be a pointer to the
// first node in the queue.
// If the state is read-locked, this will set `next` to the lock count.
// If it is write-locked, it will set `next` to zero.
node.next.0 = AtomicPtr::new(state.mask(NODE_MASK).cast());
node.prev = AtomicLink::new(None);
            // Set the `QUEUED` bit and preserve the `LOCKED` and `DOWNGRADED` bits.
let mut next = ptr::from_ref(&node)
.map_addr(|addr| addr | QUEUED | (state.addr() & (DOWNGRADED | LOCKED)))
as State;
let mut is_queue_locked = false;
if state.addr() & QUEUED == 0 {
// If this is the first node in the queue, set the `tail` field to the node itself
// to ensure there is a valid `tail` field in the queue (Invariants 1 & 2).
// This needs to use `set` to avoid invalidating the new pointer.
node.tail.set(Some(NonNull::from(&node)));
} else {
// Otherwise, the tail of the queue is not known.
node.tail.set(None);
// Try locking the queue to eagerly add backlinks.
next = next.map_addr(|addr| addr | QUEUE_LOCKED);
// Track if we changed the `QUEUE_LOCKED` bit from off to on.
is_queue_locked = state.addr() & QUEUE_LOCKED == 0;
}
// Register the node, using release ordering to propagate our changes to the waking
// thread.
if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
// The state has changed, just try again.
state = new;
continue;
}
// The node has been registered, so the structure must not be mutably accessed or
// destroyed while other threads may be accessing it.
// Guard against unwinds using a `PanicGuard` that aborts when dropped.
let guard = PanicGuard;
            // If the current thread locked the queue, unlock it again to eagerly add backlinks.
if is_queue_locked {
// SAFETY: This thread set the `QUEUE_LOCKED` bit above.
unsafe {
self.unlock_queue(next);
}
}
// Wait until the node is removed from the queue.
// SAFETY: the node was created by the current thread.
unsafe {
node.wait();
}
// The node was removed from the queue, disarm the guard.
mem::forget(guard);
// Reload the state and try again.
state = self.state.load(Relaxed);
count = 0;
}
}
#[inline]
pub unsafe fn read_unlock(&self) {
match self.state.fetch_update(Release, Acquire, |state| {
if state.addr() & QUEUED == 0 {
// If there are no threads queued, simply decrement the reader count.
let count = state.addr() - (SINGLE | LOCKED);
Some(if count > 0 { without_provenance_mut(count | LOCKED) } else { UNLOCKED })
} else if state.addr() & DOWNGRADED != 0 {
// This thread used to have exclusive access, but requested a downgrade. This has
// not been completed yet, so we still have exclusive access.
// Retract the downgrade request and unlock, but leave waking up new threads to the
// thread that already holds the queue lock.
Some(state.mask(!(DOWNGRADED | LOCKED)))
} else {
None
}
}) {
Ok(_) => {}
// There are waiters queued and the lock count was moved to the tail of the queue.
Err(state) => unsafe { self.read_unlock_contended(state) },
}
}
/// # Safety
///
/// * There must be threads queued on the lock.
/// * `state` must be a pointer to a node in a valid queue.
/// * There cannot be a `downgrade` in progress.
#[cold]
unsafe fn read_unlock_contended(&self, state: State) {
// SAFETY:
// The state was observed with acquire ordering above, so the current thread will have
// observed all node initializations.
// We also know that no threads can be modifying the queue starting at `state`: because new
// read-locks cannot be acquired while there are any threads queued on the lock, all
// queue-lock owners will observe a set `LOCKED` bit in `self.state` and will not modify
// the queue. The other case that a thread could modify the queue is if a downgrade is in
// progress (removal of the entire queue), but since that is part of this function's safety
// contract, we can guarantee that no other threads can modify the queue.
let tail = unsafe { find_tail_and_add_backlinks(to_node(state)).as_ref() };
// The lock count is stored in the `next` field of `tail`.
// Decrement it, making sure to observe all changes made to the queue by the other lock
// owners by using acquire-release ordering.
let was_last = tail.next.0.fetch_byte_sub(SINGLE, AcqRel).addr() - SINGLE == 0;
if was_last {
// SAFETY: Other threads cannot read-lock while threads are queued. Also, the `LOCKED`
// bit is still set, so there are no writers. Thus the current thread exclusively owns
// this lock, even though it is a reader.
unsafe { self.unlock_contended(state) }
}
}
#[inline]
pub unsafe fn write_unlock(&self) {
if let Err(state) =
self.state.compare_exchange(without_provenance_mut(LOCKED), UNLOCKED, Release, Relaxed)
{
// SAFETY: Since other threads cannot acquire the lock, the state can only have changed
// because there are threads queued on the lock.
unsafe { self.unlock_contended(state) }
}
}
/// # Safety
///
/// * The lock must be exclusively owned by this thread.
/// * There must be threads queued on the lock.
/// * `state` must be a pointer to a node in a valid queue.
/// * There cannot be a `downgrade` in progress.
#[cold]
unsafe fn unlock_contended(&self, state: State) {
debug_assert_eq!(state.addr() & (DOWNGRADED | QUEUED | LOCKED), QUEUED | LOCKED);
let mut current = state;
// We want to atomically release the lock and try to acquire the queue lock.
loop {
// First check if the queue lock is already held.
if current.addr() & QUEUE_LOCKED != 0 {
// Another thread holds the queue lock, so let them wake up waiters for us.
let next = current.mask(!LOCKED);
match self.state.compare_exchange_weak(current, next, Release, Relaxed) {
Ok(_) => return,
Err(new) => {
current = new;
continue;
}
}
}
// Atomically release the lock and try to acquire the queue lock.
let next = current.map_addr(|addr| (addr & !LOCKED) | QUEUE_LOCKED);
match self.state.compare_exchange_weak(current, next, AcqRel, Relaxed) {
// Now that we have the queue lock, we can wake up the next waiter.
Ok(_) => {
// SAFETY: This thread just acquired the queue lock, and this function's safety
// contract requires that there are threads already queued on the lock.
unsafe { self.unlock_queue(next) };
return;
}
Err(new) => current = new,
}
}
}
/// # Safety
///
/// * The lock must be write-locked by this thread.
#[inline]
pub unsafe fn downgrade(&self) {
// Optimistically change the state from write-locked with a single writer and no waiters to
// read-locked with a single reader and no waiters.
if let Err(state) = self.state.compare_exchange(
without_provenance_mut(LOCKED),
without_provenance_mut(SINGLE | LOCKED),
Release,
Relaxed,
) {
// SAFETY: The only way the state can have changed is if there are threads queued.
// Wake all of them up.
unsafe { self.downgrade_slow(state) }
}
}
/// Downgrades the lock from write-locked to read-locked in the case that there are threads
/// waiting on the wait queue.
///
/// This function will either wake up all of the waiters on the wait queue or designate the
/// current holder of the queue lock to wake up all of the waiters instead. Once the waiters
/// wake up, they will continue in the execution loop of `lock_contended`.
///
/// # Safety
///
/// * The lock must be write-locked by this thread.
/// * `state` must be a pointer to a node in a valid queue.
/// * There must be threads queued on the lock.
#[cold]
unsafe fn downgrade_slow(&self, mut state: State) {
debug_assert_eq!(state.addr() & (DOWNGRADED | QUEUED | LOCKED), QUEUED | LOCKED);
// Attempt to wake up all waiters by taking ownership of the entire waiter queue.
loop {
if state.addr() & QUEUE_LOCKED != 0 {
// Another thread already holds the queue lock. Tell it to wake up all waiters.
// If the other thread succeeds in waking up waiters before we release our lock, the
// effect will be just the same as if we had changed the state below.
// Otherwise, the `DOWNGRADED` bit will still be set, meaning that when this thread
// calls `read_unlock` later (because it holds a read lock and must unlock
// eventually), it will realize that the lock is still exclusively locked and act
// accordingly.
let next = state.map_addr(|addr| addr | DOWNGRADED);
match self.state.compare_exchange_weak(state, next, Release, Relaxed) {
Ok(_) => return,
Err(new) => state = new,
}
} else {
// Grab the entire queue by swapping the `state` with a single reader.
let next = ptr::without_provenance_mut(SINGLE | LOCKED);
if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
state = new;
continue;
}
// SAFETY: We have full ownership of this queue now, so nobody else can modify it.
let tail = unsafe { find_tail_and_add_backlinks(to_node(state)) };
// Wake up all waiters.
// SAFETY: `tail` was just computed, meaning the whole queue is linked, and we have
// full ownership of the queue, so we have exclusive access.
unsafe { complete_all(tail) };
return;
}
}
}
/// Unlocks the queue. Wakes up all threads if a downgrade was requested, otherwise wakes up the
/// next eligible thread(s) if the lock is unlocked.
///
/// # Safety
///
/// * The queue lock must be held by the current thread.
/// * `state` must be a pointer to a node in a valid queue.
/// * There must be threads queued on the lock.
unsafe fn unlock_queue(&self, mut state: State) {
debug_assert_eq!(state.addr() & (QUEUED | QUEUE_LOCKED), QUEUED | QUEUE_LOCKED);
loop {
// SAFETY: Since we have the queue lock, nobody else can be modifying the queue.
let tail = unsafe { find_tail_and_add_backlinks(to_node(state)) };
if state.addr() & (DOWNGRADED | LOCKED) == LOCKED {
// Another thread has locked the lock and no downgrade was requested.
// Leave waking up waiters to them by releasing the queue lock.
match self.state.compare_exchange_weak(
state,
state.mask(!QUEUE_LOCKED),
Release,
Acquire,
) {
Ok(_) => return,
Err(new) => {
state = new;
continue;
}
}
}
// Since we hold the queue lock and downgrades cannot be requested if the lock is
// already read-locked, we have exclusive control over the queue here and can make
// modifications.
let downgrade = state.addr() & DOWNGRADED != 0;
let is_writer = unsafe { tail.as_ref().write };
if !downgrade
&& is_writer
&& let Some(prev) = unsafe { tail.as_ref().prev.get() }
{
// If we are not downgrading and the next thread is a writer, only wake up that
// writing thread.
// Split off `tail`.
// There are no set `tail` links before the node pointed to by `state`, so the first
// non-null tail field will be current (Invariant 2).
                // We also fulfill Invariant 4 since `find_tail_and_add_backlinks` was called on
                // this node, which ensures all backlinks are set.
unsafe {
to_node(state).as_ref().tail.set(Some(prev));
}
// Try to release the queue lock. We need to check the state again since another
// thread might have acquired the lock and requested a downgrade.
let next = state.mask(!QUEUE_LOCKED);
if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Acquire) {
                    // Undo the tail modification above, so that the tail can be found again on the
                    // next iteration of this loop.
// As mentioned above, we have exclusive control over the queue, so no other
// thread could have noticed the change.
unsafe {
to_node(state).as_ref().tail.set(Some(tail));
}
state = new;
continue;
}
// The tail was split off and the lock was released. Mark the node as completed.
unsafe {
return Node::complete(tail);
}
} else {
// We are either downgrading, the next waiter is a reader, or the queue only
// consists of one waiter. In any case, just wake all threads.
// Clear the queue.
let next =
if downgrade { ptr::without_provenance_mut(SINGLE | LOCKED) } else { UNLOCKED };
if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Acquire) {
state = new;
continue;
}
// SAFETY: we computed `tail` above, and no new nodes can have been added since
// (otherwise the CAS above would have failed).
// Thus we have complete control over the whole queue.
unsafe {
return complete_all(tail);
}
}
}
}
}