rtic-sync: drop items when Receiver is dropped

This commit is contained in:
datdenkikniet 2025-04-05 17:06:55 +02:00 committed by Emil Fresk
parent 9183e2983e
commit 72b5bc4836

View file

@ -102,62 +102,10 @@ impl<T, const N: usize> Channel<T, N> {
}
}
/// Clear any remaining items from this `Channel`.
pub fn clear(&mut self) {
for item in self.queued_items() {
drop(item);
}
}
/// Return an iterator over the still-queued items, removing them
/// from this channel.
pub fn queued_items(&mut self) -> impl Iterator<Item = T> + '_ {
struct Iter<'a, T, const N: usize> {
inner: &'a mut Channel<T, N>,
}
impl<T, const N: usize> Iterator for Iter<'_, T, N> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
let slot = self.inner.readyq.as_mut().pop_back()?;
let value = unsafe {
// SAFETY: `ready` is a valid slot.
let first_element = self.inner.slots.get_unchecked(slot as usize).get_mut();
let ptr = first_element.deref().as_ptr();
// SAFETY: `ptr` points to an initialized `T`.
core::ptr::read(ptr)
};
// NOTE: do not `return_free_slot`, as we have mutable
// access to this `Channel` and no `Receiver` or `Sender`
// exist.
debug_assert!(!self.inner.freeq.as_mut().is_full());
unsafe {
// SAFETY: `freeq` is not ful.
self.inner.freeq.as_mut().push_back_unchecked(slot);
}
Some(value)
}
}
Iter { inner: self }
}
/// Split the queue into a `Sender`/`Receiver` pair.
///
/// # Panics
/// This function panics if there are items in this channel while splitting.
///
/// Call [`Channel::clear`] to clear all items from it, or [`Channel::queued_items`] to retrieve
/// an iterator that yields the values.
pub fn split(&mut self) -> (Sender<'_, T, N>, Receiver<'_, T, N>) {
assert!(
self.readyq.as_mut().is_empty(),
"Cannot re-split non-empty queue. Call `Channel::clear()`."
);
// NOTE(assert): queue is cleared by dropping the corresponding `Receiver`.
debug_assert!(self.readyq.as_mut().is_empty(),);
let freeq = self.freeq.as_mut();
@ -193,7 +141,8 @@ impl<T, const N: usize> Channel<T, N> {
/// 1. If there are any waiting `send`-ers, wake the longest-waiting one and hand it `slot`.
/// 2. else, insert `slot` into `self.freeq`.
///
/// SAFETY: `slot` must be a `u8` that is obtained by dequeueing from [`Self::readyq`].
/// SAFETY: `slot` must be a `u8` that is obtained by dequeueing from [`Self::readyq`], and that `slot`
/// is returned at most once.
unsafe fn return_free_slot(&self, slot: u8) {
critical_section::with(|cs| {
fence(Ordering::SeqCst);
@ -216,11 +165,13 @@ impl<T, const N: usize> Channel<T, N> {
}
})
}
}
impl<T, const N: usize> Drop for Channel<T, N> {
fn drop(&mut self) {
self.clear();
/// SAFETY: the caller must guarantee that `slot` is an `u8` obtained by dequeueing from [`Self::readyq`],
/// and is read at most once.
unsafe fn read_slot(&self, slot: u8) -> T {
let first_element = self.slots.get_unchecked(slot as usize).get_mut();
let ptr = first_element.deref().as_ptr();
ptr::read(ptr)
}
}
@ -641,14 +592,13 @@ impl<T, const N: usize> Receiver<'_, T, N> {
if let Some(rs) = ready_slot {
// Read the value from the slots, note; this memcpy is not under a critical section.
let r = unsafe {
let first_element = self.0.slots.get_unchecked(rs as usize).get_mut();
let ptr = first_element.deref().as_ptr();
ptr::read(ptr)
};
// SAFETY: `rs` is directly obtained from `self.0.readyq` and is read exactly
// once.
let r = unsafe { self.0.read_slot(rs) };
// Return the index to the free queue after we've read the value.
// SAFETY: `rs` comes directly from `readyq`.
// SAFETY: `rs` comes directly from `readyq` and is only returned
// once.
unsafe { self.0.return_free_slot(rs) };
Ok(r)
@ -717,6 +667,19 @@ impl<T, const N: usize> Drop for Receiver<'_, T, N> {
self.0.receiver_dropped(cs, |v| *v = true);
});
let ready_slot = || {
critical_section::with(|cs| unsafe {
// SAFETY: `self.0.readyq` is not called recursively.
self.0.readyq(cs, |q| q.pop_back())
})
};
while let Some(slot) = ready_slot() {
// SAFETY: `slot` comes from `readyq` and is
// read exactly once.
drop(unsafe { self.0.read_slot(slot) })
}
while let Some((waker, _)) = self.0.wait_queue.pop() {
waker.wake();
}
@ -886,43 +849,10 @@ mod tests {
tx.try_send(SetToTrueOnDrop(value.clone())).unwrap();
drop((tx, rx));
drop(channel);
assert!(value.load(Ordering::SeqCst));
}
#[test]
pub fn cleared_item_is_dropped() {
let mut channel: Channel<SetToTrueOnDrop, 1> = Channel::new();
let (mut tx, rx) = channel.split();
let value = Arc::new(AtomicBool::new(false));
tx.try_send(SetToTrueOnDrop(value.clone())).unwrap();
drop((tx, rx));
assert!(!value.load(Ordering::SeqCst));
channel.clear();
assert!(value.load(Ordering::SeqCst));
}
#[test]
#[should_panic]
pub fn splitting_non_empty_channel_panics() {
let mut channel: Channel<(), 1> = Channel::new();
let (mut tx, rx) = channel.split();
tx.try_send(()).unwrap();
drop((tx, rx));
channel.split();
}
#[test]
pub fn splitting_empty_channel_works() {
let mut channel: Channel<(), 1> = Channel::new();
@ -933,7 +863,6 @@ mod tests {
drop((tx, rx));
channel.clear();
channel.split();
}
}