mirror of
https://github.com/rtic-rs/rtic.git
synced 2025-12-16 12:55:23 +01:00
rtic-sync: drop items when Receiver is dropped
This commit is contained in:
parent
7421b1d5b8
commit
afe0c061ad
1 changed files with 28 additions and 99 deletions
|
|
@ -102,62 +102,10 @@ impl<T, const N: usize> Channel<T, N> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Clear any remaining items from this `Channel`.
|
||||
pub fn clear(&mut self) {
|
||||
for item in self.queued_items() {
|
||||
drop(item);
|
||||
}
|
||||
}
|
||||
|
||||
/// Return an iterator over the still-queued items, removing them
|
||||
/// from this channel.
|
||||
pub fn queued_items(&mut self) -> impl Iterator<Item = T> + '_ {
|
||||
struct Iter<'a, T, const N: usize> {
|
||||
inner: &'a mut Channel<T, N>,
|
||||
}
|
||||
|
||||
impl<T, const N: usize> Iterator for Iter<'_, T, N> {
|
||||
type Item = T;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let slot = self.inner.readyq.as_mut().pop_back()?;
|
||||
|
||||
let value = unsafe {
|
||||
// SAFETY: `ready` is a valid slot.
|
||||
let first_element = self.inner.slots.get_unchecked(slot as usize).get_mut();
|
||||
let ptr = first_element.deref().as_ptr();
|
||||
// SAFETY: `ptr` points to an initialized `T`.
|
||||
core::ptr::read(ptr)
|
||||
};
|
||||
|
||||
// NOTE: do not `return_free_slot`, as we have mutable
|
||||
// access to this `Channel` and no `Receiver` or `Sender`
|
||||
// exist.
|
||||
debug_assert!(!self.inner.freeq.as_mut().is_full());
|
||||
unsafe {
|
||||
// SAFETY: `freeq` is not ful.
|
||||
self.inner.freeq.as_mut().push_back_unchecked(slot);
|
||||
}
|
||||
|
||||
Some(value)
|
||||
}
|
||||
}
|
||||
|
||||
Iter { inner: self }
|
||||
}
|
||||
|
||||
/// Split the queue into a `Sender`/`Receiver` pair.
|
||||
///
|
||||
/// # Panics
|
||||
/// This function panics if there are items in this channel while splitting.
|
||||
///
|
||||
/// Call [`Channel::clear`] to clear all items from it, or [`Channel::queued_items`] to retrieve
|
||||
/// an iterator that yields the values.
|
||||
pub fn split(&mut self) -> (Sender<'_, T, N>, Receiver<'_, T, N>) {
|
||||
assert!(
|
||||
self.readyq.as_mut().is_empty(),
|
||||
"Cannot re-split non-empty queue. Call `Channel::clear()`."
|
||||
);
|
||||
// NOTE(assert): queue is cleared by dropping the corresponding `Receiver`.
|
||||
debug_assert!(self.readyq.as_mut().is_empty(),);
|
||||
|
||||
let freeq = self.freeq.as_mut();
|
||||
|
||||
|
|
@ -193,7 +141,8 @@ impl<T, const N: usize> Channel<T, N> {
|
|||
/// 1. If there are any waiting `send`-ers, wake the longest-waiting one and hand it `slot`.
|
||||
/// 2. else, insert `slot` into `self.freeq`.
|
||||
///
|
||||
/// SAFETY: `slot` must be a `u8` that is obtained by dequeueing from [`Self::readyq`].
|
||||
/// SAFETY: `slot` must be a `u8` that is obtained by dequeueing from [`Self::readyq`], and that `slot`
|
||||
/// is returned at most once.
|
||||
unsafe fn return_free_slot(&self, slot: u8) {
|
||||
critical_section::with(|cs| {
|
||||
fence(Ordering::SeqCst);
|
||||
|
|
@ -216,11 +165,13 @@ impl<T, const N: usize> Channel<T, N> {
|
|||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, const N: usize> Drop for Channel<T, N> {
|
||||
fn drop(&mut self) {
|
||||
self.clear();
|
||||
/// SAFETY: the caller must guarantee that `slot` is an `u8` obtained by dequeueing from [`Self::readyq`],
|
||||
/// and is read at most once.
|
||||
unsafe fn read_slot(&self, slot: u8) -> T {
|
||||
let first_element = self.slots.get_unchecked(slot as usize).get_mut();
|
||||
let ptr = first_element.deref().as_ptr();
|
||||
ptr::read(ptr)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -641,14 +592,13 @@ impl<T, const N: usize> Receiver<'_, T, N> {
|
|||
|
||||
if let Some(rs) = ready_slot {
|
||||
// Read the value from the slots, note; this memcpy is not under a critical section.
|
||||
let r = unsafe {
|
||||
let first_element = self.0.slots.get_unchecked(rs as usize).get_mut();
|
||||
let ptr = first_element.deref().as_ptr();
|
||||
ptr::read(ptr)
|
||||
};
|
||||
// SAFETY: `rs` is directly obtained from `self.0.readyq` and is read exactly
|
||||
// once.
|
||||
let r = unsafe { self.0.read_slot(rs) };
|
||||
|
||||
// Return the index to the free queue after we've read the value.
|
||||
// SAFETY: `rs` comes directly from `readyq`.
|
||||
// SAFETY: `rs` comes directly from `readyq` and is only returned
|
||||
// once.
|
||||
unsafe { self.0.return_free_slot(rs) };
|
||||
|
||||
Ok(r)
|
||||
|
|
@ -717,6 +667,19 @@ impl<T, const N: usize> Drop for Receiver<'_, T, N> {
|
|||
self.0.receiver_dropped(cs, |v| *v = true);
|
||||
});
|
||||
|
||||
let ready_slot = || {
|
||||
critical_section::with(|cs| unsafe {
|
||||
// SAFETY: `self.0.readyq` is not called recursively.
|
||||
self.0.readyq(cs, |q| q.pop_back())
|
||||
})
|
||||
};
|
||||
|
||||
while let Some(slot) = ready_slot() {
|
||||
// SAFETY: `slot` comes from `readyq` and is
|
||||
// read exactly once.
|
||||
drop(unsafe { self.0.read_slot(slot) })
|
||||
}
|
||||
|
||||
while let Some((waker, _)) = self.0.wait_queue.pop() {
|
||||
waker.wake();
|
||||
}
|
||||
|
|
@ -886,43 +849,10 @@ mod tests {
|
|||
tx.try_send(SetToTrueOnDrop(value.clone())).unwrap();
|
||||
|
||||
drop((tx, rx));
|
||||
drop(channel);
|
||||
|
||||
assert!(value.load(Ordering::SeqCst));
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn cleared_item_is_dropped() {
|
||||
let mut channel: Channel<SetToTrueOnDrop, 1> = Channel::new();
|
||||
|
||||
let (mut tx, rx) = channel.split();
|
||||
|
||||
let value = Arc::new(AtomicBool::new(false));
|
||||
tx.try_send(SetToTrueOnDrop(value.clone())).unwrap();
|
||||
|
||||
drop((tx, rx));
|
||||
|
||||
assert!(!value.load(Ordering::SeqCst));
|
||||
|
||||
channel.clear();
|
||||
|
||||
assert!(value.load(Ordering::SeqCst));
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
pub fn splitting_non_empty_channel_panics() {
|
||||
let mut channel: Channel<(), 1> = Channel::new();
|
||||
|
||||
let (mut tx, rx) = channel.split();
|
||||
|
||||
tx.try_send(()).unwrap();
|
||||
|
||||
drop((tx, rx));
|
||||
|
||||
channel.split();
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn splitting_empty_channel_works() {
|
||||
let mut channel: Channel<(), 1> = Channel::new();
|
||||
|
|
@ -933,7 +863,6 @@ mod tests {
|
|||
|
||||
drop((tx, rx));
|
||||
|
||||
channel.clear();
|
||||
channel.split();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue