From 11f1bc60fa76a6d8bfaa86cf5f18fe91c5ff0278 Mon Sep 17 00:00:00 2001 From: Oleksandr Babak Date: Thu, 15 May 2025 14:03:47 +0200 Subject: [PATCH] feat: `wait_until` method for waker queue --- rtic-common/src/wait_queue.rs | 58 ++++++++++++++++++++++++++++++++--- 1 file changed, 54 insertions(+), 4 deletions(-) diff --git a/rtic-common/src/wait_queue.rs b/rtic-common/src/wait_queue.rs index 0f3a59d1542..de27710aeac 100644 --- a/rtic-common/src/wait_queue.rs +++ b/rtic-common/src/wait_queue.rs @@ -1,12 +1,17 @@ //! A wait queue implementation using a doubly linked list. -use core::marker::PhantomPinned; -use core::pin::Pin; -use core::ptr::null_mut; -use core::task::Waker; +use core::{ + future::poll_fn, + marker::PhantomPinned, + pin::{pin, Pin}, + ptr::null_mut, + task::{Poll, Waker}, +}; use critical_section as cs; use portable_atomic::{AtomicBool, AtomicPtr, Ordering}; +use crate::dropper::OnDropWith; + /// A helper definition of a wait queue. pub type WaitQueue = DoublyLinkedList; @@ -220,6 +225,51 @@ impl DoublyLinkedList { } } +impl DoublyLinkedList { + /// Wait until `f` returns `Some`. + pub async fn wait_until Option>(&self, mut f: F) -> T { + let link_place = pin!(None::>); + + let mut link_guard = OnDropWith::new(link_place, |link| { + if let Some(link) = link.as_ref().as_pin_ref() { + link.remove_from_list(self); + } + link.set(None); + }); + + poll_fn(move |cx| { + // clean up the old link, because we are going to invalidate it. + // we are doing it before returning `Poll::Ready` to handle cases + // where the future is polled after it is completed. + link_guard.execute(); + + if let Some(val) = f() { + return Poll::Ready(val); + } + + // note: we may introduce a more complex logic to try to reuse the old link + // with the old waker by using `Waker::will_wake` to avoid `Waker::clone`, + // but it is probably not needed as Rtic's `waker` is cheap to clone. + + // By the contract, each poll we should update the waker. + let new_link = Link::new(cx.waker().clone()); + + // Store the link into the pinned place. + link_guard.set(Some(new_link)); + + let new_link_pinned = link_guard.as_ref().as_pin_ref().expect("We just set it"); + + // SAFETY: we guarantee that `link` will live until removed by cleaning it up + // in the destructor of the future and that destructor is guaranteed to run + // before it's memory is reused or invalidated because the future is pinned. + unsafe { self.push(new_link_pinned) }; + + Poll::Pending + }) + .await + } +} + #[cfg(test)] impl Link { fn print(&self) {