Move common data structures to rtic-common

Emil Fresk 2023-01-29 20:16:23 +01:00 committed by Henrik Tjäder
parent 58692a35e8
commit e65e532c2a
8 changed files with 53 additions and 11 deletions

View file

@@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
heapless = "0.7"
critical-section = "1"
+rtic-common = { version = "1.0.0", path = "../rtic-common" }
[dev-dependencies]
tokio = { version = "1", features = ["rt", "macros", "time"] }
@@ -15,4 +16,4 @@ tokio = { version = "1", features = ["rt", "macros", "time"] }
[features]
default = []
testing = ["critical-section/std"]
testing = ["critical-section/std", "rtic-common/testing"]

View file

@@ -14,11 +14,8 @@ use core::{
task::{Poll, Waker},
};
use heapless::Deque;
-use wait_queue::WaitQueue;
-use waker_registration::CriticalSectionWakerRegistration as WakerRegistration;
-
-mod wait_queue;
-mod waker_registration;
+use rtic_common::wait_queue::{Link, WaitQueue};
+use rtic_common::waker_registration::CriticalSectionWakerRegistration as WakerRegistration;
/// An MPSC channel for use in no-alloc systems. `N` sets the size of the queue.
///
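For orientation, a minimal usage sketch of this channel (illustrative only: the `rtic_channel` crate name and the `Channel::new`/`split`/`recv` signatures are assumptions about the crate's public API, not shown in this diff; only `Sender::send` appears below):

use rtic_channel::Channel; // assumed crate/type names

// Allocate the channel (it can also live in a `static`) and split it
// into a sender/receiver pair.
let mut channel: Channel<u32, 4> = Channel::new();
let (mut sender, mut receiver) = channel.split();

// `send` resolves immediately while the queue has room; when the queue
// is full, the sender parks itself in the `WaitQueue` imported above.
sender.send(42).await.ok();
let value = receiver.recv().await;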
@@ -136,11 +133,11 @@ unsafe impl<'a, T, const N: usize> Send for Sender<'a, T, N> {}
/// This is needed to make the async closure in `send` accept that we "share"
/// the link between threads.
#[derive(Clone)]
-struct LinkPtr(*mut Option<wait_queue::Link<Waker>>);
+struct LinkPtr(*mut Option<Link<Waker>>);
impl LinkPtr {
    /// This will dereference the pointer stored within and give out an `&mut`.
-    unsafe fn get(&mut self) -> &mut Option<wait_queue::Link<Waker>> {
+    unsafe fn get(&mut self) -> &mut Option<Link<Waker>> {
        &mut *self.0
    }
}
@@ -200,10 +197,10 @@ impl<'a, T, const N: usize> Sender<'a, T, N> {
    /// Send a value. If there is no place left in the queue this will wait until there is.
    /// If the receiver does not exist this will return an error.
    pub async fn send(&mut self, val: T) -> Result<(), NoReceiver<T>> {
-        let mut link_ptr: Option<wait_queue::Link<Waker>> = None;
+        let mut link_ptr: Option<Link<Waker>> = None;

        // Make this future `Drop`-safe; also shadow the original definition so we can't abuse it.
-        let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option<wait_queue::Link<Waker>>);
+        let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option<Link<Waker>>);
        let mut link_ptr2 = link_ptr.clone();

        let dropper = OnDrop::new(|| {
@@ -236,7 +233,7 @@
            }
        } else {
            // Place the link in the wait queue on first run.
-            let link_ref = link.insert(wait_queue::Link::new(cx.waker().clone()));
+            let link_ref = link.insert(Link::new(cx.waker().clone()));

            // SAFETY: The address to the link is stable as it is hidden behind
            // `link_ptr`, and `link_ptr` shadows the original making it unmovable.
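Taken together, `send` relies on a pattern worth spelling out: the `Link` lives on the future's own stack, its address is frozen by shadowing it behind the raw `LinkPtr`, and an `OnDrop` guard unlinks it if the future is cancelled. A condensed sketch of the same registration pattern (illustrative only; `ready()` is a hypothetical stand-in condition, and the `OnDrop` cleanup the real `send` performs is reduced to a comment):

use core::{future::poll_fn, pin::Pin, task::{Poll, Waker}};
use rtic_common::wait_queue::{Link, WaitQueue};

static WAIT_QUEUE: WaitQueue = WaitQueue::new();

// Hypothetical stand-in for "is there room in the queue?".
fn ready() -> bool {
    true
}

async fn wait_until_ready() {
    // The link lives on this future's stack. It must not move while it is
    // linked into the queue, which is why `push` takes `Pin<&mut Link<T>>`.
    let mut link: Option<Link<Waker>> = None;

    poll_fn(|cx| {
        if ready() {
            return Poll::Ready(());
        }
        if link.is_none() {
            // Register once; `pop` on the waking side clones and wakes this waker.
            let link_ref = link.insert(Link::new(cx.waker().clone()));
            WAIT_QUEUE.push(unsafe { Pin::new_unchecked(link_ref) });
        }
        Poll::Pending
    })
    .await;

    // NOTE: the real `send` also installs an `OnDrop` guard that calls
    // `link.remove_from_list(&WAIT_QUEUE)`, so a cancelled future cannot
    // leave a dangling link in the queue.
}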

View file

@@ -1,268 +0,0 @@
//! ...
use core::marker::PhantomPinned;
use core::pin::Pin;
use core::ptr::null_mut;
use core::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
use core::task::Waker;

use critical_section as cs;

/// A queue of `Waker`s for tasks waiting on a resource.
pub type WaitQueue = LinkedList<Waker>;

/// A FIFO linked list for a wait queue.
pub struct LinkedList<T> {
    head: AtomicPtr<Link<T>>, // Effectively an UnsafeCell<*mut Link<T>>; atomics provide the interior mutability.
    tail: AtomicPtr<Link<T>>,
}

impl<T> LinkedList<T> {
    /// Create a new linked list.
    pub const fn new() -> Self {
        Self {
            head: AtomicPtr::new(null_mut()),
            tail: AtomicPtr::new(null_mut()),
        }
    }
}
impl<T: Clone> LinkedList<T> {
    const R: Ordering = Ordering::Relaxed;

    /// Pop the first element in the queue.
    pub fn pop(&self) -> Option<T> {
        cs::with(|_| {
            // Make sure all previous writes are visible.
            core::sync::atomic::fence(Ordering::SeqCst);

            let head = self.head.load(Self::R);

            // SAFETY: `as_ref` is safe as `push` requires a valid reference to a link.
            if let Some(head_ref) = unsafe { head.as_ref() } {
                // Move head to the next element.
                self.head.store(head_ref.next.load(Self::R), Self::R);

                // We read the value at head.
                let head_val = head_ref.val.clone();

                let tail = self.tail.load(Self::R);
                if head == tail {
                    // The queue is now empty.
                    self.tail.store(null_mut(), Self::R);
                }

                if let Some(next_ref) = unsafe { head_ref.next.load(Self::R).as_ref() } {
                    next_ref.prev.store(null_mut(), Self::R);
                }

                // Clear the pointers in the node.
                head_ref.next.store(null_mut(), Self::R);
                head_ref.prev.store(null_mut(), Self::R);
                head_ref.is_poped.store(true, Self::R);

                return Some(head_val);
            }

            None
        })
    }

    /// Put an element at the back of the queue.
    pub fn push(&self, link: Pin<&mut Link<T>>) {
        cs::with(|_| {
            // Make sure all previous writes are visible.
            core::sync::atomic::fence(Ordering::SeqCst);

            let tail = self.tail.load(Self::R);

            // SAFETY: This data structure does not move the underlying value.
            let link = unsafe { link.get_unchecked_mut() };

            if let Some(tail_ref) = unsafe { tail.as_ref() } {
                // Queue is not empty.
                link.prev.store(tail, Self::R);
                self.tail.store(link, Self::R);
                tail_ref.next.store(link, Self::R);
            } else {
                // Queue is empty.
                self.tail.store(link, Self::R);
                self.head.store(link, Self::R);
            }
        });
    }

    /// Check if the queue is empty.
    pub fn is_empty(&self) -> bool {
        self.head.load(Self::R).is_null()
    }
}
/// A link in the linked list.
pub struct Link<T> {
    pub(crate) val: T,
    next: AtomicPtr<Link<T>>,
    prev: AtomicPtr<Link<T>>,
    is_poped: AtomicBool,
    _up: PhantomPinned,
}

impl<T: Clone> Link<T> {
    const R: Ordering = Ordering::Relaxed;

    /// Create a new link.
    pub const fn new(val: T) -> Self {
        Self {
            val,
            next: AtomicPtr::new(null_mut()),
            prev: AtomicPtr::new(null_mut()),
            is_poped: AtomicBool::new(false),
            _up: PhantomPinned,
        }
    }

    /// Check whether this link has been popped or removed from a list.
    pub fn is_poped(&self) -> bool {
        self.is_poped.load(Self::R)
    }
    /// Remove this link from the given list, patching up the neighbours' pointers.
    pub fn remove_from_list(&mut self, list: &LinkedList<T>) {
        cs::with(|_| {
            // Make sure all previous writes are visible.
            core::sync::atomic::fence(Ordering::SeqCst);

            let prev = self.prev.load(Self::R);
            let next = self.next.load(Self::R);
            self.is_poped.store(true, Self::R);

            match unsafe { (prev.as_ref(), next.as_ref()) } {
                (None, None) => {
                    // Not in the list, or alone in the list: check if the list head is this node's address.
                    let sp = self as *const _;
                    if sp == list.head.load(Ordering::Relaxed) as *const _ {
                        list.head.store(null_mut(), Self::R);
                        list.tail.store(null_mut(), Self::R);
                    }
                }
                (None, Some(next_ref)) => {
                    // First in the list.
                    next_ref.prev.store(null_mut(), Self::R);
                    list.head.store(next, Self::R);
                }
                (Some(prev_ref), None) => {
                    // Last in the list.
                    prev_ref.next.store(null_mut(), Self::R);
                    list.tail.store(prev, Self::R);
                }
                (Some(prev_ref), Some(next_ref)) => {
                    // Somewhere in the middle of the list.
                    // Connect `prev.next` and `next.prev` to each other to remove the node.
                    prev_ref.next.store(next, Self::R);
                    next_ref.prev.store(prev, Self::R);
                }
            }
        })
    }
}
#[cfg(test)]
impl<T: core::fmt::Debug + Clone> LinkedList<T> {
    fn print(&self) {
        cs::with(|_| {
            // Make sure all previous writes are visible.
            core::sync::atomic::fence(Ordering::SeqCst);

            let mut head = self.head.load(Self::R);
            let tail = self.tail.load(Self::R);

            println!(
                "List - h = 0x{:x}, t = 0x{:x}",
                head as usize, tail as usize
            );

            let mut i = 0;

            // SAFETY: `as_ref` is safe as `push` requires a valid reference to a link.
            while let Some(head_ref) = unsafe { head.as_ref() } {
                println!(
                    "  {}: {:?}, s = 0x{:x}, n = 0x{:x}, p = 0x{:x}",
                    i,
                    head_ref.val,
                    head as usize,
                    head_ref.next.load(Ordering::Relaxed) as usize,
                    head_ref.prev.load(Ordering::Relaxed) as usize
                );

                head = head_ref.next.load(Self::R);
                i += 1;
            }
        });
    }
}
#[cfg(test)]
impl<T: core::fmt::Debug + Clone> Link<T> {
    fn print(&self) {
        cs::with(|_| {
            // Make sure all previous writes are visible.
            core::sync::atomic::fence(Ordering::SeqCst);

            println!("Link:");
            println!(
                "  val = {:?}, n = 0x{:x}, p = 0x{:x}",
                self.val,
                self.next.load(Ordering::Relaxed) as usize,
                self.prev.load(Ordering::Relaxed) as usize
            );
        });
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn linked_list() {
        let wq = LinkedList::<u32>::new();

        let mut i1 = Link::new(10);
        let mut i2 = Link::new(11);
        let mut i3 = Link::new(12);
        let mut i4 = Link::new(13);
        let mut i5 = Link::new(14);

        wq.push(unsafe { Pin::new_unchecked(&mut i1) });
        wq.push(unsafe { Pin::new_unchecked(&mut i2) });
        wq.push(unsafe { Pin::new_unchecked(&mut i3) });
        wq.push(unsafe { Pin::new_unchecked(&mut i4) });
        wq.push(unsafe { Pin::new_unchecked(&mut i5) });

        wq.print();

        wq.pop();
        i1.print();

        wq.print();

        i4.remove_from_list(&wq);

        wq.print();

        // i1.remove_from_list(&wq);
        // wq.print();

        println!("i2");
        i2.remove_from_list(&wq);

        wq.print();

        println!("i3");
        i3.remove_from_list(&wq);

        wq.print();

        println!("i5");
        i5.remove_from_list(&wq);

        wq.print();
    }
}

View file

@@ -1,64 +0,0 @@
use core::cell::UnsafeCell;
use core::task::Waker;

/// A critical section based waker handler.
pub struct CriticalSectionWakerRegistration {
    waker: UnsafeCell<Option<Waker>>,
}

unsafe impl Send for CriticalSectionWakerRegistration {}
unsafe impl Sync for CriticalSectionWakerRegistration {}

impl CriticalSectionWakerRegistration {
    /// Create a new waker registration.
    pub const fn new() -> Self {
        Self {
            waker: UnsafeCell::new(None),
        }
    }
    /// Register a waker.
    /// This will overwrite the previous waker if there was one.
    pub fn register(&self, new_waker: &Waker) {
        critical_section::with(|_| {
            // SAFETY: This access is protected by the critical section.
            let self_waker = unsafe { &mut *self.waker.get() };

            // From embassy:
            // https://github.com/embassy-rs/embassy/blob/b99533607ceed225dd12ae73aaa9a0d969a7365e/embassy-sync/src/waitqueue/waker.rs#L59-L61
            match self_waker {
                // Optimization: If both the old and new Wakers wake the same task, we can simply
                // keep the old waker, skipping the clone. (In most executor implementations,
                // cloning a waker is somewhat expensive, comparable to cloning an Arc.)
                Some(ref w2) if (w2.will_wake(new_waker)) => {}
                _ => {
                    // Clone the new waker and store it.
                    if let Some(old_waker) = core::mem::replace(self_waker, Some(new_waker.clone()))
                    {
                        // We had a waker registered for another task. Wake it, so the other task can
                        // re-register itself if it's still interested.
                        //
                        // If two tasks are waiting on the same thing concurrently, this will cause them
                        // to wake each other in a loop, fighting over this WakerRegistration. This wastes
                        // CPU but things will still work.
                        //
                        // If the user wants to have two tasks waiting on the same thing they should use
                        // a more appropriate primitive that can store multiple wakers.
                        old_waker.wake()
                    }
                }
            }
        });
    }
    /// Wake the registered waker, if any.
    pub fn wake(&self) {
        critical_section::with(|_| {
            // SAFETY: This access is protected by the critical section.
            let self_waker = unsafe { &mut *self.waker.get() };
            if let Some(waker) = self_waker.take() {
                waker.wake()
            }
        });
    }
}
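A short usage sketch of this registration (the `DATA_READY` flag and `on_data_interrupt` handler are hypothetical, not part of this commit): the future registers its waker before checking the condition, so a wake that races with the poll is not lost.

use core::future::poll_fn;
use core::sync::atomic::{AtomicBool, Ordering};
use core::task::Poll;

static WAKER: CriticalSectionWakerRegistration = CriticalSectionWakerRegistration::new();
static DATA_READY: AtomicBool = AtomicBool::new(false);

async fn wait_for_data() {
    poll_fn(|cx| {
        // Register first, then check the flag: if the interrupt fires in
        // between, `wake` finds the freshly stored waker and nothing is lost.
        WAKER.register(cx.waker());
        if DATA_READY.swap(false, Ordering::Relaxed) {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    })
    .await
}

// Called from, e.g., an interrupt handler:
fn on_data_interrupt() {
    DATA_READY.store(true, Ordering::Relaxed);
    WAKER.wake();
}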