From d516d9a214152e7bcecbc14bb3cbd22819039c3e Mon Sep 17 00:00:00 2001 From: AdinAck Date: Wed, 19 Jun 2024 11:52:38 -0700 Subject: [PATCH] Add Signal to rtic-sync (#934) * add signal to rtic-sync * woops update changelog * remove example, too comlicated for a doc TODO: add example to rtic-examples repo * fix @korken89's issues * ...remove fence * fix clippy warnings * add tests --- rtic-sync/CHANGELOG.md | 1 + rtic-sync/Cargo.toml | 3 +- rtic-sync/src/lib.rs | 1 + rtic-sync/src/signal.rs | 195 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 199 insertions(+), 1 deletion(-) create mode 100644 rtic-sync/src/signal.rs diff --git a/rtic-sync/CHANGELOG.md b/rtic-sync/CHANGELOG.md index 135b17bd22..6cd3a36ad9 100644 --- a/rtic-sync/CHANGELOG.md +++ b/rtic-sync/CHANGELOG.md @@ -14,6 +14,7 @@ For each category, _Added_, _Changed_, _Fixed_ add new entries at the top! ### Added - `defmt v0.3` derives added and forwarded to `embedded-hal(-x)` crates. +- signal structure ## v1.2.0 - 2024-01-10 diff --git a/rtic-sync/Cargo.toml b/rtic-sync/Cargo.toml index 6fcf9006d0..6d9358f2d1 100644 --- a/rtic-sync/Cargo.toml +++ b/rtic-sync/Cargo.toml @@ -29,9 +29,10 @@ embedded-hal-bus = { version = "0.1.0", features = ["async"] } defmt-03 = { package = "defmt", version = "0.3", optional = true } [dev-dependencies] +static_cell = "2.1.0" tokio = { version = "1", features = ["rt", "macros", "time"] } [features] default = [] testing = ["critical-section/std", "rtic-common/testing"] -defmt-03 = ["dep:defmt-03", "embedded-hal/defmt-03", "embedded-hal-async/defmt-03", "embedded-hal-bus/defmt-03"] \ No newline at end of file +defmt-03 = ["dep:defmt-03", "embedded-hal/defmt-03", "embedded-hal-async/defmt-03", "embedded-hal-bus/defmt-03"] diff --git a/rtic-sync/src/lib.rs b/rtic-sync/src/lib.rs index 90afff63b3..f8845888ed 100644 --- a/rtic-sync/src/lib.rs +++ b/rtic-sync/src/lib.rs @@ -9,6 +9,7 @@ use defmt_03 as defmt; pub mod arbiter; pub mod channel; pub use portable_atomic; +pub mod signal; #[cfg(test)] #[macro_use] diff --git a/rtic-sync/src/signal.rs b/rtic-sync/src/signal.rs new file mode 100644 index 0000000000..0a26b4c623 --- /dev/null +++ b/rtic-sync/src/signal.rs @@ -0,0 +1,195 @@ +//! A "latest only" value store with unlimited writers and async waiting. + +use core::{cell::UnsafeCell, future::poll_fn, task::Poll}; +use rtic_common::waker_registration::CriticalSectionWakerRegistration; + +/// Basically an Option but for indicating +/// whether the store has been set or not +#[derive(Clone, Copy)] +enum Store { + Set(T), + Unset, +} + +/// A "latest only" value store with unlimited writers and async waiting. +pub struct Signal { + waker: CriticalSectionWakerRegistration, + store: UnsafeCell>, +} + +unsafe impl Send for Signal {} +unsafe impl Sync for Signal {} + +impl Signal { + /// Create a new signal. + pub const fn new() -> Self { + Self { + waker: CriticalSectionWakerRegistration::new(), + store: UnsafeCell::new(Store::Unset), + } + } + + /// Split the signal into a writer and reader. + pub fn split(&self) -> (SignalWriter, SignalReader) { + (SignalWriter { parent: self }, SignalReader { parent: self }) + } +} + +/// Fascilitates the writing of values to a Signal. +#[derive(Clone)] +pub struct SignalWriter<'a, T: Copy> { + parent: &'a Signal, +} + +impl<'a, T: Copy> SignalWriter<'a, T> { + /// Write a raw Store value to the Signal. + fn write_inner(&mut self, value: Store) { + critical_section::with(|_| { + // SAFETY: in a cs: exclusive access + unsafe { self.parent.store.get().replace(value) }; + }); + + self.parent.waker.wake(); + } + + /// Write a value to the Signal. + pub fn write(&mut self, value: T) { + self.write_inner(Store::Set(value)); + } + + /// Clear the stored value in the Signal (if any). + pub fn clear(&mut self) { + self.write_inner(Store::Unset); + } +} + +/// Fascilitates the async reading of values from the Signal. +pub struct SignalReader<'a, T: Copy> { + parent: &'a Signal, +} + +impl<'a, T: Copy> SignalReader<'a, T> { + /// Immediately read and evict the latest value stored in the Signal. + fn take(&mut self) -> Store { + critical_section::with(|_| { + // SAFETY: in a cs: exclusive access + unsafe { self.parent.store.get().replace(Store::Unset) } + }) + } + + /// Returns a pending value if present, or None if no value is available. + /// + /// Upon read, the stored value is evicted. + pub fn try_read(&mut self) -> Option { + match self.take() { + Store::Unset => None, + Store::Set(value) => Some(value), + } + } + + /// Wait for a new value to be written and read it. + /// + /// If a value is already pending it will be returned immediately. + /// + /// Upon read, the stored value is evicted. + pub async fn wait(&mut self) -> T { + poll_fn(|ctx| { + self.parent.waker.register(ctx.waker()); + match self.take() { + Store::Unset => Poll::Pending, + Store::Set(value) => Poll::Ready(value), + } + }) + .await + } + + /// Wait for a new value to be written and read it. + /// + /// If a value is already pending, it will be evicted and a new + /// value must be written for the wait to resolve. + /// + /// Upon read, the stored value is evicted. + pub async fn wait_fresh(&mut self) -> T { + self.take(); + self.wait().await + } +} + +/// Convenience macro for creating a Signal. +#[macro_export] +macro_rules! make_signal { + ( $T:ty ) => {{ + static SIGNAL: Signal<$T> = Signal::new(); + + SIGNAL.split() + }}; +} + +#[cfg(test)] +mod tests { + use static_cell::StaticCell; + + use super::*; + + #[test] + fn empty() { + let (_writer, mut reader) = make_signal!(u32); + + assert!(reader.try_read().is_none()); + } + + #[test] + fn ping_pong() { + let (mut writer, mut reader) = make_signal!(u32); + + writer.write(0xde); + assert!(reader.try_read().is_some_and(|value| value == 0xde)); + } + + #[test] + fn latest() { + let (mut writer, mut reader) = make_signal!(u32); + + writer.write(0xde); + writer.write(0xad); + writer.write(0xbe); + writer.write(0xef); + assert!(reader.try_read().is_some_and(|value| value == 0xef)); + } + + #[test] + fn consumption() { + let (mut writer, mut reader) = make_signal!(u32); + + writer.write(0xaa); + assert!(reader.try_read().is_some_and(|value| value == 0xaa)); + assert!(reader.try_read().is_none()); + } + + #[tokio::test] + async fn pending() { + let (mut writer, mut reader) = make_signal!(u32); + + writer.write(0xaa); + + assert_eq!(reader.wait().await, 0xaa); + } + + #[tokio::test] + async fn waiting() { + static READER: StaticCell> = StaticCell::new(); + let (mut writer, reader) = make_signal!(u32); + + writer.write(0xaa); + + let reader = READER.init(reader); + let handle = tokio::spawn(reader.wait_fresh()); + + tokio::task::yield_now().await; // encourage tokio executor to poll reader future + assert!(!handle.is_finished()); // verify reader future did not resolve after poll + + writer.write(0xab); + + assert!(handle.await.is_ok_and(|value| value == 0xab)); + } +}