From f42147948d1393ad7a13ba9a6f45651fbc94af3a Mon Sep 17 00:00:00 2001 From: Emil Fresk Date: Sun, 23 Jun 2024 10:33:02 +0200 Subject: [PATCH] Add blocking version of `rtic_sync::arbiter::{i2c,spi}::ArbiterDevice` --- rtic-sync/CHANGELOG.md | 7 ++ rtic-sync/src/arbiter.rs | 194 +---------------------------------- rtic-sync/src/arbiter/i2c.rs | 168 ++++++++++++++++++++++++++++++ rtic-sync/src/arbiter/spi.rs | 164 +++++++++++++++++++++++++++++ 4 files changed, 342 insertions(+), 191 deletions(-) create mode 100644 rtic-sync/src/arbiter/i2c.rs create mode 100644 rtic-sync/src/arbiter/spi.rs diff --git a/rtic-sync/CHANGELOG.md b/rtic-sync/CHANGELOG.md index 54236a22067..f37044ef08a 100644 --- a/rtic-sync/CHANGELOG.md +++ b/rtic-sync/CHANGELOG.md @@ -7,7 +7,14 @@ For each category, _Added_, _Changed_, _Fixed_ add new entries at the top! ## [Unreleased] +### Added + +- Add `arbiter::{i2c, spi}::BlockingArbiterDevice` which allows sharing of `embedded_hal` (non-async) buses. This also helps during initialization of RTIC apps as you can use the arbiter while in `init`. After initialization is complete, convert an `BlockingArbiterDevice` into an `ArbiterDevice` using `BlockingArbiterDevice::into_non_blocking()`. + +### Fixed + - Avoid a critical section when a `send`-link is popped and when returning `free_slot`. + ### Changed - Add `loom` support. diff --git a/rtic-sync/src/arbiter.rs b/rtic-sync/src/arbiter.rs index 60559dffab8..49fa7fafbe9 100644 --- a/rtic-sync/src/arbiter.rs +++ b/rtic-sync/src/arbiter.rs @@ -29,10 +29,12 @@ use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::task::{Poll, Waker}; use portable_atomic::{fence, AtomicBool, Ordering}; - use rtic_common::dropper::OnDrop; use rtic_common::wait_queue::{Link, WaitQueue}; +pub mod i2c; +pub mod spi; + /// This is needed to make the async closure in `send` accept that we "share" /// the link possible between threads. #[derive(Clone)] @@ -191,196 +193,6 @@ impl DerefMut for ExclusiveAccess<'_, T> { } } -/// SPI bus sharing using [`Arbiter`] -pub mod spi { - use super::Arbiter; - use embedded_hal::digital::OutputPin; - use embedded_hal_async::{ - delay::DelayNs, - spi::{ErrorType, Operation, SpiBus, SpiDevice}, - }; - use embedded_hal_bus::spi::DeviceError; - - /// [`Arbiter`]-based shared bus implementation. - pub struct ArbiterDevice<'a, BUS, CS, D> { - bus: &'a Arbiter, - cs: CS, - delay: D, - } - - impl<'a, BUS, CS, D> ArbiterDevice<'a, BUS, CS, D> { - /// Create a new [`ArbiterDevice`]. - pub fn new(bus: &'a Arbiter, cs: CS, delay: D) -> Self { - Self { bus, cs, delay } - } - } - - impl ErrorType for ArbiterDevice<'_, BUS, CS, D> - where - BUS: ErrorType, - CS: OutputPin, - { - type Error = DeviceError; - } - - impl SpiDevice for ArbiterDevice<'_, BUS, CS, D> - where - Word: Copy + 'static, - BUS: SpiBus, - CS: OutputPin, - D: DelayNs, - { - async fn transaction( - &mut self, - operations: &mut [Operation<'_, Word>], - ) -> Result<(), DeviceError> { - let mut bus = self.bus.access().await; - - self.cs.set_low().map_err(DeviceError::Cs)?; - - let op_res = 'ops: { - for op in operations { - let res = match op { - Operation::Read(buf) => bus.read(buf).await, - Operation::Write(buf) => bus.write(buf).await, - Operation::Transfer(read, write) => bus.transfer(read, write).await, - Operation::TransferInPlace(buf) => bus.transfer_in_place(buf).await, - Operation::DelayNs(ns) => match bus.flush().await { - Err(e) => Err(e), - Ok(()) => { - self.delay.delay_ns(*ns).await; - Ok(()) - } - }, - }; - if let Err(e) = res { - break 'ops Err(e); - } - } - Ok(()) - }; - - // On failure, it's important to still flush and deassert CS. - let flush_res = bus.flush().await; - let cs_res = self.cs.set_high(); - - op_res.map_err(DeviceError::Spi)?; - flush_res.map_err(DeviceError::Spi)?; - cs_res.map_err(DeviceError::Cs)?; - - Ok(()) - } - } -} - -/// I2C bus sharing using [`Arbiter`] -/// -/// An Example how to use it in RTIC application: -/// ```text -/// #[app(device = some_hal, peripherals = true, dispatchers = [TIM16])] -/// mod app { -/// use core::mem::MaybeUninit; -/// use rtic_sync::{arbiter::{i2c::ArbiterDevice, Arbiter}, -/// -/// #[shared] -/// struct Shared {} -/// -/// #[local] -/// struct Local { -/// ens160: Ens160>>, -/// } -/// -/// #[init(local = [ -/// i2c_arbiter: MaybeUninit>> = MaybeUninit::uninit(), -/// ])] -/// fn init(cx: init::Context) -> (Shared, Local) { -/// let i2c = I2c::new(cx.device.I2C1); -/// let i2c_arbiter = cx.local.i2c_arbiter.write(Arbiter::new(i2c)); -/// let ens160 = Ens160::new(ArbiterDevice::new(i2c_arbiter), 0x52); -/// -/// i2c_sensors::spawn(i2c_arbiter).ok(); -/// -/// (Shared {}, Local { ens160 }) -/// } -/// -/// #[task(local = [ens160])] -/// async fn i2c_sensors(cx: i2c_sensors::Context, i2c: &'static Arbiter>) { -/// use sensor::Asensor; -/// -/// loop { -/// // Use scope to make sure I2C access is dropped. -/// { -/// // Read from sensor driver that wants to use I2C directly. -/// let mut i2c = i2c.access().await; -/// let status = Asensor::status(&mut i2c).await; -/// } -/// -/// // Read ENS160 sensor. -/// let eco2 = cx.local.ens160.eco2().await; -/// } -/// } -/// } -/// ``` -pub mod i2c { - use super::Arbiter; - use embedded_hal::i2c::{AddressMode, ErrorType, Operation}; - use embedded_hal_async::i2c::I2c; - - /// [`Arbiter`]-based shared bus implementation for I2C. - pub struct ArbiterDevice<'a, BUS> { - bus: &'a Arbiter, - } - - impl<'a, BUS> ArbiterDevice<'a, BUS> { - /// Create a new [`ArbiterDevice`] for I2C. - pub fn new(bus: &'a Arbiter) -> Self { - Self { bus } - } - } - - impl ErrorType for ArbiterDevice<'_, BUS> - where - BUS: ErrorType, - { - type Error = BUS::Error; - } - - impl I2c for ArbiterDevice<'_, BUS> - where - BUS: I2c, - A: AddressMode, - { - async fn read(&mut self, address: A, read: &mut [u8]) -> Result<(), Self::Error> { - let mut bus = self.bus.access().await; - bus.read(address, read).await - } - - async fn write(&mut self, address: A, write: &[u8]) -> Result<(), Self::Error> { - let mut bus = self.bus.access().await; - bus.write(address, write).await - } - - async fn write_read( - &mut self, - address: A, - write: &[u8], - read: &mut [u8], - ) -> Result<(), Self::Error> { - let mut bus = self.bus.access().await; - bus.write_read(address, write, read).await - } - - async fn transaction( - &mut self, - address: A, - operations: &mut [Operation<'_>], - ) -> Result<(), Self::Error> { - let mut bus = self.bus.access().await; - bus.transaction(address, operations).await - } - } -} - #[cfg(not(loom))] #[cfg(test)] mod tests { diff --git a/rtic-sync/src/arbiter/i2c.rs b/rtic-sync/src/arbiter/i2c.rs new file mode 100644 index 00000000000..40cefce2ca5 --- /dev/null +++ b/rtic-sync/src/arbiter/i2c.rs @@ -0,0 +1,168 @@ +//! I2C bus sharing using [`Arbiter`] +//! +//! An Example how to use it in RTIC application: +//! ```text +//! #[app(device = some_hal, peripherals = true, dispatchers = [TIM16])] +//! mod app { +//! use core::mem::MaybeUninit; +//! use rtic_sync::{arbiter::{i2c::ArbiterDevice, Arbiter}, +//! +//! #[shared] +//! struct Shared {} +//! +//! #[local] +//! struct Local { +//! ens160: Ens160>>, +//! } +//! +//! #[init(local = [ +//! i2c_arbiter: MaybeUninit>> = MaybeUninit::uninit(), +//! ])] +//! fn init(cx: init::Context) -> (Shared, Local) { +//! let i2c = I2c::new(cx.device.I2C1); +//! let i2c_arbiter = cx.local.i2c_arbiter.write(Arbiter::new(i2c)); +//! let ens160 = Ens160::new(ArbiterDevice::new(i2c_arbiter), 0x52); +//! +//! i2c_sensors::spawn(i2c_arbiter).ok(); +//! +//! (Shared {}, Local { ens160 }) +//! } +//! +//! #[task(local = [ens160])] +//! async fn i2c_sensors(cx: i2c_sensors::Context, i2c: &'static Arbiter>) { +//! use sensor::Asensor; +//! +//! loop { +//! // Use scope to make sure I2C access is dropped. +//! { +//! // Read from sensor driver that wants to use I2C directly. +//! let mut i2c = i2c.access().await; +//! let status = Asensor::status(&mut i2c).await; +//! } +//! +//! // Read ENS160 sensor. +//! let eco2 = cx.local.ens160.eco2().await; +//! } +//! } +//! } +//! ``` + +use super::Arbiter; +use embedded_hal::i2c::{AddressMode, ErrorType, I2c as BlockingI2c, Operation}; +use embedded_hal_async::i2c::I2c as AsyncI2c; + +/// [`Arbiter`]-based shared bus implementation for I2C. +pub struct ArbiterDevice<'a, BUS> { + bus: &'a Arbiter, +} + +impl<'a, BUS> ArbiterDevice<'a, BUS> { + /// Create a new [`ArbiterDevice`] for I2C. + pub fn new(bus: &'a Arbiter) -> Self { + Self { bus } + } +} + +impl ErrorType for ArbiterDevice<'_, BUS> +where + BUS: ErrorType, +{ + type Error = BUS::Error; +} + +impl AsyncI2c for ArbiterDevice<'_, BUS> +where + BUS: AsyncI2c, + A: AddressMode, +{ + async fn read(&mut self, address: A, read: &mut [u8]) -> Result<(), Self::Error> { + let mut bus = self.bus.access().await; + bus.read(address, read).await + } + + async fn write(&mut self, address: A, write: &[u8]) -> Result<(), Self::Error> { + let mut bus = self.bus.access().await; + bus.write(address, write).await + } + + async fn write_read( + &mut self, + address: A, + write: &[u8], + read: &mut [u8], + ) -> Result<(), Self::Error> { + let mut bus = self.bus.access().await; + bus.write_read(address, write, read).await + } + + async fn transaction( + &mut self, + address: A, + operations: &mut [Operation<'_>], + ) -> Result<(), Self::Error> { + let mut bus = self.bus.access().await; + bus.transaction(address, operations).await + } +} + +/// [`Arbiter`]-based shared bus implementation for I2C. +pub struct BlockingArbiterDevice<'a, BUS> { + bus: &'a Arbiter, +} + +impl<'a, BUS> BlockingArbiterDevice<'a, BUS> { + /// Create a new [`BlockingArbiterDevice`] for I2C. + pub fn new(bus: &'a Arbiter) -> Self { + Self { bus } + } + + /// Create an `ArbiterDevice` from an `BlockingArbiterDevice`. + pub fn into_non_blocking(self) -> ArbiterDevice<'a, BUS> + where + BUS: AsyncI2c, + { + ArbiterDevice { bus: self.bus } + } +} + +impl<'a, BUS> ErrorType for BlockingArbiterDevice<'a, BUS> +where + BUS: ErrorType, +{ + type Error = BUS::Error; +} + +impl<'a, BUS, A> AsyncI2c for BlockingArbiterDevice<'a, BUS> +where + BUS: BlockingI2c, + A: AddressMode, +{ + async fn read(&mut self, address: A, read: &mut [u8]) -> Result<(), Self::Error> { + let mut bus = self.bus.access().await; + bus.read(address, read) + } + + async fn write(&mut self, address: A, write: &[u8]) -> Result<(), Self::Error> { + let mut bus = self.bus.access().await; + bus.write(address, write) + } + + async fn write_read( + &mut self, + address: A, + write: &[u8], + read: &mut [u8], + ) -> Result<(), Self::Error> { + let mut bus = self.bus.access().await; + bus.write_read(address, write, read) + } + + async fn transaction( + &mut self, + address: A, + operations: &mut [Operation<'_>], + ) -> Result<(), Self::Error> { + let mut bus = self.bus.access().await; + bus.transaction(address, operations) + } +} diff --git a/rtic-sync/src/arbiter/spi.rs b/rtic-sync/src/arbiter/spi.rs new file mode 100644 index 00000000000..7ab15edf3b8 --- /dev/null +++ b/rtic-sync/src/arbiter/spi.rs @@ -0,0 +1,164 @@ +//! SPI bus sharing using [`Arbiter`] + +use super::Arbiter; +use embedded_hal::digital::OutputPin; +use embedded_hal::spi::SpiBus as BlockingSpiBus; +use embedded_hal_async::{ + delay::DelayNs, + spi::{ErrorType, Operation, SpiBus as AsyncSpiBus, SpiDevice}, +}; +use embedded_hal_bus::spi::DeviceError; + +/// [`Arbiter`]-based shared bus implementation. +pub struct ArbiterDevice<'a, BUS, CS, D> { + bus: &'a Arbiter, + cs: CS, + delay: D, +} + +impl<'a, BUS, CS, D> ArbiterDevice<'a, BUS, CS, D> { + /// Create a new [`ArbiterDevice`]. + pub fn new(bus: &'a Arbiter, cs: CS, delay: D) -> Self { + Self { bus, cs, delay } + } +} + +impl ErrorType for ArbiterDevice<'_, BUS, CS, D> +where + BUS: ErrorType, + CS: OutputPin, +{ + type Error = DeviceError; +} + +impl SpiDevice for ArbiterDevice<'_, BUS, CS, D> +where + Word: Copy + 'static, + BUS: AsyncSpiBus, + CS: OutputPin, + D: DelayNs, +{ + async fn transaction( + &mut self, + operations: &mut [Operation<'_, Word>], + ) -> Result<(), DeviceError> { + let mut bus = self.bus.access().await; + + self.cs.set_low().map_err(DeviceError::Cs)?; + + let op_res = 'ops: { + for op in operations { + let res = match op { + Operation::Read(buf) => bus.read(buf).await, + Operation::Write(buf) => bus.write(buf).await, + Operation::Transfer(read, write) => bus.transfer(read, write).await, + Operation::TransferInPlace(buf) => bus.transfer_in_place(buf).await, + Operation::DelayNs(ns) => match bus.flush().await { + Err(e) => Err(e), + Ok(()) => { + self.delay.delay_ns(*ns).await; + Ok(()) + } + }, + }; + if let Err(e) = res { + break 'ops Err(e); + } + } + Ok(()) + }; + + // On failure, it's important to still flush and deassert CS. + let flush_res = bus.flush().await; + let cs_res = self.cs.set_high(); + + op_res.map_err(DeviceError::Spi)?; + flush_res.map_err(DeviceError::Spi)?; + cs_res.map_err(DeviceError::Cs)?; + + Ok(()) + } +} + +/// [`Arbiter`]-based shared bus implementation. +pub struct BlockingArbiterDevice<'a, BUS, CS, D> { + bus: &'a Arbiter, + cs: CS, + delay: D, +} + +impl<'a, BUS, CS, D> BlockingArbiterDevice<'a, BUS, CS, D> { + /// Create a new [`BlockingArbiterDevice`]. + pub fn new(bus: &'a Arbiter, cs: CS, delay: D) -> Self { + Self { bus, cs, delay } + } + + /// Create an `ArbiterDevice` from an `BlockingArbiterDevice`. + pub fn into_non_blocking(self) -> ArbiterDevice<'a, BUS, CS, D> + where + BUS: AsyncSpiBus, + { + ArbiterDevice { + bus: self.bus, + cs: self.cs, + delay: self.delay, + } + } +} + +impl<'a, BUS, CS, D> ErrorType for BlockingArbiterDevice<'a, BUS, CS, D> +where + BUS: ErrorType, + CS: OutputPin, +{ + type Error = DeviceError; +} + +impl<'a, Word, BUS, CS, D> SpiDevice for BlockingArbiterDevice<'a, BUS, CS, D> +where + Word: Copy + 'static, + BUS: BlockingSpiBus, + CS: OutputPin, + D: DelayNs, +{ + async fn transaction( + &mut self, + operations: &mut [Operation<'_, Word>], + ) -> Result<(), DeviceError> { + let mut bus = self.bus.access().await; + + self.cs.set_low().map_err(DeviceError::Cs)?; + + let op_res = 'ops: { + for op in operations { + let res = match op { + Operation::Read(buf) => bus.read(buf), + Operation::Write(buf) => bus.write(buf), + Operation::Transfer(read, write) => bus.transfer(read, write), + Operation::TransferInPlace(buf) => bus.transfer_in_place(buf), + Operation::DelayNs(ns) => match bus.flush() { + Err(e) => Err(e), + Ok(()) => { + self.delay.delay_ns(*ns).await; + Ok(()) + } + }, + }; + if let Err(e) = res { + break 'ops Err(e); + } + } + Ok(()) + }; + + // On failure, it's important to still flush and deassert CS. + let flush_res = bus.flush(); + let cs_res = self.cs.set_high(); + + op_res.map_err(DeviceError::Spi)?; + flush_res.map_err(DeviceError::Spi)?; + cs_res.map_err(DeviceError::Cs)?; + + Ok(()) + } +}