diff --git a/rtic-arbiter/.gitignore b/rtic-arbiter/.gitignore
new file mode 100644
index 0000000000..1e7caa9ea8
--- /dev/null
+++ b/rtic-arbiter/.gitignore
@@ -0,0 +1,2 @@
+Cargo.lock
+target/
diff --git a/rtic-arbiter/CHANGELOG.md b/rtic-arbiter/CHANGELOG.md
new file mode 100644
index 0000000000..d3a9d846ee
--- /dev/null
+++ b/rtic-arbiter/CHANGELOG.md
@@ -0,0 +1,16 @@
+# Change Log
+
+All notable changes to this project will be documented in this file.
+This project adheres to [Semantic Versioning](http://semver.org/).
+
+For each category, *Added*, *Changed*, *Fixed* add new entries at the top!
+
+## [Unreleased]
+
+### Added
+
+### Changed
+
+### Fixed
+
+## [v1.0.0] - 2023-xx-xx
diff --git a/rtic-arbiter/Cargo.toml b/rtic-arbiter/Cargo.toml
new file mode 100644
index 0000000000..b1afaf4516
--- /dev/null
+++ b/rtic-arbiter/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "rtic-arbiter"
+version = "1.0.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+critical-section = "1"
+rtic-common = { version = "1.0.0", path = "../rtic-common" }
+
+[dev-dependencies]
+tokio = { version = "1", features = ["rt", "macros", "time"] }
+
+
+[features]
+default = []
+testing = ["critical-section/std", "rtic-common/testing"]
diff --git a/rtic-arbiter/src/lib.rs b/rtic-arbiter/src/lib.rs
new file mode 100644
index 0000000000..487c64cecd
--- /dev/null
+++ b/rtic-arbiter/src/lib.rs
@@ -0,0 +1,175 @@
+//! Crate
+
+#![no_std]
+#![deny(missing_docs)]
+//deny_warnings_placeholder_for_ci
+
+use core::cell::UnsafeCell;
+use core::future::poll_fn;
+use core::ops::{Deref, DerefMut};
+use core::pin::Pin;
+use core::sync::atomic::{fence, AtomicBool, Ordering};
+use core::task::{Poll, Waker};
+
+use rtic_common::dropper::OnDrop;
+use rtic_common::wait_queue::{Link, WaitQueue};
+
+/// This is needed to make the async closure in `send` accept that we "share"
+/// the link possible between threads.
+#[derive(Clone)]
+struct LinkPtr(*mut Option>);
+
+impl LinkPtr {
+ /// This will dereference the pointer stored within and give out an `&mut`.
+ unsafe fn get(&mut self) -> &mut Option> {
+ &mut *self.0
+ }
+}
+
+unsafe impl Send for LinkPtr {}
+unsafe impl Sync for LinkPtr {}
+
+/// An FIFO waitqueue for use in shared bus usecases.
+pub struct Arbiter {
+ wait_queue: WaitQueue,
+ inner: UnsafeCell,
+ taken: AtomicBool,
+}
+
+impl Arbiter {
+ /// Create a new arbiter.
+ pub const fn new(inner: T) -> Self {
+ Self {
+ wait_queue: WaitQueue::new(),
+ inner: UnsafeCell::new(inner),
+ taken: AtomicBool::new(false),
+ }
+ }
+
+ /// Get access to the inner value in the `Arbiter`. This will wait until access is granted,
+ /// for non-blocking access use `try_access`.
+ pub async fn access(&self) -> ExclusiveAccess<'_, T> {
+ let mut link_ptr: Option> = None;
+
+ // Make this future `Drop`-safe, also shadow the original definition so we can't abuse it.
+ let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option>);
+
+ let mut link_ptr2 = link_ptr.clone();
+ let dropper = OnDrop::new(|| {
+ // SAFETY: We only run this closure and dereference the pointer if we have
+ // exited the `poll_fn` below in the `drop(dropper)` call. The other dereference
+ // of this pointer is in the `poll_fn`.
+ if let Some(link) = unsafe { link_ptr2.get() } {
+ link.remove_from_list(&self.wait_queue);
+ }
+ });
+
+ poll_fn(|cx| {
+ critical_section::with(|_| {
+ fence(Ordering::SeqCst);
+
+ // The queue is empty and noone has taken the value.
+ if self.wait_queue.is_empty() && !self.taken.load(Ordering::Relaxed) {
+ self.taken.store(true, Ordering::Relaxed);
+
+ return Poll::Ready(());
+ }
+
+ // SAFETY: This pointer is only dereferenced here and on drop of the future
+ // which happens outside this `poll_fn`'s stack frame.
+ let link = unsafe { link_ptr.get() };
+ if let Some(link) = link {
+ if link.is_poped() {
+ return Poll::Ready(());
+ }
+ } else {
+ // Place the link in the wait queue on first run.
+ let link_ref = link.insert(Link::new(cx.waker().clone()));
+
+ // SAFETY: The address to the link is stable as it is hidden behind
+ // `link_ptr`, and `link_ptr` shadows the original making it unmovable.
+ self.wait_queue
+ .push(unsafe { Pin::new_unchecked(link_ref) });
+ }
+
+ Poll::Pending
+ })
+ })
+ .await;
+
+ // Make sure the link is removed from the queue.
+ drop(dropper);
+
+ // SAFETY: One only gets here if there is exlusive access.
+ ExclusiveAccess {
+ arbiter: self,
+ inner: unsafe { &mut *self.inner.get() },
+ }
+ }
+
+ /// Non-blockingly tries to access the underlying value.
+ /// If someone is in queue to get it, this will return `None`.
+ pub fn try_access(&self) -> Option> {
+ critical_section::with(|_| {
+ fence(Ordering::SeqCst);
+
+ // The queue is empty and noone has taken the value.
+ if self.wait_queue.is_empty() && !self.taken.load(Ordering::Relaxed) {
+ self.taken.store(true, Ordering::Relaxed);
+
+ // SAFETY: One only gets here if there is exlusive access.
+ Some(ExclusiveAccess {
+ arbiter: self,
+ inner: unsafe { &mut *self.inner.get() },
+ })
+ } else {
+ None
+ }
+ })
+ }
+}
+
+/// This token represents exclusive access to the value protected by the `Arbiter`.
+pub struct ExclusiveAccess<'a, T> {
+ arbiter: &'a Arbiter,
+ inner: &'a mut T,
+}
+
+impl<'a, T> Drop for ExclusiveAccess<'a, T> {
+ fn drop(&mut self) {
+ critical_section::with(|_| {
+ fence(Ordering::SeqCst);
+
+ if self.arbiter.wait_queue.is_empty() {
+ // If noone is in queue and we release exclusive access, reset `taken`.
+ self.arbiter.taken.store(false, Ordering::Relaxed);
+ } else if let Some(next) = self.arbiter.wait_queue.pop() {
+ // Wake the next one in queue.
+ next.wake();
+ }
+ })
+ }
+}
+
+impl<'a, T> Deref for ExclusiveAccess<'a, T> {
+ type Target = T;
+
+ fn deref(&self) -> &Self::Target {
+ self.inner
+ }
+}
+
+impl<'a, T> DerefMut for ExclusiveAccess<'a, T> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ self.inner
+ }
+}
+
+#[cfg(test)]
+#[macro_use]
+extern crate std;
+
+#[cfg(test)]
+mod tests {
+ // use super::*;
+}
diff --git a/rtic-channel/src/lib.rs b/rtic-channel/src/lib.rs
index 2b237f669f..6f816b5755 100644
--- a/rtic-channel/src/lib.rs
+++ b/rtic-channel/src/lib.rs
@@ -14,8 +14,11 @@ use core::{
task::{Poll, Waker},
};
use heapless::Deque;
-use rtic_common::wait_queue::{Link, WaitQueue};
use rtic_common::waker_registration::CriticalSectionWakerRegistration as WakerRegistration;
+use rtic_common::{
+ dropper::OnDrop,
+ wait_queue::{Link, WaitQueue},
+};
/// An MPSC channel for use in no-alloc systems. `N` sets the size of the queue.
///
@@ -417,29 +420,6 @@ impl<'a, T, const N: usize> Drop for Receiver<'a, T, N> {
}
}
-struct OnDrop {
- f: core::mem::MaybeUninit,
-}
-
-impl OnDrop {
- pub fn new(f: F) -> Self {
- Self {
- f: core::mem::MaybeUninit::new(f),
- }
- }
-
- #[allow(unused)]
- pub fn defuse(self) {
- core::mem::forget(self)
- }
-}
-
-impl Drop for OnDrop {
- fn drop(&mut self) {
- unsafe { self.f.as_ptr().read()() }
- }
-}
-
#[cfg(test)]
#[macro_use]
extern crate std;
diff --git a/rtic-common/src/dropper.rs b/rtic-common/src/dropper.rs
new file mode 100644
index 0000000000..a4b4d15927
--- /dev/null
+++ b/rtic-common/src/dropper.rs
@@ -0,0 +1,26 @@
+//! A drop implementation runner.
+
+/// Runs a closure on drop.
+pub struct OnDrop {
+ f: core::mem::MaybeUninit,
+}
+
+impl OnDrop {
+ /// Make a new droppper given a closure.
+ pub fn new(f: F) -> Self {
+ Self {
+ f: core::mem::MaybeUninit::new(f),
+ }
+ }
+
+ /// Make it not run drop.
+ pub fn defuse(self) {
+ core::mem::forget(self)
+ }
+}
+
+impl Drop for OnDrop {
+ fn drop(&mut self) {
+ unsafe { self.f.as_ptr().read()() }
+ }
+}
diff --git a/rtic-common/src/lib.rs b/rtic-common/src/lib.rs
index 3c75856c8a..b8b5e0d931 100644
--- a/rtic-common/src/lib.rs
+++ b/rtic-common/src/lib.rs
@@ -4,5 +4,6 @@
#![deny(missing_docs)]
//deny_warnings_placeholder_for_ci
+pub mod dropper;
pub mod wait_queue;
pub mod waker_registration;