From 5688a5d332cdaffaca64ade5b138a3676ac7cd32 Mon Sep 17 00:00:00 2001 From: Per Lindgren Date: Thu, 12 Jan 2023 08:50:12 +0100 Subject: [PATCH] executor update for less unsafe and more clear --- macros/src/codegen/async_dispatchers.rs | 11 +++--- macros/src/codegen/module.rs | 8 ++--- src/export/executor.rs | 45 ++++++++++++++----------- 3 files changed, 34 insertions(+), 30 deletions(-) diff --git a/macros/src/codegen/async_dispatchers.rs b/macros/src/codegen/async_dispatchers.rs index 012bd61a36..a12ad325fc 100644 --- a/macros/src/codegen/async_dispatchers.rs +++ b/macros/src/codegen/async_dispatchers.rs @@ -44,16 +44,15 @@ pub fn codegen(app: &App, analysis: &Analysis) -> TokenStream2 { for name in channel.tasks.iter() { let exec_name = util::internal_task_ident(name, "EXEC"); + // TODO: Fix cfg // let task = &app.software_tasks[name]; // let cfgs = &task.cfgs; stmts.push(quote!( - if #exec_name.check_and_clear_pending() { - #exec_name.poll(|| { - #exec_name.set_pending(); - #pend_interrupt - }); - } + #exec_name.poll(|| { + #exec_name.set_pending(); + #pend_interrupt + }); )); } diff --git a/macros/src/codegen/module.rs b/macros/src/codegen/module.rs index 666bd0420a..f4c188a466 100644 --- a/macros/src/codegen/module.rs +++ b/macros/src/codegen/module.rs @@ -156,11 +156,8 @@ pub fn codegen(ctxt: Context, app: &App, analysis: &Analysis) -> TokenStream2 { #[allow(non_snake_case)] #[doc(hidden)] pub fn #internal_spawn_ident(#(#input_args,)*) -> Result<(), #input_ty> { - if #exec_name.try_reserve() { - // This unsafe is protected by `try_reserve`, see its documentation for details - unsafe { - #exec_name.spawn_unchecked(#name(#name::Context::new() #(,#input_untupled)*)); - } + + if #exec_name.spawn(|| #name(unsafe { #name::Context::new() } #(,#input_untupled)*) ) { #pend_interrupt @@ -168,6 +165,7 @@ pub fn codegen(ctxt: Context, app: &App, analysis: &Analysis) -> TokenStream2 { } else { Err(#input_tupled) } + } )); diff --git a/src/export/executor.rs b/src/export/executor.rs index 2f88eff968..e64cc43ec1 100644 --- a/src/export/executor.rs +++ b/src/export/executor.rs @@ -30,7 +30,7 @@ unsafe fn waker_drop(_: *const ()) { /// Executor for an async task. pub struct AsyncTaskExecutor { - // `task` is proteced by the `running` flag. + // `task` is protected by the `running` flag. task: UnsafeCell>, running: AtomicBool, pending: AtomicBool, @@ -40,6 +40,7 @@ unsafe impl Sync for AsyncTaskExecutor {} impl AsyncTaskExecutor { /// Create a new executor. + #[inline(always)] pub const fn new() -> Self { Self { task: UnsafeCell::new(MaybeUninit::uninit()), @@ -49,45 +50,51 @@ impl AsyncTaskExecutor { } /// Check if there is an active task in the executor. + #[inline(always)] pub fn is_running(&self) -> bool { self.running.load(Ordering::Relaxed) } /// Checks if a waker has pended the executor and simultaneously clears the flag. - pub fn check_and_clear_pending(&self) -> bool { + #[inline(always)] + fn check_and_clear_pending(&self) -> bool { + // Ordering::Acquire to enforce that update of task is visible to poll self.pending - .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed) .is_ok() } // Used by wakers to indicate that the executor needs to run. + #[inline(always)] pub fn set_pending(&self) { self.pending.store(true, Ordering::Release); } - /// Try to reserve the executor for a future. - /// Used in conjunction with `spawn_unchecked` to reserve the executor before spawning. - /// - /// This could have been joined with `spawn_unchecked` for a complete safe API, however the - /// codegen needs to see if the reserve fails so it can give back input parameters. If spawning - /// was done within the same call the input parameters would be lost and could not be returned. - pub fn try_reserve(&self) -> bool { - self.running + /// Spawn a future + #[inline(always)] + pub fn spawn(&self, future: impl Fn() -> F) -> bool { + // Try to reserve the executor for a future. + if self + .running .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) .is_ok() - } + { + // This unsafe is protected by `running` being false and the atomic setting it to true. + unsafe { + self.task.get().write(MaybeUninit::new(future())); + } + self.set_pending(); - /// Spawn a future, only valid to do after `try_reserve` succeeds. - pub unsafe fn spawn_unchecked(&self, future: F) { - debug_assert!(self.running.load(Ordering::Relaxed)); - - self.task.get().write(MaybeUninit::new(future)); - self.set_pending(); + true + } else { + false + } } /// Poll the future in the executor. + #[inline(always)] pub fn poll(&self, wake: fn()) { - if self.is_running() { + if self.is_running() && self.check_and_clear_pending() { let waker = unsafe { Waker::from_raw(RawWaker::new(wake as *const (), &WAKER_VTABLE)) }; let mut cx = Context::from_waker(&waker); let future = unsafe { Pin::new_unchecked(&mut *(self.task.get() as *mut F)) };