rtic-sync: Channel: Sender: rewriter send logic to be easier to validate

This commit is contained in:
datdenkikniet 2025-03-13 22:41:16 +01:00 committed by Emil Fresk
parent daf977dcff
commit 4fa3f5ddba

View file

@ -333,44 +333,51 @@ impl<T, const N: usize> Sender<'_, T, N> {
// Do all this in one critical section, else there can be race conditions
critical_section::with(|cs| {
let wq_empty = self.0.wait_queue.is_empty();
let fq_empty = self.0.access(cs).freeq.is_empty();
let freeq_empty = self.0.access(cs).freeq.is_empty();
if !wq_empty || fq_empty {
// SAFETY: This pointer is only dereferenced here and on drop of the future
// which happens outside this `poll_fn`'s stack frame.
let link = unsafe { link_ptr.get() };
if let Some(link) = link {
if !link.is_popped() {
return Poll::Pending;
// SAFETY: This pointer is only dereferenced here and on drop of the future
// which happens outside this `poll_fn`'s stack frame.
let link = unsafe { link_ptr.get() };
// We are already in the wait queue.
if let Some(link) = link {
if link.is_popped() {
// If our link is popped, then:
// 1. We were popped by `try_recv` and it provided us with a slot.
// 2. We were popped by `Receiver::drop` and it did not provide us with a slot, and the channel is closed.
let slot = unsafe { free_slot_ptr.replace(None, cs) };
if let Some(slot) = slot {
Poll::Ready(Ok(slot))
} else {
// Fall through to dequeue
Poll::Ready(Err(()))
}
} else {
// Place the link in the wait queue on first run.
let link_ref =
link.insert(Link::new((cx.waker().clone(), free_slot_ptr.clone())));
// SAFETY(new_unchecked): The address to the link is stable as it is defined
// outside this stack frame.
// SAFETY(push): `link_ref` lifetime comes from `link_ptr` that is shadowed,
// and we make sure in `dropper` that the link is removed from the queue
// before dropping `link_ptr` AND `dropper` makes sure that the shadowed
// `link_ptr` lives until the end of the stack frame.
unsafe { self.0.wait_queue.push(Pin::new_unchecked(link_ref)) };
return Poll::Pending;
Poll::Pending
}
}
// We are not in the wait queue, but others are, or there is currently no free
// slot available.
else if !wq_empty || freeq_empty {
// Place the link in the wait queue.
let link_ref =
link.insert(Link::new((cx.waker().clone(), free_slot_ptr.clone())));
// SAFETY: `free_slot_ptr` is valid for writes, as `free_slot_ptr` is still alive.
let slot = unsafe { free_slot_ptr.replace(None, cs) }
.or_else(|| self.0.access(cs).freeq.pop_back());
// SAFETY(new_unchecked): The address to the link is stable as it is defined
// outside this stack frame.
// SAFETY(push): `link_ref` lifetime comes from `link_ptr` that is shadowed,
// and we make sure in `dropper` that the link is removed from the queue
// before dropping `link_ptr` AND `dropper` makes sure that the shadowed
// `link_ptr` lives until the end of the stack frame.
unsafe { self.0.wait_queue.push(Pin::new_unchecked(link_ref)) };
if let Some(slot) = slot {
Poll::Pending
}
// We are not in the wait queue, no one else is waiting, and there is a free slot available.
else {
assert!(!self.0.access(cs).freeq.is_empty());
let slot = unsafe { self.0.access(cs).freeq.pop_back_unchecked() };
Poll::Ready(Ok(slot))
} else {
debug_assert!(self.is_closed());
Poll::Ready(Err(()))
}
})
})