diff --git a/rtic-sync/src/channel.rs b/rtic-sync/src/channel.rs index e665204a618..fe536085e16 100644 --- a/rtic-sync/src/channel.rs +++ b/rtic-sync/src/channel.rs @@ -102,62 +102,10 @@ impl Channel { } } - /// Clear any remaining items from this `Channel`. - pub fn clear(&mut self) { - for item in self.queued_items() { - drop(item); - } - } - - /// Return an iterator over the still-queued items, removing them - /// from this channel. - pub fn queued_items(&mut self) -> impl Iterator + '_ { - struct Iter<'a, T, const N: usize> { - inner: &'a mut Channel, - } - - impl Iterator for Iter<'_, T, N> { - type Item = T; - - fn next(&mut self) -> Option { - let slot = self.inner.readyq.as_mut().pop_back()?; - - let value = unsafe { - // SAFETY: `ready` is a valid slot. - let first_element = self.inner.slots.get_unchecked(slot as usize).get_mut(); - let ptr = first_element.deref().as_ptr(); - // SAFETY: `ptr` points to an initialized `T`. - core::ptr::read(ptr) - }; - - // NOTE: do not `return_free_slot`, as we have mutable - // access to this `Channel` and no `Receiver` or `Sender` - // exist. - debug_assert!(!self.inner.freeq.as_mut().is_full()); - unsafe { - // SAFETY: `freeq` is not ful. - self.inner.freeq.as_mut().push_back_unchecked(slot); - } - - Some(value) - } - } - - Iter { inner: self } - } - /// Split the queue into a `Sender`/`Receiver` pair. - /// - /// # Panics - /// This function panics if there are items in this channel while splitting. - /// - /// Call [`Channel::clear`] to clear all items from it, or [`Channel::queued_items`] to retrieve - /// an iterator that yields the values. pub fn split(&mut self) -> (Sender<'_, T, N>, Receiver<'_, T, N>) { - assert!( - self.readyq.as_mut().is_empty(), - "Cannot re-split non-empty queue. Call `Channel::clear()`." - ); + // NOTE(assert): queue is cleared by dropping the corresponding `Receiver`. + debug_assert!(self.readyq.as_mut().is_empty(),); let freeq = self.freeq.as_mut(); @@ -193,7 +141,8 @@ impl Channel { /// 1. If there are any waiting `send`-ers, wake the longest-waiting one and hand it `slot`. /// 2. else, insert `slot` into `self.freeq`. /// - /// SAFETY: `slot` must be a `u8` that is obtained by dequeueing from [`Self::readyq`]. + /// SAFETY: `slot` must be a `u8` that is obtained by dequeueing from [`Self::readyq`], and that `slot` + /// is returned at most once. unsafe fn return_free_slot(&self, slot: u8) { critical_section::with(|cs| { fence(Ordering::SeqCst); @@ -216,11 +165,13 @@ impl Channel { } }) } -} -impl Drop for Channel { - fn drop(&mut self) { - self.clear(); + /// SAFETY: the caller must guarantee that `slot` is an `u8` obtained by dequeueing from [`Self::readyq`], + /// and is read at most once. + unsafe fn read_slot(&self, slot: u8) -> T { + let first_element = self.slots.get_unchecked(slot as usize).get_mut(); + let ptr = first_element.deref().as_ptr(); + ptr::read(ptr) } } @@ -641,14 +592,13 @@ impl Receiver<'_, T, N> { if let Some(rs) = ready_slot { // Read the value from the slots, note; this memcpy is not under a critical section. - let r = unsafe { - let first_element = self.0.slots.get_unchecked(rs as usize).get_mut(); - let ptr = first_element.deref().as_ptr(); - ptr::read(ptr) - }; + // SAFETY: `rs` is directly obtained from `self.0.readyq` and is read exactly + // once. + let r = unsafe { self.0.read_slot(rs) }; // Return the index to the free queue after we've read the value. - // SAFETY: `rs` comes directly from `readyq`. + // SAFETY: `rs` comes directly from `readyq` and is only returned + // once. unsafe { self.0.return_free_slot(rs) }; Ok(r) @@ -717,6 +667,19 @@ impl Drop for Receiver<'_, T, N> { self.0.receiver_dropped(cs, |v| *v = true); }); + let ready_slot = || { + critical_section::with(|cs| unsafe { + // SAFETY: `self.0.readyq` is not called recursively. + self.0.readyq(cs, |q| q.pop_back()) + }) + }; + + while let Some(slot) = ready_slot() { + // SAFETY: `slot` comes from `readyq` and is + // read exactly once. + drop(unsafe { self.0.read_slot(slot) }) + } + while let Some((waker, _)) = self.0.wait_queue.pop() { waker.wake(); } @@ -886,43 +849,10 @@ mod tests { tx.try_send(SetToTrueOnDrop(value.clone())).unwrap(); drop((tx, rx)); - drop(channel); assert!(value.load(Ordering::SeqCst)); } - #[test] - pub fn cleared_item_is_dropped() { - let mut channel: Channel = Channel::new(); - - let (mut tx, rx) = channel.split(); - - let value = Arc::new(AtomicBool::new(false)); - tx.try_send(SetToTrueOnDrop(value.clone())).unwrap(); - - drop((tx, rx)); - - assert!(!value.load(Ordering::SeqCst)); - - channel.clear(); - - assert!(value.load(Ordering::SeqCst)); - } - - #[test] - #[should_panic] - pub fn splitting_non_empty_channel_panics() { - let mut channel: Channel<(), 1> = Channel::new(); - - let (mut tx, rx) = channel.split(); - - tx.try_send(()).unwrap(); - - drop((tx, rx)); - - channel.split(); - } - #[test] pub fn splitting_empty_channel_works() { let mut channel: Channel<(), 1> = Channel::new(); @@ -933,7 +863,6 @@ mod tests { drop((tx, rx)); - channel.clear(); channel.split(); } }