mirror of
https://github.com/rtic-rs/rtic.git
synced 2024-11-23 12:12:50 +01:00
Change timeout
/delay
functions to non-async (#910)
Because of a compiler bug, the `async` implementations of `delay`/`delay_until`/`timeout`/`timeout_at` produce much larger RAM footprint than they should. Fixes #890. Co-authored-by: Emil Fresk <emil.fresk@gmail.com>
This commit is contained in:
parent
2c85ee4620
commit
a636f4e4ad
2 changed files with 117 additions and 106 deletions
|
@ -7,6 +7,10 @@ For each category, *Added*, *Changed*, *Fixed* add new entries at the top!
|
||||||
|
|
||||||
## Unreleased
|
## Unreleased
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
- Replace `async` implementations of `delay`/`delay_until`/`timeout`/`timeout_at` with structs to reduce memory usage.
|
||||||
|
|
||||||
## v2.0.0 - 2024-05-29
|
## v2.0.0 - 2024-05-29
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|
|
@ -3,15 +3,10 @@
|
||||||
use crate::linked_list::{self, Link, LinkedList};
|
use crate::linked_list::{self, Link, LinkedList};
|
||||||
use crate::TimeoutError;
|
use crate::TimeoutError;
|
||||||
|
|
||||||
use core::future::{poll_fn, Future};
|
use core::future::Future;
|
||||||
use core::pin::Pin;
|
use core::pin::Pin;
|
||||||
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||||
use core::task::{Poll, Waker};
|
use core::task::{Poll, Waker};
|
||||||
use futures_util::{
|
|
||||||
future::{select, Either},
|
|
||||||
pin_mut,
|
|
||||||
};
|
|
||||||
use rtic_common::dropper::OnDrop;
|
|
||||||
|
|
||||||
mod backend;
|
mod backend;
|
||||||
mod tick_type;
|
mod tick_type;
|
||||||
|
@ -67,26 +62,6 @@ pub struct TimerQueue<Backend: TimerQueueBackend> {
|
||||||
initialized: AtomicBool,
|
initialized: AtomicBool,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This is needed to make the async closure in `delay_until` accept that we "share"
|
|
||||||
/// the link possible between threads.
|
|
||||||
struct LinkPtr<Backend: TimerQueueBackend>(*mut Option<linked_list::Link<WaitingWaker<Backend>>>);
|
|
||||||
|
|
||||||
impl<Backend: TimerQueueBackend> Clone for LinkPtr<Backend> {
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
LinkPtr(self.0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Backend: TimerQueueBackend> LinkPtr<Backend> {
|
|
||||||
/// This will dereference the pointer stored within and give out an `&mut`.
|
|
||||||
unsafe fn get(&mut self) -> &mut Option<linked_list::Link<WaitingWaker<Backend>>> {
|
|
||||||
&mut *self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe impl<Backend: TimerQueueBackend> Send for LinkPtr<Backend> {}
|
|
||||||
unsafe impl<Backend: TimerQueueBackend> Sync for LinkPtr<Backend> {}
|
|
||||||
|
|
||||||
impl<Backend: TimerQueueBackend> Default for TimerQueue<Backend> {
|
impl<Backend: TimerQueueBackend> Default for TimerQueue<Backend> {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self::new()
|
Self::new()
|
||||||
|
@ -112,7 +87,7 @@ impl<Backend: TimerQueueBackend> TimerQueue<Backend> {
|
||||||
pub fn initialize(&self, backend: Backend) {
|
pub fn initialize(&self, backend: Backend) {
|
||||||
self.initialized.store(true, Ordering::SeqCst);
|
self.initialized.store(true, Ordering::SeqCst);
|
||||||
|
|
||||||
// Don't run drop on `Mono`
|
// Don't run drop on `Backend`
|
||||||
core::mem::forget(backend);
|
core::mem::forget(backend);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,29 +139,29 @@ impl<Backend: TimerQueueBackend> TimerQueue<Backend> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Timeout at a specific time.
|
/// Timeout at a specific time.
|
||||||
pub async fn timeout_at<F: Future>(
|
pub fn timeout_at<F: Future>(
|
||||||
&self,
|
&self,
|
||||||
instant: Backend::Ticks,
|
instant: Backend::Ticks,
|
||||||
future: F,
|
future: F,
|
||||||
) -> Result<F::Output, TimeoutError> {
|
) -> Timeout<'_, Backend, F> {
|
||||||
let delay = self.delay_until(instant);
|
Timeout {
|
||||||
|
delay: Delay::<Backend> {
|
||||||
pin_mut!(future);
|
instant,
|
||||||
pin_mut!(delay);
|
queue: &self.queue,
|
||||||
|
link_ptr: None,
|
||||||
match select(future, delay).await {
|
marker: AtomicUsize::new(0),
|
||||||
Either::Left((r, _)) => Ok(r),
|
},
|
||||||
Either::Right(_) => Err(TimeoutError),
|
future,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Timeout after at least a specific duration.
|
/// Timeout after at least a specific duration.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub async fn timeout_after<F: Future>(
|
pub fn timeout_after<F: Future>(
|
||||||
&self,
|
&self,
|
||||||
duration: Backend::Ticks,
|
duration: Backend::Ticks,
|
||||||
future: F,
|
future: F,
|
||||||
) -> Result<F::Output, TimeoutError> {
|
) -> Timeout<'_, Backend, F> {
|
||||||
let now = Backend::now();
|
let now = Backend::now();
|
||||||
let mut timeout = now.wrapping_add(duration);
|
let mut timeout = now.wrapping_add(duration);
|
||||||
if now != timeout {
|
if now != timeout {
|
||||||
|
@ -195,12 +170,12 @@ impl<Backend: TimerQueueBackend> TimerQueue<Backend> {
|
||||||
|
|
||||||
// Wait for one period longer, because by definition timers have an uncertainty
|
// Wait for one period longer, because by definition timers have an uncertainty
|
||||||
// of one period, so waiting for 'at least' needs to compensate for that.
|
// of one period, so waiting for 'at least' needs to compensate for that.
|
||||||
self.timeout_at(timeout, future).await
|
self.timeout_at(timeout, future)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Delay for at least some duration of time.
|
/// Delay for at least some duration of time.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub async fn delay(&self, duration: Backend::Ticks) {
|
pub fn delay(&self, duration: Backend::Ticks) -> Delay<'_, Backend> {
|
||||||
let now = Backend::now();
|
let now = Backend::now();
|
||||||
let mut timeout = now.wrapping_add(duration);
|
let mut timeout = now.wrapping_add(duration);
|
||||||
if now != timeout {
|
if now != timeout {
|
||||||
|
@ -209,79 +184,111 @@ impl<Backend: TimerQueueBackend> TimerQueue<Backend> {
|
||||||
|
|
||||||
// Wait for one period longer, because by definition timers have an uncertainty
|
// Wait for one period longer, because by definition timers have an uncertainty
|
||||||
// of one period, so waiting for 'at least' needs to compensate for that.
|
// of one period, so waiting for 'at least' needs to compensate for that.
|
||||||
self.delay_until(timeout).await;
|
self.delay_until(timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Delay to some specific time instant.
|
/// Delay to some specific time instant.
|
||||||
pub async fn delay_until(&self, instant: Backend::Ticks) {
|
pub fn delay_until(&self, instant: Backend::Ticks) -> Delay<'_, Backend> {
|
||||||
if !self.initialized.load(Ordering::Relaxed) {
|
if !self.initialized.load(Ordering::Relaxed) {
|
||||||
panic!(
|
panic!(
|
||||||
"The timer queue is not initialized with a monotonic, you need to run `initialize`"
|
"The timer queue is not initialized with a monotonic, you need to run `initialize`"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
Delay::<Backend> {
|
||||||
let mut link_ptr: Option<linked_list::Link<WaitingWaker<Backend>>> = None;
|
instant,
|
||||||
|
queue: &self.queue,
|
||||||
// Make this future `Drop`-safe
|
link_ptr: None,
|
||||||
// SAFETY(link_ptr): Shadow the original definition of `link_ptr` so we can't abuse it.
|
marker: AtomicUsize::new(0),
|
||||||
let mut link_ptr =
|
|
||||||
LinkPtr(&mut link_ptr as *mut Option<linked_list::Link<WaitingWaker<Backend>>>);
|
|
||||||
let mut link_ptr2 = link_ptr.clone();
|
|
||||||
|
|
||||||
let queue = &self.queue;
|
|
||||||
let marker = &AtomicUsize::new(0);
|
|
||||||
|
|
||||||
let dropper = OnDrop::new(|| {
|
|
||||||
queue.delete(marker.load(Ordering::Relaxed));
|
|
||||||
});
|
|
||||||
|
|
||||||
poll_fn(|cx| {
|
|
||||||
if Backend::now().is_at_least(instant) {
|
|
||||||
return Poll::Ready(());
|
|
||||||
}
|
|
||||||
|
|
||||||
// SAFETY: This pointer is only dereferenced here and on drop of the future
|
|
||||||
// which happens outside this `poll_fn`'s stack frame, so this mutable access cannot
|
|
||||||
// happen at the same time as `dropper` runs.
|
|
||||||
let link = unsafe { link_ptr2.get() };
|
|
||||||
if link.is_none() {
|
|
||||||
let link_ref = link.insert(Link::new(WaitingWaker {
|
|
||||||
waker: cx.waker().clone(),
|
|
||||||
release_at: instant,
|
|
||||||
was_popped: AtomicBool::new(false),
|
|
||||||
}));
|
|
||||||
|
|
||||||
// SAFETY(new_unchecked): The address to the link is stable as it is defined
|
|
||||||
//outside this stack frame.
|
|
||||||
// SAFETY(insert): `link_ref` lifetime comes from `link_ptr` that is shadowed, and
|
|
||||||
// we make sure in `dropper` that the link is removed from the queue before
|
|
||||||
// dropping `link_ptr` AND `dropper` makes sure that the shadowed `link_ptr` lives
|
|
||||||
// until the end of the stack frame.
|
|
||||||
let (head_updated, addr) = unsafe { queue.insert(Pin::new_unchecked(link_ref)) };
|
|
||||||
|
|
||||||
marker.store(addr, Ordering::Relaxed);
|
|
||||||
|
|
||||||
if head_updated {
|
|
||||||
// Pend the monotonic handler if the queue head was updated.
|
|
||||||
Backend::pend_interrupt()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Poll::Pending
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
// SAFETY: We only run this and dereference the pointer if we have
|
|
||||||
// exited the `poll_fn` below in the `drop(dropper)` call. The other dereference
|
|
||||||
// of this pointer is in the `poll_fn`.
|
|
||||||
if let Some(link) = unsafe { link_ptr.get() } {
|
|
||||||
if link.val.was_popped.load(Ordering::Relaxed) {
|
|
||||||
// If it was popped from the queue there is no need to run delete
|
|
||||||
dropper.defuse();
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Make sure that our link is deleted from the list before we drop this stack
|
|
||||||
drop(dropper);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Future returned by `delay` and `delay_until`.
|
||||||
|
pub struct Delay<'q, Backend: TimerQueueBackend> {
|
||||||
|
instant: Backend::Ticks,
|
||||||
|
queue: &'q LinkedList<WaitingWaker<Backend>>,
|
||||||
|
link_ptr: Option<linked_list::Link<WaitingWaker<Backend>>>,
|
||||||
|
marker: AtomicUsize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'q, Backend: TimerQueueBackend> Future for Delay<'q, Backend> {
|
||||||
|
type Output = ();
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
|
||||||
|
// SAFETY: We ensure we never move anything out of this.
|
||||||
|
let this = unsafe { self.get_unchecked_mut() };
|
||||||
|
|
||||||
|
if Backend::now().is_at_least(this.instant) {
|
||||||
|
return Poll::Ready(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// SAFETY: this is dereferenced only here and in `drop`. As the queue deletion is done only
|
||||||
|
// in `drop` we can't do this access concurrently with queue removal.
|
||||||
|
let link = &mut this.link_ptr;
|
||||||
|
if link.is_none() {
|
||||||
|
let link_ref = link.insert(Link::new(WaitingWaker {
|
||||||
|
waker: cx.waker().clone(),
|
||||||
|
release_at: this.instant,
|
||||||
|
was_popped: AtomicBool::new(false),
|
||||||
|
}));
|
||||||
|
|
||||||
|
// SAFETY(new_unchecked): The address to the link is stable as it is defined
|
||||||
|
// outside this stack frame.
|
||||||
|
// SAFETY(insert): `link_ref` lfetime comes from `link_ptr` which itself is owned by
|
||||||
|
// the `Delay` struct. The `Delay::drop` impl ensures that the link is removed from the
|
||||||
|
// queue on drop, which happens before the struct and thus `link_ptr` goes out of
|
||||||
|
// scope.
|
||||||
|
let (head_updated, addr) = unsafe { this.queue.insert(Pin::new_unchecked(link_ref)) };
|
||||||
|
this.marker.store(addr, Ordering::Relaxed);
|
||||||
|
if head_updated {
|
||||||
|
Backend::pend_interrupt()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'q, Backend: TimerQueueBackend> Drop for Delay<'q, Backend> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// SAFETY: Drop cannot be run at the same time as poll, so we can't end up
|
||||||
|
// derefencing this concurrently to the one in `poll`.
|
||||||
|
match self.link_ptr.as_ref() {
|
||||||
|
None => return,
|
||||||
|
// If it was popped from the queue there is no need to run delete
|
||||||
|
Some(link) if link.val.was_popped.load(Ordering::Relaxed) => return,
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
self.queue.delete(self.marker.load(Ordering::Relaxed));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Future returned by `timeout` and `timeout_at`.
|
||||||
|
pub struct Timeout<'q, Backend: TimerQueueBackend, F> {
|
||||||
|
delay: Delay<'q, Backend>,
|
||||||
|
future: F,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'q, Backend: TimerQueueBackend, F: Future> Future for Timeout<'q, Backend, F> {
|
||||||
|
type Output = Result<F::Output, TimeoutError>;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let inner = unsafe { self.get_unchecked_mut() };
|
||||||
|
|
||||||
|
{
|
||||||
|
let f = unsafe { Pin::new_unchecked(&mut inner.future) };
|
||||||
|
if let Poll::Ready(v) = f.poll(cx) {
|
||||||
|
return Poll::Ready(Ok(v));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let d = unsafe { Pin::new_unchecked(&mut inner.delay) };
|
||||||
|
if d.poll(cx).is_ready() {
|
||||||
|
return Poll::Ready(Err(TimeoutError));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue