mirror of
https://github.com/rtic-rs/rtic.git
synced 2025-12-16 21:05:35 +01:00
feat: wait_until method for waker queue
This commit is contained in:
parent
ff3b011cef
commit
11f1bc60fa
1 changed files with 54 additions and 4 deletions
|
|
@ -1,12 +1,17 @@
|
||||||
//! A wait queue implementation using a doubly linked list.
|
//! A wait queue implementation using a doubly linked list.
|
||||||
|
|
||||||
use core::marker::PhantomPinned;
|
use core::{
|
||||||
use core::pin::Pin;
|
future::poll_fn,
|
||||||
use core::ptr::null_mut;
|
marker::PhantomPinned,
|
||||||
use core::task::Waker;
|
pin::{pin, Pin},
|
||||||
|
ptr::null_mut,
|
||||||
|
task::{Poll, Waker},
|
||||||
|
};
|
||||||
use critical_section as cs;
|
use critical_section as cs;
|
||||||
use portable_atomic::{AtomicBool, AtomicPtr, Ordering};
|
use portable_atomic::{AtomicBool, AtomicPtr, Ordering};
|
||||||
|
|
||||||
|
use crate::dropper::OnDropWith;
|
||||||
|
|
||||||
/// A helper definition of a wait queue.
|
/// A helper definition of a wait queue.
|
||||||
pub type WaitQueue = DoublyLinkedList<Waker>;
|
pub type WaitQueue = DoublyLinkedList<Waker>;
|
||||||
|
|
||||||
|
|
@ -220,6 +225,51 @@ impl<T: core::fmt::Debug + Clone> DoublyLinkedList<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl DoublyLinkedList<Waker> {
|
||||||
|
/// Wait until `f` returns `Some`.
|
||||||
|
pub async fn wait_until<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> T {
|
||||||
|
let link_place = pin!(None::<Link<Waker>>);
|
||||||
|
|
||||||
|
let mut link_guard = OnDropWith::new(link_place, |link| {
|
||||||
|
if let Some(link) = link.as_ref().as_pin_ref() {
|
||||||
|
link.remove_from_list(self);
|
||||||
|
}
|
||||||
|
link.set(None);
|
||||||
|
});
|
||||||
|
|
||||||
|
poll_fn(move |cx| {
|
||||||
|
// clean up the old link, because we are going to invalidate it.
|
||||||
|
// we are doing it before returning `Poll::Ready` to handle cases
|
||||||
|
// where the future is polled after it is completed.
|
||||||
|
link_guard.execute();
|
||||||
|
|
||||||
|
if let Some(val) = f() {
|
||||||
|
return Poll::Ready(val);
|
||||||
|
}
|
||||||
|
|
||||||
|
// note: we may introduce a more complex logic to try to reuse the old link
|
||||||
|
// with the old waker by using `Waker::will_wake` to avoid `Waker::clone`,
|
||||||
|
// but it is probably not needed as Rtic's `waker` is cheap to clone.
|
||||||
|
|
||||||
|
// By the contract, each poll we should update the waker.
|
||||||
|
let new_link = Link::new(cx.waker().clone());
|
||||||
|
|
||||||
|
// Store the link into the pinned place.
|
||||||
|
link_guard.set(Some(new_link));
|
||||||
|
|
||||||
|
let new_link_pinned = link_guard.as_ref().as_pin_ref().expect("We just set it");
|
||||||
|
|
||||||
|
// SAFETY: we guarantee that `link` will live until removed by cleaning it up
|
||||||
|
// in the destructor of the future and that destructor is guaranteed to run
|
||||||
|
// before it's memory is reused or invalidated because the future is pinned.
|
||||||
|
unsafe { self.push(new_link_pinned) };
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
impl<T: core::fmt::Debug + Clone> Link<T> {
|
impl<T: core::fmt::Debug + Clone> Link<T> {
|
||||||
fn print(&self) {
|
fn print(&self) {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue