From 4fa3f5ddbac646f0cec0ddb4b95828ce29182ece Mon Sep 17 00:00:00 2001 From: datdenkikniet Date: Thu, 13 Mar 2025 22:41:16 +0100 Subject: [PATCH] rtic-sync: Channel: Sender: rewriter `send` logic to be easier to validate --- rtic-sync/src/channel.rs | 65 ++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 29 deletions(-) diff --git a/rtic-sync/src/channel.rs b/rtic-sync/src/channel.rs index d5574d0bd21..00e94765096 100644 --- a/rtic-sync/src/channel.rs +++ b/rtic-sync/src/channel.rs @@ -333,44 +333,51 @@ impl Sender<'_, T, N> { // Do all this in one critical section, else there can be race conditions critical_section::with(|cs| { let wq_empty = self.0.wait_queue.is_empty(); - let fq_empty = self.0.access(cs).freeq.is_empty(); + let freeq_empty = self.0.access(cs).freeq.is_empty(); - if !wq_empty || fq_empty { - // SAFETY: This pointer is only dereferenced here and on drop of the future - // which happens outside this `poll_fn`'s stack frame. - let link = unsafe { link_ptr.get() }; - if let Some(link) = link { - if !link.is_popped() { - return Poll::Pending; + // SAFETY: This pointer is only dereferenced here and on drop of the future + // which happens outside this `poll_fn`'s stack frame. + let link = unsafe { link_ptr.get() }; + + // We are already in the wait queue. + if let Some(link) = link { + if link.is_popped() { + // If our link is popped, then: + // 1. We were popped by `try_recv` and it provided us with a slot. + // 2. We were popped by `Receiver::drop` and it did not provide us with a slot, and the channel is closed. + let slot = unsafe { free_slot_ptr.replace(None, cs) }; + + if let Some(slot) = slot { + Poll::Ready(Ok(slot)) } else { - // Fall through to dequeue + Poll::Ready(Err(())) } } else { - // Place the link in the wait queue on first run. - let link_ref = - link.insert(Link::new((cx.waker().clone(), free_slot_ptr.clone()))); - - // SAFETY(new_unchecked): The address to the link is stable as it is defined - // outside this stack frame. - // SAFETY(push): `link_ref` lifetime comes from `link_ptr` that is shadowed, - // and we make sure in `dropper` that the link is removed from the queue - // before dropping `link_ptr` AND `dropper` makes sure that the shadowed - // `link_ptr` lives until the end of the stack frame. - unsafe { self.0.wait_queue.push(Pin::new_unchecked(link_ref)) }; - - return Poll::Pending; + Poll::Pending } } + // We are not in the wait queue, but others are, or there is currently no free + // slot available. + else if !wq_empty || freeq_empty { + // Place the link in the wait queue. + let link_ref = + link.insert(Link::new((cx.waker().clone(), free_slot_ptr.clone()))); - // SAFETY: `free_slot_ptr` is valid for writes, as `free_slot_ptr` is still alive. - let slot = unsafe { free_slot_ptr.replace(None, cs) } - .or_else(|| self.0.access(cs).freeq.pop_back()); + // SAFETY(new_unchecked): The address to the link is stable as it is defined + // outside this stack frame. + // SAFETY(push): `link_ref` lifetime comes from `link_ptr` that is shadowed, + // and we make sure in `dropper` that the link is removed from the queue + // before dropping `link_ptr` AND `dropper` makes sure that the shadowed + // `link_ptr` lives until the end of the stack frame. + unsafe { self.0.wait_queue.push(Pin::new_unchecked(link_ref)) }; - if let Some(slot) = slot { + Poll::Pending + } + // We are not in the wait queue, no one else is waiting, and there is a free slot available. + else { + assert!(!self.0.access(cs).freeq.is_empty()); + let slot = unsafe { self.0.access(cs).freeq.pop_back_unchecked() }; Poll::Ready(Ok(slot)) - } else { - debug_assert!(self.is_closed()); - Poll::Ready(Err(())) } }) })