diff --git a/rtic-arbiter/.gitignore b/rtic-arbiter/.gitignore new file mode 100644 index 0000000000..1e7caa9ea8 --- /dev/null +++ b/rtic-arbiter/.gitignore @@ -0,0 +1,2 @@ +Cargo.lock +target/ diff --git a/rtic-arbiter/CHANGELOG.md b/rtic-arbiter/CHANGELOG.md new file mode 100644 index 0000000000..d3a9d846ee --- /dev/null +++ b/rtic-arbiter/CHANGELOG.md @@ -0,0 +1,16 @@ +# Change Log + +All notable changes to this project will be documented in this file. +This project adheres to [Semantic Versioning](http://semver.org/). + +For each category, *Added*, *Changed*, *Fixed* add new entries at the top! + +## [Unreleased] + +### Added + +### Changed + +### Fixed + +## [v1.0.0] - 2023-xx-xx diff --git a/rtic-arbiter/Cargo.toml b/rtic-arbiter/Cargo.toml new file mode 100644 index 0000000000..b1afaf4516 --- /dev/null +++ b/rtic-arbiter/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "rtic-arbiter" +version = "1.0.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +critical-section = "1" +rtic-common = { version = "1.0.0", path = "../rtic-common" } + +[dev-dependencies] +tokio = { version = "1", features = ["rt", "macros", "time"] } + + +[features] +default = [] +testing = ["critical-section/std", "rtic-common/testing"] diff --git a/rtic-arbiter/src/lib.rs b/rtic-arbiter/src/lib.rs new file mode 100644 index 0000000000..487c64cecd --- /dev/null +++ b/rtic-arbiter/src/lib.rs @@ -0,0 +1,175 @@ +//! Crate + +#![no_std] +#![deny(missing_docs)] +//deny_warnings_placeholder_for_ci + +use core::cell::UnsafeCell; +use core::future::poll_fn; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::sync::atomic::{fence, AtomicBool, Ordering}; +use core::task::{Poll, Waker}; + +use rtic_common::dropper::OnDrop; +use rtic_common::wait_queue::{Link, WaitQueue}; + +/// This is needed to make the async closure in `send` accept that we "share" +/// the link possible between threads. +#[derive(Clone)] +struct LinkPtr(*mut Option>); + +impl LinkPtr { + /// This will dereference the pointer stored within and give out an `&mut`. + unsafe fn get(&mut self) -> &mut Option> { + &mut *self.0 + } +} + +unsafe impl Send for LinkPtr {} +unsafe impl Sync for LinkPtr {} + +/// An FIFO waitqueue for use in shared bus usecases. +pub struct Arbiter { + wait_queue: WaitQueue, + inner: UnsafeCell, + taken: AtomicBool, +} + +impl Arbiter { + /// Create a new arbiter. + pub const fn new(inner: T) -> Self { + Self { + wait_queue: WaitQueue::new(), + inner: UnsafeCell::new(inner), + taken: AtomicBool::new(false), + } + } + + /// Get access to the inner value in the `Arbiter`. This will wait until access is granted, + /// for non-blocking access use `try_access`. + pub async fn access(&self) -> ExclusiveAccess<'_, T> { + let mut link_ptr: Option> = None; + + // Make this future `Drop`-safe, also shadow the original definition so we can't abuse it. + let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option>); + + let mut link_ptr2 = link_ptr.clone(); + let dropper = OnDrop::new(|| { + // SAFETY: We only run this closure and dereference the pointer if we have + // exited the `poll_fn` below in the `drop(dropper)` call. The other dereference + // of this pointer is in the `poll_fn`. + if let Some(link) = unsafe { link_ptr2.get() } { + link.remove_from_list(&self.wait_queue); + } + }); + + poll_fn(|cx| { + critical_section::with(|_| { + fence(Ordering::SeqCst); + + // The queue is empty and noone has taken the value. + if self.wait_queue.is_empty() && !self.taken.load(Ordering::Relaxed) { + self.taken.store(true, Ordering::Relaxed); + + return Poll::Ready(()); + } + + // SAFETY: This pointer is only dereferenced here and on drop of the future + // which happens outside this `poll_fn`'s stack frame. + let link = unsafe { link_ptr.get() }; + if let Some(link) = link { + if link.is_poped() { + return Poll::Ready(()); + } + } else { + // Place the link in the wait queue on first run. + let link_ref = link.insert(Link::new(cx.waker().clone())); + + // SAFETY: The address to the link is stable as it is hidden behind + // `link_ptr`, and `link_ptr` shadows the original making it unmovable. + self.wait_queue + .push(unsafe { Pin::new_unchecked(link_ref) }); + } + + Poll::Pending + }) + }) + .await; + + // Make sure the link is removed from the queue. + drop(dropper); + + // SAFETY: One only gets here if there is exlusive access. + ExclusiveAccess { + arbiter: self, + inner: unsafe { &mut *self.inner.get() }, + } + } + + /// Non-blockingly tries to access the underlying value. + /// If someone is in queue to get it, this will return `None`. + pub fn try_access(&self) -> Option> { + critical_section::with(|_| { + fence(Ordering::SeqCst); + + // The queue is empty and noone has taken the value. + if self.wait_queue.is_empty() && !self.taken.load(Ordering::Relaxed) { + self.taken.store(true, Ordering::Relaxed); + + // SAFETY: One only gets here if there is exlusive access. + Some(ExclusiveAccess { + arbiter: self, + inner: unsafe { &mut *self.inner.get() }, + }) + } else { + None + } + }) + } +} + +/// This token represents exclusive access to the value protected by the `Arbiter`. +pub struct ExclusiveAccess<'a, T> { + arbiter: &'a Arbiter, + inner: &'a mut T, +} + +impl<'a, T> Drop for ExclusiveAccess<'a, T> { + fn drop(&mut self) { + critical_section::with(|_| { + fence(Ordering::SeqCst); + + if self.arbiter.wait_queue.is_empty() { + // If noone is in queue and we release exclusive access, reset `taken`. + self.arbiter.taken.store(false, Ordering::Relaxed); + } else if let Some(next) = self.arbiter.wait_queue.pop() { + // Wake the next one in queue. + next.wake(); + } + }) + } +} + +impl<'a, T> Deref for ExclusiveAccess<'a, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.inner + } +} + +impl<'a, T> DerefMut for ExclusiveAccess<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.inner + } +} + +#[cfg(test)] +#[macro_use] +extern crate std; + +#[cfg(test)] +mod tests { + // use super::*; +} diff --git a/rtic-channel/src/lib.rs b/rtic-channel/src/lib.rs index 2b237f669f..6f816b5755 100644 --- a/rtic-channel/src/lib.rs +++ b/rtic-channel/src/lib.rs @@ -14,8 +14,11 @@ use core::{ task::{Poll, Waker}, }; use heapless::Deque; -use rtic_common::wait_queue::{Link, WaitQueue}; use rtic_common::waker_registration::CriticalSectionWakerRegistration as WakerRegistration; +use rtic_common::{ + dropper::OnDrop, + wait_queue::{Link, WaitQueue}, +}; /// An MPSC channel for use in no-alloc systems. `N` sets the size of the queue. /// @@ -417,29 +420,6 @@ impl<'a, T, const N: usize> Drop for Receiver<'a, T, N> { } } -struct OnDrop { - f: core::mem::MaybeUninit, -} - -impl OnDrop { - pub fn new(f: F) -> Self { - Self { - f: core::mem::MaybeUninit::new(f), - } - } - - #[allow(unused)] - pub fn defuse(self) { - core::mem::forget(self) - } -} - -impl Drop for OnDrop { - fn drop(&mut self) { - unsafe { self.f.as_ptr().read()() } - } -} - #[cfg(test)] #[macro_use] extern crate std; diff --git a/rtic-common/src/dropper.rs b/rtic-common/src/dropper.rs new file mode 100644 index 0000000000..a4b4d15927 --- /dev/null +++ b/rtic-common/src/dropper.rs @@ -0,0 +1,26 @@ +//! A drop implementation runner. + +/// Runs a closure on drop. +pub struct OnDrop { + f: core::mem::MaybeUninit, +} + +impl OnDrop { + /// Make a new droppper given a closure. + pub fn new(f: F) -> Self { + Self { + f: core::mem::MaybeUninit::new(f), + } + } + + /// Make it not run drop. + pub fn defuse(self) { + core::mem::forget(self) + } +} + +impl Drop for OnDrop { + fn drop(&mut self) { + unsafe { self.f.as_ptr().read()() } + } +} diff --git a/rtic-common/src/lib.rs b/rtic-common/src/lib.rs index 3c75856c8a..b8b5e0d931 100644 --- a/rtic-common/src/lib.rs +++ b/rtic-common/src/lib.rs @@ -4,5 +4,6 @@ #![deny(missing_docs)] //deny_warnings_placeholder_for_ci +pub mod dropper; pub mod wait_queue; pub mod waker_registration;