From 1eabb94f0424d7ff85786ad05615da69a379f01d Mon Sep 17 00:00:00 2001 From: Emil Fresk Date: Mon, 9 Jan 2023 09:48:39 +0100 Subject: [PATCH] New executor design --- src/export.rs | 68 +--------------------------- src/export/executor.rs | 100 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 67 deletions(-) create mode 100644 src/export/executor.rs diff --git a/src/export.rs b/src/export.rs index 7beaf1631d..6017dcf78f 100644 --- a/src/export.rs +++ b/src/export.rs @@ -8,73 +8,7 @@ pub use cortex_m::{ Peripherals, }; -pub mod executor { - use core::{ - future::Future, - mem, - pin::Pin, - task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, - }; - - static WAKER_VTABLE: RawWakerVTable = - RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop); - - unsafe fn waker_clone(p: *const ()) -> RawWaker { - RawWaker::new(p, &WAKER_VTABLE) - } - - unsafe fn waker_wake(p: *const ()) { - // The only thing we need from a waker is the function to call to pend the async - // dispatcher. - let f: fn() = mem::transmute(p); - f(); - } - - unsafe fn waker_drop(_: *const ()) { - // nop - } - - //============ - // AsyncTaskExecutor - - pub struct AsyncTaskExecutor { - task: Option, - } - - impl AsyncTaskExecutor { - pub const fn new() -> Self { - Self { task: None } - } - - pub fn is_running(&self) -> bool { - self.task.is_some() - } - - pub fn spawn(&mut self, future: F) { - self.task = Some(future); - } - - pub fn poll(&mut self, wake: fn()) -> bool { - if let Some(future) = &mut self.task { - unsafe { - let waker = Waker::from_raw(RawWaker::new(wake as *const (), &WAKER_VTABLE)); - let mut cx = Context::from_waker(&waker); - let future = Pin::new_unchecked(future); - - match future.poll(&mut cx) { - Poll::Ready(_) => { - self.task = None; - true // Only true if we finished now - } - Poll::Pending => false, - } - } - } else { - false - } - } - } -} +pub mod executor; /// Mask is used to store interrupt masks on systems without a BASEPRI register (M0, M0+, M23). /// It needs to be large enough to cover all the relevant interrupts in use. diff --git a/src/export/executor.rs b/src/export/executor.rs new file mode 100644 index 0000000000..874ee192be --- /dev/null +++ b/src/export/executor.rs @@ -0,0 +1,100 @@ +use core::{ + cell::UnsafeCell, + future::Future, + mem::{self, MaybeUninit}, + pin::Pin, + sync::atomic::{AtomicBool, Ordering}, + task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, +}; + +static WAKER_VTABLE: RawWakerVTable = + RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop); + +unsafe fn waker_clone(p: *const ()) -> RawWaker { + RawWaker::new(p, &WAKER_VTABLE) +} + +unsafe fn waker_wake(p: *const ()) { + // The only thing we need from a waker is the function to call to pend the async + // dispatcher. + let f: fn() = mem::transmute(p); + f(); +} + +unsafe fn waker_drop(_: *const ()) { + // nop +} + +//============ +// AsyncTaskExecutor + +/// Executor for an async task. +pub struct AsyncTaskExecutor { + // `task` is proteced by the `running` flag. + task: UnsafeCell>, + running: AtomicBool, + pending: AtomicBool, +} + +unsafe impl Sync for AsyncTaskExecutor {} + +impl AsyncTaskExecutor { + /// Create a new executor. + pub const fn new() -> Self { + Self { + task: UnsafeCell::new(MaybeUninit::uninit()), + running: AtomicBool::new(false), + pending: AtomicBool::new(false), + } + } + + /// Check if there is an active task in the executor. + pub fn is_running(&self) -> bool { + self.running.load(Ordering::Relaxed) + } + + /// Checks if a waker has pended the executor. + pub fn is_pending(&self) -> bool { + self.pending.load(Ordering::Relaxed) + } + + // Used by wakers to indicate that the executor needs to run. + pub fn set_pending(&self) { + self.pending.store(true, Ordering::Release); + } + + /// Try to reserve the executor for a future. + /// Used in conjunction with `spawn_unchecked` to reserve the executor before spawning. + /// + /// This could have been joined with `spawn_unchecked` for a complete safe API, however the + /// codegen needs to see if the reserve fails so it can give back input parameters. If spawning + /// was done within the same call the input parameters would be lost and could not be returned. + pub fn try_reserve(&self) -> bool { + self.running + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() + } + + /// Spawn a future, only valid to do after `try_reserve` succeeds. + pub unsafe fn spawn_unchecked(&self, future: F) { + debug_assert!(self.running.load(Ordering::Relaxed)); + + self.task.get().write(MaybeUninit::new(future)); + } + + /// Poll the future in the executor. + pub fn poll(&self, wake: fn()) { + if self.is_running() { + let waker = unsafe { Waker::from_raw(RawWaker::new(wake as *const (), &WAKER_VTABLE)) }; + let mut cx = Context::from_waker(&waker); + let future = unsafe { Pin::new_unchecked(&mut *(self.task.get() as *mut F)) }; + + match future.poll(&mut cx) { + Poll::Ready(_) => { + self.running.store(false, Ordering::Release); + } + Poll::Pending => {} + } + } + } +}