mirror of
https://github.com/rtic-rs/rtic.git
synced 2024-11-23 20:22:51 +01:00
Add rtic-arbiter
This commit is contained in:
parent
6c48ebeeee
commit
ac891333f1
7 changed files with 242 additions and 24 deletions
2
rtic-arbiter/.gitignore
vendored
Normal file
2
rtic-arbiter/.gitignore
vendored
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
Cargo.lock
|
||||||
|
target/
|
16
rtic-arbiter/CHANGELOG.md
Normal file
16
rtic-arbiter/CHANGELOG.md
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
# Change Log
|
||||||
|
|
||||||
|
All notable changes to this project will be documented in this file.
|
||||||
|
This project adheres to [Semantic Versioning](http://semver.org/).
|
||||||
|
|
||||||
|
For each category, *Added*, *Changed*, *Fixed* add new entries at the top!
|
||||||
|
|
||||||
|
## [Unreleased]
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
## [v1.0.0] - 2023-xx-xx
|
18
rtic-arbiter/Cargo.toml
Normal file
18
rtic-arbiter/Cargo.toml
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
[package]
|
||||||
|
name = "rtic-arbiter"
|
||||||
|
version = "1.0.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
critical-section = "1"
|
||||||
|
rtic-common = { version = "1.0.0", path = "../rtic-common" }
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
tokio = { version = "1", features = ["rt", "macros", "time"] }
|
||||||
|
|
||||||
|
|
||||||
|
[features]
|
||||||
|
default = []
|
||||||
|
testing = ["critical-section/std", "rtic-common/testing"]
|
175
rtic-arbiter/src/lib.rs
Normal file
175
rtic-arbiter/src/lib.rs
Normal file
|
@ -0,0 +1,175 @@
|
||||||
|
//! Crate
|
||||||
|
|
||||||
|
#![no_std]
|
||||||
|
#![deny(missing_docs)]
|
||||||
|
//deny_warnings_placeholder_for_ci
|
||||||
|
|
||||||
|
use core::cell::UnsafeCell;
|
||||||
|
use core::future::poll_fn;
|
||||||
|
use core::ops::{Deref, DerefMut};
|
||||||
|
use core::pin::Pin;
|
||||||
|
use core::sync::atomic::{fence, AtomicBool, Ordering};
|
||||||
|
use core::task::{Poll, Waker};
|
||||||
|
|
||||||
|
use rtic_common::dropper::OnDrop;
|
||||||
|
use rtic_common::wait_queue::{Link, WaitQueue};
|
||||||
|
|
||||||
|
/// This is needed to make the async closure in `send` accept that we "share"
|
||||||
|
/// the link possible between threads.
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct LinkPtr(*mut Option<Link<Waker>>);
|
||||||
|
|
||||||
|
impl LinkPtr {
|
||||||
|
/// This will dereference the pointer stored within and give out an `&mut`.
|
||||||
|
unsafe fn get(&mut self) -> &mut Option<Link<Waker>> {
|
||||||
|
&mut *self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl Send for LinkPtr {}
|
||||||
|
unsafe impl Sync for LinkPtr {}
|
||||||
|
|
||||||
|
/// An FIFO waitqueue for use in shared bus usecases.
|
||||||
|
pub struct Arbiter<T> {
|
||||||
|
wait_queue: WaitQueue,
|
||||||
|
inner: UnsafeCell<T>,
|
||||||
|
taken: AtomicBool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Arbiter<T> {
|
||||||
|
/// Create a new arbiter.
|
||||||
|
pub const fn new(inner: T) -> Self {
|
||||||
|
Self {
|
||||||
|
wait_queue: WaitQueue::new(),
|
||||||
|
inner: UnsafeCell::new(inner),
|
||||||
|
taken: AtomicBool::new(false),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get access to the inner value in the `Arbiter`. This will wait until access is granted,
|
||||||
|
/// for non-blocking access use `try_access`.
|
||||||
|
pub async fn access(&self) -> ExclusiveAccess<'_, T> {
|
||||||
|
let mut link_ptr: Option<Link<Waker>> = None;
|
||||||
|
|
||||||
|
// Make this future `Drop`-safe, also shadow the original definition so we can't abuse it.
|
||||||
|
let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option<Link<Waker>>);
|
||||||
|
|
||||||
|
let mut link_ptr2 = link_ptr.clone();
|
||||||
|
let dropper = OnDrop::new(|| {
|
||||||
|
// SAFETY: We only run this closure and dereference the pointer if we have
|
||||||
|
// exited the `poll_fn` below in the `drop(dropper)` call. The other dereference
|
||||||
|
// of this pointer is in the `poll_fn`.
|
||||||
|
if let Some(link) = unsafe { link_ptr2.get() } {
|
||||||
|
link.remove_from_list(&self.wait_queue);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
poll_fn(|cx| {
|
||||||
|
critical_section::with(|_| {
|
||||||
|
fence(Ordering::SeqCst);
|
||||||
|
|
||||||
|
// The queue is empty and noone has taken the value.
|
||||||
|
if self.wait_queue.is_empty() && !self.taken.load(Ordering::Relaxed) {
|
||||||
|
self.taken.store(true, Ordering::Relaxed);
|
||||||
|
|
||||||
|
return Poll::Ready(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// SAFETY: This pointer is only dereferenced here and on drop of the future
|
||||||
|
// which happens outside this `poll_fn`'s stack frame.
|
||||||
|
let link = unsafe { link_ptr.get() };
|
||||||
|
if let Some(link) = link {
|
||||||
|
if link.is_poped() {
|
||||||
|
return Poll::Ready(());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Place the link in the wait queue on first run.
|
||||||
|
let link_ref = link.insert(Link::new(cx.waker().clone()));
|
||||||
|
|
||||||
|
// SAFETY: The address to the link is stable as it is hidden behind
|
||||||
|
// `link_ptr`, and `link_ptr` shadows the original making it unmovable.
|
||||||
|
self.wait_queue
|
||||||
|
.push(unsafe { Pin::new_unchecked(link_ref) });
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Make sure the link is removed from the queue.
|
||||||
|
drop(dropper);
|
||||||
|
|
||||||
|
// SAFETY: One only gets here if there is exlusive access.
|
||||||
|
ExclusiveAccess {
|
||||||
|
arbiter: self,
|
||||||
|
inner: unsafe { &mut *self.inner.get() },
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Non-blockingly tries to access the underlying value.
|
||||||
|
/// If someone is in queue to get it, this will return `None`.
|
||||||
|
pub fn try_access(&self) -> Option<ExclusiveAccess<'_, T>> {
|
||||||
|
critical_section::with(|_| {
|
||||||
|
fence(Ordering::SeqCst);
|
||||||
|
|
||||||
|
// The queue is empty and noone has taken the value.
|
||||||
|
if self.wait_queue.is_empty() && !self.taken.load(Ordering::Relaxed) {
|
||||||
|
self.taken.store(true, Ordering::Relaxed);
|
||||||
|
|
||||||
|
// SAFETY: One only gets here if there is exlusive access.
|
||||||
|
Some(ExclusiveAccess {
|
||||||
|
arbiter: self,
|
||||||
|
inner: unsafe { &mut *self.inner.get() },
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// This token represents exclusive access to the value protected by the `Arbiter`.
|
||||||
|
pub struct ExclusiveAccess<'a, T> {
|
||||||
|
arbiter: &'a Arbiter<T>,
|
||||||
|
inner: &'a mut T,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T> Drop for ExclusiveAccess<'a, T> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
critical_section::with(|_| {
|
||||||
|
fence(Ordering::SeqCst);
|
||||||
|
|
||||||
|
if self.arbiter.wait_queue.is_empty() {
|
||||||
|
// If noone is in queue and we release exclusive access, reset `taken`.
|
||||||
|
self.arbiter.taken.store(false, Ordering::Relaxed);
|
||||||
|
} else if let Some(next) = self.arbiter.wait_queue.pop() {
|
||||||
|
// Wake the next one in queue.
|
||||||
|
next.wake();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T> Deref for ExclusiveAccess<'a, T> {
|
||||||
|
type Target = T;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
self.inner
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T> DerefMut for ExclusiveAccess<'a, T> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
self.inner
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
#[macro_use]
|
||||||
|
extern crate std;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
// use super::*;
|
||||||
|
}
|
|
@ -14,8 +14,11 @@ use core::{
|
||||||
task::{Poll, Waker},
|
task::{Poll, Waker},
|
||||||
};
|
};
|
||||||
use heapless::Deque;
|
use heapless::Deque;
|
||||||
use rtic_common::wait_queue::{Link, WaitQueue};
|
|
||||||
use rtic_common::waker_registration::CriticalSectionWakerRegistration as WakerRegistration;
|
use rtic_common::waker_registration::CriticalSectionWakerRegistration as WakerRegistration;
|
||||||
|
use rtic_common::{
|
||||||
|
dropper::OnDrop,
|
||||||
|
wait_queue::{Link, WaitQueue},
|
||||||
|
};
|
||||||
|
|
||||||
/// An MPSC channel for use in no-alloc systems. `N` sets the size of the queue.
|
/// An MPSC channel for use in no-alloc systems. `N` sets the size of the queue.
|
||||||
///
|
///
|
||||||
|
@ -417,29 +420,6 @@ impl<'a, T, const N: usize> Drop for Receiver<'a, T, N> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct OnDrop<F: FnOnce()> {
|
|
||||||
f: core::mem::MaybeUninit<F>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<F: FnOnce()> OnDrop<F> {
|
|
||||||
pub fn new(f: F) -> Self {
|
|
||||||
Self {
|
|
||||||
f: core::mem::MaybeUninit::new(f),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(unused)]
|
|
||||||
pub fn defuse(self) {
|
|
||||||
core::mem::forget(self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<F: FnOnce()> Drop for OnDrop<F> {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
unsafe { self.f.as_ptr().read()() }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate std;
|
extern crate std;
|
||||||
|
|
26
rtic-common/src/dropper.rs
Normal file
26
rtic-common/src/dropper.rs
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
//! A drop implementation runner.
|
||||||
|
|
||||||
|
/// Runs a closure on drop.
|
||||||
|
pub struct OnDrop<F: FnOnce()> {
|
||||||
|
f: core::mem::MaybeUninit<F>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F: FnOnce()> OnDrop<F> {
|
||||||
|
/// Make a new droppper given a closure.
|
||||||
|
pub fn new(f: F) -> Self {
|
||||||
|
Self {
|
||||||
|
f: core::mem::MaybeUninit::new(f),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Make it not run drop.
|
||||||
|
pub fn defuse(self) {
|
||||||
|
core::mem::forget(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F: FnOnce()> Drop for OnDrop<F> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
unsafe { self.f.as_ptr().read()() }
|
||||||
|
}
|
||||||
|
}
|
|
@ -4,5 +4,6 @@
|
||||||
#![deny(missing_docs)]
|
#![deny(missing_docs)]
|
||||||
//deny_warnings_placeholder_for_ci
|
//deny_warnings_placeholder_for_ci
|
||||||
|
|
||||||
|
pub mod dropper;
|
||||||
pub mod wait_queue;
|
pub mod wait_queue;
|
||||||
pub mod waker_registration;
|
pub mod waker_registration;
|
||||||
|
|
Loading…
Reference in a new issue