diff --git a/rtic-sync/src/channel.rs b/rtic-sync/src/channel.rs index aca8cf4530d..5b83fce86b6 100644 --- a/rtic-sync/src/channel.rs +++ b/rtic-sync/src/channel.rs @@ -12,15 +12,17 @@ use core::{ #[doc(hidden)] pub use critical_section; use heapless::Deque; -use rtic_common::waker_registration::CriticalSectionWakerRegistration as WakerRegistration; use rtic_common::{ - dropper::OnDrop, - wait_queue::{Link, WaitQueue}, + dropper::OnDrop, wait_queue::DoublyLinkedList, wait_queue::Link, + waker_registration::CriticalSectionWakerRegistration as WakerRegistration, }; #[cfg(feature = "defmt-03")] use crate::defmt; +type WaitQueueData = (Waker, SlotPtr); +type WaitQueue = DoublyLinkedList; + /// An MPSC channel for use in no-alloc systems. `N` sets the size of the queue. /// /// This channel uses critical sections, however there are extremely small and all `memcpy` @@ -192,11 +194,11 @@ unsafe impl Send for Sender<'_, T, N> {} /// This is needed to make the async closure in `send` accept that we "share" /// the link possible between threads. #[derive(Clone)] -struct LinkPtr(*mut Option>); +struct LinkPtr(*mut Option>); impl LinkPtr { /// This will dereference the pointer stored within and give out an `&mut`. - unsafe fn get(&mut self) -> &mut Option> { + unsafe fn get(&mut self) -> &mut Option> { &mut *self.0 } } @@ -205,6 +207,28 @@ unsafe impl Send for LinkPtr {} unsafe impl Sync for LinkPtr {} +/// This is needed to make the async closure in `send` accept that we "share" +/// the link possible between threads. +#[derive(Clone)] +struct SlotPtr(*mut Option); + +impl SlotPtr { + /// Replace the value of this slot with `new_value`, and return + /// the old value. + fn replace( + &mut self, + new_value: Option, + _cs: critical_section::CriticalSection, + ) -> Option { + // SAFETY: we are in a critical section. + unsafe { core::ptr::replace(self.0, new_value) } + } +} + +unsafe impl Send for SlotPtr {} + +unsafe impl Sync for SlotPtr {} + impl core::fmt::Debug for Sender<'_, T, N> { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { write!(f, "Sender") @@ -268,13 +292,17 @@ impl Sender<'_, T, N> { /// Send a value. If there is no place left in the queue this will wait until there is. /// If the receiver does not exist this will return an error. pub async fn send(&mut self, val: T) -> Result<(), NoReceiver> { - let mut link_ptr: Option> = None; + let mut free_slot_ptr: Option = None; + let mut link_ptr: Option> = None; // Make this future `Drop`-safe. // SAFETY(link_ptr): Shadow the original definition of `link_ptr` so we can't abuse it. - let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option>); + let mut link_ptr = LinkPtr(core::ptr::addr_of_mut!(link_ptr)); + // SAFETY(freed_slot): Shadow the original definition of `free_slot_ptr` so we can't abuse it. + let mut free_slot_ptr = SlotPtr(core::ptr::addr_of_mut!(free_slot_ptr)); let mut link_ptr2 = link_ptr.clone(); + let mut free_slot_ptr2 = free_slot_ptr.clone(); let dropper = OnDrop::new(|| { // SAFETY: We only run this closure and dereference the pointer if we have // exited the `poll_fn` below in the `drop(dropper)` call. The other dereference @@ -282,30 +310,41 @@ impl Sender<'_, T, N> { if let Some(link) = unsafe { link_ptr2.get() } { link.remove_from_list(&self.0.wait_queue); } + + // Potentially unnecessary c-s because our link was already popped, so there + // is no way for anything else to access the free slot ptr. Gotta think + // about this a bit more... + critical_section::with(|cs| { + if let Some(freed_slot) = free_slot_ptr2.replace(None, cs) { + debug_assert!(!self.0.access(cs).freeq.is_full()); + // SAFETY: freeq is not full. + unsafe { + self.0.access(cs).freeq.push_back_unchecked(freed_slot); + } + } + }); }); let idx = poll_fn(|cx| { - if self.is_closed() { - return Poll::Ready(Err(())); - } - // Do all this in one critical section, else there can be race conditions - let queue_idx = critical_section::with(|cs| { + critical_section::with(|cs| { let wq_empty = self.0.wait_queue.is_empty(); let fq_empty = self.0.access(cs).freeq.is_empty(); + if !wq_empty || fq_empty { // SAFETY: This pointer is only dereferenced here and on drop of the future // which happens outside this `poll_fn`'s stack frame. let link = unsafe { link_ptr.get() }; if let Some(link) = link { if !link.is_popped() { - return None; + return Poll::Pending; } else { // Fall through to dequeue } } else { // Place the link in the wait queue on first run. - let link_ref = link.insert(Link::new(cx.waker().clone())); + let link_ref = + link.insert(Link::new((cx.waker().clone(), free_slot_ptr.clone()))); // SAFETY(new_unchecked): The address to the link is stable as it is defined // outside this stack frame. @@ -315,23 +354,21 @@ impl Sender<'_, T, N> { // `link_ptr` lives until the end of the stack frame. unsafe { self.0.wait_queue.push(Pin::new_unchecked(link_ref)) }; - return None; + return Poll::Pending; } } - assert!(!self.0.access(cs).freeq.is_empty()); - // Get index as the queue is guaranteed not empty and the wait queue is empty - let idx = unsafe { self.0.access(cs).freeq.pop_front_unchecked() }; + let slot = free_slot_ptr + .replace(None, cs) + .or_else(|| self.0.access(cs).freeq.pop_back()); - Some(idx) - }); - - if let Some(idx) = queue_idx { - // Return the index - Poll::Ready(Ok(idx)) - } else { - Poll::Pending - } + if let Some(slot) = slot { + Poll::Ready(Ok(slot)) + } else { + debug_assert!(self.is_closed()); + Poll::Ready(Err(())) + } + }) }) .await; @@ -430,14 +467,15 @@ impl Receiver<'_, T, N> { // Return the index to the free queue after we've read the value. critical_section::with(|cs| { - assert!(!self.0.access(cs).freeq.is_full()); - unsafe { self.0.access(cs).freeq.push_back_unchecked(rs) } - fence(Ordering::SeqCst); - // If someone is waiting in the WaiterQueue, wake the first one up. - if let Some(wait_head) = self.0.wait_queue.pop() { + // If someone is waiting in the WaiterQueue, wake the first one up & hand it the free slot. + if let Some((wait_head, mut freeq_slot)) = self.0.wait_queue.pop() { + freeq_slot.replace(Some(rs), cs); wait_head.wake(); + } else { + assert!(!self.0.access(cs).freeq.is_full()); + unsafe { self.0.access(cs).freeq.push_back_unchecked(rs) } } Ok(r) @@ -495,7 +533,7 @@ impl Drop for Receiver<'_, T, N> { // Mark the receiver as dropped and wake all waiters critical_section::with(|cs| *self.0.access(cs).receiver_dropped = true); - while let Some(waker) = self.0.wait_queue.pop() { + while let Some((waker, _)) = self.0.wait_queue.pop() { waker.wake(); } }