mirror of
https://github.com/rtic-rs/rtic.git
synced 2025-12-18 05:45:19 +01:00
rtic-sync: explicitly send an awoken Sender the slot it can use
This commit is contained in:
parent
6903d208b6
commit
70f57c3160
1 changed files with 71 additions and 33 deletions
|
|
@ -12,15 +12,17 @@ use core::{
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub use critical_section;
|
pub use critical_section;
|
||||||
use heapless::Deque;
|
use heapless::Deque;
|
||||||
use rtic_common::waker_registration::CriticalSectionWakerRegistration as WakerRegistration;
|
|
||||||
use rtic_common::{
|
use rtic_common::{
|
||||||
dropper::OnDrop,
|
dropper::OnDrop, wait_queue::DoublyLinkedList, wait_queue::Link,
|
||||||
wait_queue::{Link, WaitQueue},
|
waker_registration::CriticalSectionWakerRegistration as WakerRegistration,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(feature = "defmt-03")]
|
#[cfg(feature = "defmt-03")]
|
||||||
use crate::defmt;
|
use crate::defmt;
|
||||||
|
|
||||||
|
type WaitQueueData = (Waker, SlotPtr);
|
||||||
|
type WaitQueue = DoublyLinkedList<WaitQueueData>;
|
||||||
|
|
||||||
/// An MPSC channel for use in no-alloc systems. `N` sets the size of the queue.
|
/// An MPSC channel for use in no-alloc systems. `N` sets the size of the queue.
|
||||||
///
|
///
|
||||||
/// This channel uses critical sections, however there are extremely small and all `memcpy`
|
/// This channel uses critical sections, however there are extremely small and all `memcpy`
|
||||||
|
|
@ -192,11 +194,11 @@ unsafe impl<T, const N: usize> Send for Sender<'_, T, N> {}
|
||||||
/// This is needed to make the async closure in `send` accept that we "share"
|
/// This is needed to make the async closure in `send` accept that we "share"
|
||||||
/// the link possible between threads.
|
/// the link possible between threads.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct LinkPtr(*mut Option<Link<Waker>>);
|
struct LinkPtr(*mut Option<Link<WaitQueueData>>);
|
||||||
|
|
||||||
impl LinkPtr {
|
impl LinkPtr {
|
||||||
/// This will dereference the pointer stored within and give out an `&mut`.
|
/// This will dereference the pointer stored within and give out an `&mut`.
|
||||||
unsafe fn get(&mut self) -> &mut Option<Link<Waker>> {
|
unsafe fn get(&mut self) -> &mut Option<Link<WaitQueueData>> {
|
||||||
&mut *self.0
|
&mut *self.0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -205,6 +207,28 @@ unsafe impl Send for LinkPtr {}
|
||||||
|
|
||||||
unsafe impl Sync for LinkPtr {}
|
unsafe impl Sync for LinkPtr {}
|
||||||
|
|
||||||
|
/// This is needed to make the async closure in `send` accept that we "share"
|
||||||
|
/// the link possible between threads.
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct SlotPtr(*mut Option<u8>);
|
||||||
|
|
||||||
|
impl SlotPtr {
|
||||||
|
/// Replace the value of this slot with `new_value`, and return
|
||||||
|
/// the old value.
|
||||||
|
fn replace(
|
||||||
|
&mut self,
|
||||||
|
new_value: Option<u8>,
|
||||||
|
_cs: critical_section::CriticalSection,
|
||||||
|
) -> Option<u8> {
|
||||||
|
// SAFETY: we are in a critical section.
|
||||||
|
unsafe { core::ptr::replace(self.0, new_value) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl Send for SlotPtr {}
|
||||||
|
|
||||||
|
unsafe impl Sync for SlotPtr {}
|
||||||
|
|
||||||
impl<T, const N: usize> core::fmt::Debug for Sender<'_, T, N> {
|
impl<T, const N: usize> core::fmt::Debug for Sender<'_, T, N> {
|
||||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||||
write!(f, "Sender")
|
write!(f, "Sender")
|
||||||
|
|
@ -268,13 +292,17 @@ impl<T, const N: usize> Sender<'_, T, N> {
|
||||||
/// Send a value. If there is no place left in the queue this will wait until there is.
|
/// Send a value. If there is no place left in the queue this will wait until there is.
|
||||||
/// If the receiver does not exist this will return an error.
|
/// If the receiver does not exist this will return an error.
|
||||||
pub async fn send(&mut self, val: T) -> Result<(), NoReceiver<T>> {
|
pub async fn send(&mut self, val: T) -> Result<(), NoReceiver<T>> {
|
||||||
let mut link_ptr: Option<Link<Waker>> = None;
|
let mut free_slot_ptr: Option<u8> = None;
|
||||||
|
let mut link_ptr: Option<Link<WaitQueueData>> = None;
|
||||||
|
|
||||||
// Make this future `Drop`-safe.
|
// Make this future `Drop`-safe.
|
||||||
// SAFETY(link_ptr): Shadow the original definition of `link_ptr` so we can't abuse it.
|
// SAFETY(link_ptr): Shadow the original definition of `link_ptr` so we can't abuse it.
|
||||||
let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option<Link<Waker>>);
|
let mut link_ptr = LinkPtr(core::ptr::addr_of_mut!(link_ptr));
|
||||||
|
// SAFETY(freed_slot): Shadow the original definition of `free_slot_ptr` so we can't abuse it.
|
||||||
|
let mut free_slot_ptr = SlotPtr(core::ptr::addr_of_mut!(free_slot_ptr));
|
||||||
|
|
||||||
let mut link_ptr2 = link_ptr.clone();
|
let mut link_ptr2 = link_ptr.clone();
|
||||||
|
let mut free_slot_ptr2 = free_slot_ptr.clone();
|
||||||
let dropper = OnDrop::new(|| {
|
let dropper = OnDrop::new(|| {
|
||||||
// SAFETY: We only run this closure and dereference the pointer if we have
|
// SAFETY: We only run this closure and dereference the pointer if we have
|
||||||
// exited the `poll_fn` below in the `drop(dropper)` call. The other dereference
|
// exited the `poll_fn` below in the `drop(dropper)` call. The other dereference
|
||||||
|
|
@ -282,30 +310,41 @@ impl<T, const N: usize> Sender<'_, T, N> {
|
||||||
if let Some(link) = unsafe { link_ptr2.get() } {
|
if let Some(link) = unsafe { link_ptr2.get() } {
|
||||||
link.remove_from_list(&self.0.wait_queue);
|
link.remove_from_list(&self.0.wait_queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Potentially unnecessary c-s because our link was already popped, so there
|
||||||
|
// is no way for anything else to access the free slot ptr. Gotta think
|
||||||
|
// about this a bit more...
|
||||||
|
critical_section::with(|cs| {
|
||||||
|
if let Some(freed_slot) = free_slot_ptr2.replace(None, cs) {
|
||||||
|
debug_assert!(!self.0.access(cs).freeq.is_full());
|
||||||
|
// SAFETY: freeq is not full.
|
||||||
|
unsafe {
|
||||||
|
self.0.access(cs).freeq.push_back_unchecked(freed_slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
let idx = poll_fn(|cx| {
|
let idx = poll_fn(|cx| {
|
||||||
if self.is_closed() {
|
|
||||||
return Poll::Ready(Err(()));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do all this in one critical section, else there can be race conditions
|
// Do all this in one critical section, else there can be race conditions
|
||||||
let queue_idx = critical_section::with(|cs| {
|
critical_section::with(|cs| {
|
||||||
let wq_empty = self.0.wait_queue.is_empty();
|
let wq_empty = self.0.wait_queue.is_empty();
|
||||||
let fq_empty = self.0.access(cs).freeq.is_empty();
|
let fq_empty = self.0.access(cs).freeq.is_empty();
|
||||||
|
|
||||||
if !wq_empty || fq_empty {
|
if !wq_empty || fq_empty {
|
||||||
// SAFETY: This pointer is only dereferenced here and on drop of the future
|
// SAFETY: This pointer is only dereferenced here and on drop of the future
|
||||||
// which happens outside this `poll_fn`'s stack frame.
|
// which happens outside this `poll_fn`'s stack frame.
|
||||||
let link = unsafe { link_ptr.get() };
|
let link = unsafe { link_ptr.get() };
|
||||||
if let Some(link) = link {
|
if let Some(link) = link {
|
||||||
if !link.is_popped() {
|
if !link.is_popped() {
|
||||||
return None;
|
return Poll::Pending;
|
||||||
} else {
|
} else {
|
||||||
// Fall through to dequeue
|
// Fall through to dequeue
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Place the link in the wait queue on first run.
|
// Place the link in the wait queue on first run.
|
||||||
let link_ref = link.insert(Link::new(cx.waker().clone()));
|
let link_ref =
|
||||||
|
link.insert(Link::new((cx.waker().clone(), free_slot_ptr.clone())));
|
||||||
|
|
||||||
// SAFETY(new_unchecked): The address to the link is stable as it is defined
|
// SAFETY(new_unchecked): The address to the link is stable as it is defined
|
||||||
// outside this stack frame.
|
// outside this stack frame.
|
||||||
|
|
@ -315,24 +354,22 @@ impl<T, const N: usize> Sender<'_, T, N> {
|
||||||
// `link_ptr` lives until the end of the stack frame.
|
// `link_ptr` lives until the end of the stack frame.
|
||||||
unsafe { self.0.wait_queue.push(Pin::new_unchecked(link_ref)) };
|
unsafe { self.0.wait_queue.push(Pin::new_unchecked(link_ref)) };
|
||||||
|
|
||||||
return None;
|
return Poll::Pending;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert!(!self.0.access(cs).freeq.is_empty());
|
let slot = free_slot_ptr
|
||||||
// Get index as the queue is guaranteed not empty and the wait queue is empty
|
.replace(None, cs)
|
||||||
let idx = unsafe { self.0.access(cs).freeq.pop_front_unchecked() };
|
.or_else(|| self.0.access(cs).freeq.pop_back());
|
||||||
|
|
||||||
Some(idx)
|
if let Some(slot) = slot {
|
||||||
});
|
Poll::Ready(Ok(slot))
|
||||||
|
|
||||||
if let Some(idx) = queue_idx {
|
|
||||||
// Return the index
|
|
||||||
Poll::Ready(Ok(idx))
|
|
||||||
} else {
|
} else {
|
||||||
Poll::Pending
|
debug_assert!(self.is_closed());
|
||||||
|
Poll::Ready(Err(()))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// Make sure the link is removed from the queue.
|
// Make sure the link is removed from the queue.
|
||||||
|
|
@ -430,14 +467,15 @@ impl<T, const N: usize> Receiver<'_, T, N> {
|
||||||
|
|
||||||
// Return the index to the free queue after we've read the value.
|
// Return the index to the free queue after we've read the value.
|
||||||
critical_section::with(|cs| {
|
critical_section::with(|cs| {
|
||||||
assert!(!self.0.access(cs).freeq.is_full());
|
|
||||||
unsafe { self.0.access(cs).freeq.push_back_unchecked(rs) }
|
|
||||||
|
|
||||||
fence(Ordering::SeqCst);
|
fence(Ordering::SeqCst);
|
||||||
|
|
||||||
// If someone is waiting in the WaiterQueue, wake the first one up.
|
// If someone is waiting in the WaiterQueue, wake the first one up & hand it the free slot.
|
||||||
if let Some(wait_head) = self.0.wait_queue.pop() {
|
if let Some((wait_head, mut freeq_slot)) = self.0.wait_queue.pop() {
|
||||||
|
freeq_slot.replace(Some(rs), cs);
|
||||||
wait_head.wake();
|
wait_head.wake();
|
||||||
|
} else {
|
||||||
|
assert!(!self.0.access(cs).freeq.is_full());
|
||||||
|
unsafe { self.0.access(cs).freeq.push_back_unchecked(rs) }
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(r)
|
Ok(r)
|
||||||
|
|
@ -495,7 +533,7 @@ impl<T, const N: usize> Drop for Receiver<'_, T, N> {
|
||||||
// Mark the receiver as dropped and wake all waiters
|
// Mark the receiver as dropped and wake all waiters
|
||||||
critical_section::with(|cs| *self.0.access(cs).receiver_dropped = true);
|
critical_section::with(|cs| *self.0.access(cs).receiver_dropped = true);
|
||||||
|
|
||||||
while let Some(waker) = self.0.wait_queue.pop() {
|
while let Some((waker, _)) = self.0.wait_queue.pop() {
|
||||||
waker.wake();
|
waker.wake();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue