Mirror of https://github.com/rtic-rs/rtic.git
Synced 2024-11-23 20:22:51 +01:00

Use Pin in the linked lists

This commit is contained in:
parent 5908d5bdbc
commit 3050fc0591

4 changed files with 42 additions and 15 deletions

First changed file: the channel, whose `Sender::send` parks its waker in an intrusive wait queue. `core::pin::Pin` is added to the imports.

@@ -8,6 +8,7 @@ use core::{
     cell::UnsafeCell,
     future::poll_fn,
     mem::MaybeUninit,
+    pin::Pin,
     ptr,
     task::{Poll, Waker},
 };

@@ -177,10 +178,11 @@ impl<'a, T, const N: usize> Sender<'a, T, N> {
     pub async fn send(&mut self, val: T) -> Result<(), NoReceiver<T>> {
         if self.is_closed() {}

-        let mut __hidden_link: Option<wait_queue::Link<Waker>> = None;
+        let mut link_ptr: Option<wait_queue::Link<Waker>> = None;

-        // Make this future `Drop`-safe
-        let link_ptr = &mut __hidden_link as *mut Option<wait_queue::Link<Waker>>;
+        // Make this future `Drop`-safe, also shadow the original definition so we can't abuse it.
+        let link_ptr = &mut link_ptr as *mut Option<wait_queue::Link<Waker>>;
+
         let dropper = OnDrop::new(|| {
             // SAFETY: We only run this closure and dereference the pointer if we have
             // exited the `poll_fn` below in the `drop(dropper)` call. The other dereference
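
As an illustration of the shadowing trick in the hunk above (my own sketch, not code from the commit): the `Option` lives in the future's stack frame, and re-binding the name to a raw pointer means later code can no longer move or reassign the original slot, so a reference into it keeps a fixed address. The names `slot` and `shadowing_sketch` are made up for the sketch.

    use core::pin::Pin;

    fn shadowing_sketch() {
        let mut slot: Option<u32> = None;
        // Shadow the original binding with a raw pointer so the `Option`
        // itself can no longer be touched (or moved) by name.
        let slot = &mut slot as *mut Option<u32>;

        // SAFETY (sketch): `slot` points into this stack frame and is only
        // dereferenced while the frame is still alive.
        let slot_ref = unsafe { &mut *slot };
        let value_ref = slot_ref.insert(42);

        // SAFETY (sketch): the value's address cannot change for the rest of
        // this frame, which is what `Pin::new_unchecked` requires.
        let _pinned: Pin<&mut u32> = unsafe { Pin::new_unchecked(value_ref) };
    }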

@@ -198,12 +200,18 @@ impl<'a, T, const N: usize> Sender<'a, T, N> {
             // Do all this in one critical section, else there can be race conditions
             let queue_idx = critical_section::with(|cs| {
                 if !self.0.wait_queue.is_empty() || self.0.access(cs).freeq.is_empty() {
-                    // SAFETY: This pointer is only dereferenced here and on drop of the future.
+                    // SAFETY: This pointer is only dereferenced here and on drop of the future
+                    // which happens outside this `poll_fn`'s stack frame.
                     let link = unsafe { &mut *link_ptr };
                     if link.is_none() {
                         // Place the link in the wait queue on first run.
                         let link_ref = link.insert(wait_queue::Link::new(cx.waker().clone()));
-                        self.0.wait_queue.push(link_ref);
+
+                        // SAFETY: The address to the link is stable as it is hidden behind
+                        // `link_ptr`, and `link_ptr` shadows the original making it unmovable.
+                        self.0
+                            .wait_queue
+                            .push(unsafe { Pin::new_unchecked(link_ref) });
                     }

                     return None;
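
A small sketch (again mine, not the crate's code) of why the address handed to `Pin::new_unchecked` above is stable: `Option::insert` returns a `&mut T` that points into the `Option`'s own storage, so as long as that `Option` is never moved, neither is the linked value.

    fn option_insert_is_in_place() {
        let mut slot: Option<u64> = None;
        let value_ref = slot.insert(7);
        let value_addr = value_ref as *mut u64 as usize;

        // The value lives inside the `Option`'s storage, not somewhere else.
        let slot_addr = &slot as *const Option<u64> as usize;
        assert!(value_addr >= slot_addr);
        assert!(value_addr < slot_addr + core::mem::size_of::<Option<u64>>());
    }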

Second changed file: the wait queue's intrusive linked list (`LinkedList<T>` / `Link<T>` holding `Waker`s). `core::pin::Pin` is added to the imports.

@@ -1,6 +1,7 @@
 //! ...

 use core::marker::PhantomPinned;
+use core::pin::Pin;
 use core::ptr::null_mut;
 use core::sync::atomic::{AtomicPtr, Ordering};
 use core::task::Waker;
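
The imports above already include `core::marker::PhantomPinned`. A sketch of how such a marker is typically combined with an intrusive node (field names are illustrative, not the crate's definition): it makes the node `!Unpin`, so safe code cannot turn a `Pin<&mut Link<T>>` back into a plain `&mut` and move the node while the list still stores its address.

    use core::marker::PhantomPinned;
    use core::ptr::null_mut;
    use core::sync::atomic::AtomicPtr;

    // Illustrative node layout, not the crate's `Link<T>`.
    struct DemoLink<T> {
        val: T,
        next: AtomicPtr<DemoLink<T>>,
        prev: AtomicPtr<DemoLink<T>>,
        _pin: PhantomPinned, // opts the node out of `Unpin`
    }

    impl<T> DemoLink<T> {
        fn new(val: T) -> Self {
            DemoLink {
                val,
                next: AtomicPtr::new(null_mut()),
                prev: AtomicPtr::new(null_mut()),
                _pin: PhantomPinned,
            }
        }
    }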

@@ -65,13 +66,16 @@ impl<T: Clone> LinkedList<T> {
     }

     /// Put an element at the back of the queue.
-    pub fn push(&self, link: &mut Link<T>) {
+    pub fn push(&self, link: Pin<&mut Link<T>>) {
         cs::with(|_| {
             // Make sure all previous writes are visible
             core::sync::atomic::fence(Ordering::SeqCst);

             let tail = self.tail.load(Self::R);

+            // SAFETY: This datastructure does not move the underlying value.
+            let link = unsafe { link.get_unchecked_mut() };
+
             if let Some(tail_ref) = unsafe { tail.as_ref() } {
                 // Queue is not empty
                 link.prev.store(tail, Self::R);
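
Sketch of the callee-side contract behind `get_unchecked_mut` above (stand-in types, not the crate's API): the method may strip the `Pin` only because it never moves the node, it merely records the node's address.

    use core::pin::Pin;
    use core::ptr;

    struct DemoNode {
        next: *mut DemoNode,
    }

    // Stand-in for a `push`-like method: link the node by address, never move it.
    fn link_by_address(head: &mut *mut DemoNode, node: Pin<&mut DemoNode>) {
        // SAFETY (sketch): the node is only linked in place; it is not moved,
        // swapped, or dropped here, so the pinning invariant is upheld.
        let node = unsafe { node.get_unchecked_mut() };
        node.next = *head;
        *head = ptr::addr_of_mut!(*node);
    }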

@@ -221,11 +225,11 @@ mod tests {
         let mut i4 = Link::new(13);
         let mut i5 = Link::new(14);

-        wq.push(&mut i1);
-        wq.push(&mut i2);
-        wq.push(&mut i3);
-        wq.push(&mut i4);
-        wq.push(&mut i5);
+        wq.push(unsafe { Pin::new_unchecked(&mut i1) });
+        wq.push(unsafe { Pin::new_unchecked(&mut i2) });
+        wq.push(unsafe { Pin::new_unchecked(&mut i3) });
+        wq.push(unsafe { Pin::new_unchecked(&mut i4) });
+        wq.push(unsafe { Pin::new_unchecked(&mut i5) });

         wq.print();
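
As an aside (not part of the commit), the same stack pinning can be expressed without `unsafe` using `core::pin::pin!`, stable since Rust 1.68; `takes_pinned` below is a made-up stand-in for `push`.

    use core::pin::{pin, Pin};

    fn takes_pinned(_link: Pin<&mut u32>) {}

    fn pin_macro_sketch() {
        // `pin!` pins the value to this scope and yields `Pin<&mut u32>` directly.
        let pinned = pin!(10_u32);
        takes_pinned(pinned);
    }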

Third changed file: the timer queue (`TimerQueue<Mono>`), which keeps waiting wakers in an ordered intrusive list. `core::pin::Pin` is added to the imports.

@@ -7,6 +7,7 @@
 #![feature(async_fn_in_trait)]

 use core::future::{poll_fn, Future};
+use core::pin::Pin;
 use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use core::task::{Poll, Waker};
 use futures_util::{

@@ -185,7 +186,10 @@ impl<Mono: Monotonic> TimerQueue<Mono> {
             );
         }

-        let mut link = None;
+        let mut link_ptr: Option<linked_list::Link<WaitingWaker<Mono>>> = None;
+
+        // Make this future `Drop`-safe, also shadow the original definition so we can't abuse it.
+        let link_ptr = &mut link_ptr as *mut Option<linked_list::Link<WaitingWaker<Mono>>>;

         let queue = &self.queue;
         let marker = &AtomicUsize::new(0);

@@ -199,6 +203,9 @@ impl<Mono: Monotonic> TimerQueue<Mono> {
                 return Poll::Ready(());
             }

+            // SAFETY: This pointer is only dereferenced here and on drop of the future
+            // which happens outside this `poll_fn`'s stack frame.
+            let link = unsafe { &mut *link_ptr };
             if link.is_none() {
                 let mut link_ref = link.insert(Link::new(WaitingWaker {
                     waker: cx.waker().clone(),
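
A generic sketch of the `poll_fn` shape these hunks modify, entirely a stand-in: an `AtomicBool` replaces the real queue, and instead of parking `cx.waker()` in a pinned intrusive node the closure simply requests another poll. It only shows the control flow of checking a condition and returning `Poll::Pending` until it holds.

    use core::future::poll_fn;
    use core::sync::atomic::{AtomicBool, Ordering};
    use core::task::Poll;

    async fn wait_until_set(flag: &AtomicBool) {
        poll_fn(|cx| {
            if flag.load(Ordering::Acquire) {
                return Poll::Ready(());
            }
            // The real code stores the waker in a pinned, intrusive list node;
            // this sketch just asks to be polled again.
            cx.waker().wake_by_ref();
            Poll::Pending
        })
        .await
    }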

@@ -206,7 +213,9 @@ impl<Mono: Monotonic> TimerQueue<Mono> {
                     was_poped: AtomicBool::new(false),
                 }));

-                let (was_empty, addr) = queue.insert(&mut link_ref);
+                // SAFETY: The address to the link is stable as it is defined outside this stack
+                // frame.
+                let (was_empty, addr) = queue.insert(unsafe { Pin::new_unchecked(&mut link_ref) });
                 marker.store(addr, Ordering::Relaxed);

                 if was_empty {

@@ -219,7 +228,10 @@ impl<Mono: Monotonic> TimerQueue<Mono> {
         })
         .await;

-        if let Some(link) = link {
+        // SAFETY: We only run this and dereference the pointer if we have
+        // exited the `poll_fn` below in the `drop(dropper)` call. The other dereference
+        // of this pointer is in the `poll_fn`.
+        if let Some(link) = unsafe { &mut *link_ptr } {
             if link.val.was_poped.load(Ordering::Relaxed) {
                 // If it was poped from the queue there is no need to run delete
                 dropper.defuse();
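
Sketch of the drop-guard pattern behind `dropper` / `defuse()` above; this hand-rolled `OnDrop` is a stand-in and is not necessarily identical to the helper the crate actually uses.

    /// Runs a closure on drop unless it has been defused.
    struct OnDrop<F: FnOnce()> {
        f: Option<F>,
    }

    impl<F: FnOnce()> OnDrop<F> {
        fn new(f: F) -> Self {
            OnDrop { f: Some(f) }
        }

        /// Disarm the guard so the closure never runs.
        fn defuse(mut self) {
            self.f = None;
        }
    }

    impl<F: FnOnce()> Drop for OnDrop<F> {
        fn drop(&mut self) {
            if let Some(f) = self.f.take() {
                f();
            }
        }
    }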

Fourth changed file: the timer queue's intrusive linked list (`LinkedList<T: PartialOrd + Clone>`). `core::pin::Pin` is added to the imports.

@@ -1,6 +1,7 @@
 //! ...

 use core::marker::PhantomPinned;
+use core::pin::Pin;
 use core::sync::atomic::{AtomicPtr, Ordering};
 use critical_section as cs;

@@ -92,8 +93,10 @@ impl<T: PartialOrd + Clone> LinkedList<T> {

     /// Insert a new link into the linked list.
     /// The return is (was_empty, address), where the address of the link is for use with `delete`.
-    pub fn insert(&self, val: &mut Link<T>) -> (bool, usize) {
+    pub fn insert(&self, val: Pin<&mut Link<T>>) -> (bool, usize) {
         cs::with(|_| {
+            // SAFETY: This datastructure does not move the underlying value.
+            let val = unsafe { val.get_unchecked_mut() };
             let addr = val as *const _ as usize;

             // Make sure all previous writes are visible
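
Finally, a sketch of why a pinned node's address can serve as a stable key (the commit stores it in `marker` for a later `delete`): once a `!Unpin` node is pinned it will not be moved until it is dropped, so its address identifies it for its whole lifetime. The function name below is illustrative.

    use core::pin::Pin;

    // Stand-in for the address-as-key idea used by `insert`/`delete`.
    fn address_key<T>(val: Pin<&mut T>) -> usize {
        // SAFETY (sketch): the value is only inspected; it is never moved out.
        let val = unsafe { val.get_unchecked_mut() };
        val as *mut T as usize
    }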