From b2ec1fa65118813b400cf806e3ff492ea41f49ca Mon Sep 17 00:00:00 2001 From: Emil Fresk Date: Fri, 10 Jun 2022 20:08:46 +0200 Subject: [PATCH] Example running, timeout and delay futures available --- examples/async-task.rs | 168 +------------ examples/async.rs | 287 --------------------- examples/async2.rs | 361 --------------------------- macros/src/codegen.rs | 56 +---- macros/src/codegen/dispatchers.rs | 128 ++++++++-- macros/src/codegen/module.rs | 34 +-- macros/src/codegen/monotonic.rs | 251 +++++++++++++++++++ macros/src/codegen/software_tasks.rs | 30 ++- macros/src/codegen/timer_queue.rs | 36 ++- macros/src/codegen/util.rs | 9 +- src/export.rs | 68 ++++- src/tq.rs | 270 +++++++++++++++----- 12 files changed, 724 insertions(+), 974 deletions(-) delete mode 100644 examples/async.rs delete mode 100644 examples/async2.rs create mode 100644 macros/src/codegen/monotonic.rs diff --git a/examples/async-task.rs b/examples/async-task.rs index a91dfc6785..00fe581c1f 100644 --- a/examples/async-task.rs +++ b/examples/async-task.rs @@ -2,10 +2,6 @@ #![no_std] #![feature(type_alias_impl_trait)] -use core::future::Future; -use core::pin::Pin; -use core::task::{Context, Poll, Waker}; - use cortex_m_semihosting::{debug, hprintln}; use panic_semihosting as _; use systick_monotonic::*; @@ -25,9 +21,7 @@ mod app { pub type AppDuration = as rtic::Monotonic>::Duration; #[shared] - struct Shared { - s: u32, - } + struct Shared {} #[local] struct Local {} @@ -39,8 +33,11 @@ mod app { fn init(cx: init::Context) -> (Shared, Local, init::Monotonics) { hprintln!("init").unwrap(); + normal_task::spawn().ok(); + async_task::spawn().ok(); + ( - Shared { s: 0 }, + Shared {}, Local {}, init::Monotonics(Systick::new(cx.core.SYST, 12_000_000)), ) @@ -55,156 +52,15 @@ mod app { } } - #[task(priority = 2)] - async fn task(cx: task::Context) { - hprintln!("delay long time").ok(); + #[task] + fn normal_task(_cx: normal_task::Context) { + hprintln!("hello from normal").ok(); + } - let fut = Delay::spawn(2500.millis()); - - hprintln!("we have just created the future").ok(); - fut.await; - hprintln!("long delay done").ok(); - - hprintln!("delay short time").ok(); - delay(500.millis()).await; - hprintln!("short delay done").ok(); - - hprintln!("test timeout").ok(); - let res = timeout(NeverEndingFuture {}, 1.secs()).await; - hprintln!("timeout done: {:?}", res).ok(); - - hprintln!("test timeout 2").ok(); - let res = timeout(delay(500.millis()), 1.secs()).await; - hprintln!("timeout done 2: {:?}", res).ok(); + #[task] + async fn async_task(_cx: async_task::Context) { + hprintln!("hello from async").ok(); debug::exit(debug::EXIT_SUCCESS); } - - #[task(capacity = 12)] - fn delay_handler(_: delay_handler::Context, waker: Waker) { - waker.wake(); - } -} - -// Delay - -pub struct Delay { - until: crate::app::AppInstant, -} - -impl Delay { - pub fn spawn(duration: crate::app::AppDuration) -> Self { - let until = crate::app::monotonics::now() + duration; - - Delay { until } - } -} - -#[inline(always)] -pub fn delay(duration: crate::app::AppDuration) -> Delay { - Delay::spawn(duration) -} - -impl Future for Delay { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let s = self.as_mut(); - let now = crate::app::monotonics::now(); - - hprintln!(" poll Delay").ok(); - - if now >= s.until { - Poll::Ready(()) - } else { - let waker = cx.waker().clone(); - crate::app::delay_handler::spawn_after(s.until - now, waker).ok(); - - Poll::Pending - } - } -} - -//============= -// Timeout future - -#[derive(Copy, Clone, Debug)] -pub struct TimeoutError; - -pub struct Timeout { - future: F, - until: crate::app::AppInstant, - cancel_handle: Option, -} - -impl Timeout -where - F: Future, -{ - pub fn timeout(future: F, duration: crate::app::AppDuration) -> Self { - let until = crate::app::monotonics::now() + duration; - Self { - future, - until, - cancel_handle: None, - } - } -} - -#[inline(always)] -pub fn timeout(future: F, duration: crate::app::AppDuration) -> Timeout { - Timeout::timeout(future, duration) -} - -impl Future for Timeout -where - F: Future, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let now = crate::app::monotonics::now(); - - // SAFETY: We don't move the underlying pinned value. - let mut s = unsafe { self.get_unchecked_mut() }; - let future = unsafe { Pin::new_unchecked(&mut s.future) }; - - hprintln!(" poll Timeout").ok(); - - match future.poll(cx) { - Poll::Ready(r) => { - if let Some(ch) = s.cancel_handle.take() { - ch.cancel().ok(); - } - - Poll::Ready(Ok(r)) - } - Poll::Pending => { - if now >= s.until { - Poll::Ready(Err(TimeoutError)) - } else if s.cancel_handle.is_none() { - let waker = cx.waker().clone(); - let sh = crate::app::delay_handler::spawn_after(s.until - now, waker) - .expect("Internal RTIC bug, this should never fail"); - s.cancel_handle = Some(sh); - - Poll::Pending - } else { - Poll::Pending - } - } - } - } -} - -pub struct NeverEndingFuture {} - -impl Future for NeverEndingFuture { - type Output = (); - - fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { - // Never finish - hprintln!(" polling NeverEndingFuture").ok(); - Poll::Pending - } } diff --git a/examples/async.rs b/examples/async.rs deleted file mode 100644 index 6abbbad861..0000000000 --- a/examples/async.rs +++ /dev/null @@ -1,287 +0,0 @@ -#![no_main] -#![no_std] -#![feature(type_alias_impl_trait)] - -use core::future::Future; -use core::mem; -use core::pin::Pin; -use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; - -use cortex_m_semihosting::{debug, hprintln}; -use panic_semihosting as _; - -#[rtic::app(device = lm3s6965, dispatchers = [SSI0], peripherals = true)] -mod app { - use crate::Timer; - use crate::*; - - #[shared] - struct Shared { - syst: cortex_m::peripheral::SYST, - } - - #[local] - struct Local {} - - #[init] - fn init(cx: init::Context) -> (Shared, Local, init::Monotonics) { - hprintln!("init").unwrap(); - foo::spawn().unwrap(); - foo2::spawn().unwrap(); - (Shared { syst: cx.core.SYST }, Local {}, init::Monotonics()) - } - - #[idle] - fn idle(_: idle::Context) -> ! { - // debug::exit(debug::EXIT_SUCCESS); - loop { - hprintln!("idle"); - cortex_m::asm::wfi(); // put the MCU in sleep mode until interrupt occurs - } - } - - type F = impl Future + 'static; - static mut TASK: Task = Task::new(); - - #[task(shared = [syst])] - fn foo(mut cx: foo::Context) { - // BEGIN BOILERPLATE - fn create(cx: foo::Context<'static>) -> F { - task(cx) - } - - hprintln!("foo trampoline").ok(); - unsafe { - match TASK { - Task::Idle | Task::Done(_) => { - hprintln!("foo spawn task").ok(); - TASK.spawn(|| create(mem::transmute(cx))); - // Per: - // I think transmute could be removed as in: - // TASK.spawn(|| create(cx)); - // - // This could be done if spawn for async tasks would be passed - // a 'static reference by the generated code. - // - // Soundness: - // Check if lifetime for async context is correct. - } - _ => {} - }; - - foo_poll::spawn(); - } - // END BOILERPLATE - - async fn task(mut cx: foo::Context<'static>) { - hprintln!("foo task").ok(); - - hprintln!("delay long time").ok(); - let fut = cx.shared.syst.lock(|syst| timer_delay(syst, 5000000)); - - hprintln!("we have just created the future"); - fut.await; // this calls poll on the timer future - hprintln!("foo task resumed").ok(); - - hprintln!("delay short time").ok(); - cx.shared.syst.lock(|syst| timer_delay(syst, 1000000)).await; - hprintln!("foo task resumed").ok(); - debug::exit(debug::EXIT_SUCCESS); - } - } - - #[task(shared = [syst])] - fn foo_poll(mut cx: foo_poll::Context) { - // BEGIN BOILERPLATE - - hprintln!("foo poll trampoline").ok(); - unsafe { - hprintln!("foo trampoline poll").ok(); - TASK.poll(|| { - hprintln!("foo poll closure").ok(); - }); - - match TASK { - Task::Done(ref r) => { - hprintln!("foo trampoline done").ok(); - // hprintln!("r = {:?}", mem::transmute::<_, &u32>(r)).ok(); - } - _ => { - hprintln!("foo trampoline running").ok(); - } - } - } - // END BOILERPLATE - } - - type F2 = impl Future + 'static; - static mut TASK2: Task = Task::new(); - - #[task(shared = [syst])] - fn foo2(mut cx: foo2::Context) { - // BEGIN BOILERPLATE - fn create(cx: foo2::Context<'static>) -> F2 { - task(cx) - } - - hprintln!("foo2 trampoline").ok(); - unsafe { - match TASK2 { - Task::Idle | Task::Done(_) => { - hprintln!("foo2 spawn task").ok(); - TASK2.spawn(|| create(mem::transmute(cx))); - // Per: - // I think transmute could be removed as in: - // TASK.spawn(|| create(cx)); - // - // This could be done if spawn for async tasks would be passed - // a 'static reference by the generated code. - // - // Soundness: - // Check if lifetime for async context is correct. - } - _ => {} - }; - - foo2_poll::spawn(); - } - // END BOILERPLATE - - async fn task(mut cx: foo2::Context<'static>) { - hprintln!("foo2 task").ok(); - - hprintln!("foo2 delay long time").ok(); - let fut = cx.shared.syst.lock(|syst| timer_delay(syst, 10_000_000)); - - hprintln!("we have just created the future"); - fut.await; // this calls poll on the timer future - hprintln!("foo task resumed").ok(); - } - } - - #[task(shared = [syst])] - fn foo2_poll(mut cx: foo2_poll::Context) { - // BEGIN BOILERPLATE - - hprintln!("foo2 poll trampoline").ok(); - unsafe { - hprintln!("foo2 trampoline poll").ok(); - TASK2.poll(|| { - hprintln!("foo2 poll closure").ok(); - }); - - match TASK2 { - Task::Done(ref r) => { - hprintln!("foo2 trampoline done").ok(); - // hprintln!("r = {:?}", mem::transmute::<_, &u32>(r)).ok(); - } - _ => { - hprintln!("foo2 trampoline running").ok(); - } - } - } - // END BOILERPLATE - } - - // This the actual RTIC task, binds to systic. - #[task(binds = SysTick, shared = [syst], priority = 2)] - fn systic(mut cx: systic::Context) { - hprintln!("systic interrupt").ok(); - cx.shared.syst.lock(|syst| syst.disable_interrupt()); - crate::app::foo_poll::spawn(); // this should be from a Queue later - crate::app::foo2_poll::spawn(); // this should be from a Queue later - } -} - -//============= -// Waker - -static WAKER_VTABLE: RawWakerVTable = - RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop); - -unsafe fn waker_clone(p: *const ()) -> RawWaker { - RawWaker::new(p, &WAKER_VTABLE) -} - -unsafe fn waker_wake(p: *const ()) { - let f: fn() = mem::transmute(p); - f(); -} - -unsafe fn waker_drop(_: *const ()) { - // nop -} - -//============ -// Task - -enum Task { - Idle, - Running(F), - Done(F::Output), -} - -impl Task { - const fn new() -> Self { - Self::Idle - } - - fn spawn(&mut self, future: impl FnOnce() -> F) { - *self = Task::Running(future()); - } - - unsafe fn poll(&mut self, wake: fn()) { - match self { - Task::Idle => {} - Task::Running(future) => { - let future = Pin::new_unchecked(future); - let waker_data: *const () = mem::transmute(wake); - let waker = Waker::from_raw(RawWaker::new(waker_data, &WAKER_VTABLE)); - let mut cx = Context::from_waker(&waker); - - match future.poll(&mut cx) { - Poll::Ready(r) => *self = Task::Done(r), - Poll::Pending => {} - }; - } - Task::Done(_) => {} - } - } -} - -//============= -// Timer -// Later we want a proper queue - -use heapless; -pub struct Timer { - pub done: bool, - // pub waker_task: Option Result<(), ()>>, -} - -impl Future for Timer { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.done { - Poll::Ready(()) - } else { - hprintln!("timer polled"); - cx.waker().wake_by_ref(); - hprintln!("after wake_by_ref"); - self.done = true; - Poll::Pending - } - } -} - -fn timer_delay(syst: &mut cortex_m::peripheral::SYST, t: u32) -> Timer { - hprintln!("timer_delay {}", t); - - syst.set_reload(t); - syst.enable_counter(); - syst.enable_interrupt(); - Timer { - done: false, - // waker_task: Some(app::foo::spawn), // we should add waker field to async task context i RTIC - } -} diff --git a/examples/async2.rs b/examples/async2.rs deleted file mode 100644 index 2d82ed3e96..0000000000 --- a/examples/async2.rs +++ /dev/null @@ -1,361 +0,0 @@ -#![no_main] -#![no_std] -#![feature(type_alias_impl_trait)] - -use core::future::Future; -use core::mem; -use core::pin::Pin; -use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; - -use cortex_m_semihosting::{debug, hprintln}; -use panic_semihosting as _; -use systick_monotonic::*; - -// NOTES: -// -// - Async tasks cannot have `#[lock_free]` resources, as they can interleve and each async -// task can have a mutable reference stored. -// - Spawning an async task equates to it being polled at least once. -// - ... - -#[rtic::app(device = lm3s6965, dispatchers = [SSI0], peripherals = true)] -mod app { - use crate::*; - - pub type AppInstant = as rtic::Monotonic>::Instant; - pub type AppDuration = as rtic::Monotonic>::Duration; - - #[shared] - struct Shared { - s: u32, - } - - #[local] - struct Local {} - - #[monotonic(binds = SysTick, default = true)] - type MyMono = Systick<100>; - - #[init] - fn init(cx: init::Context) -> (Shared, Local, init::Monotonics) { - hprintln!("init").unwrap(); - task_executor::spawn().unwrap(); - - ( - Shared { s: 0 }, - Local {}, - init::Monotonics(Systick::new(cx.core.SYST, 12_000_000)), - ) - } - - #[idle] - fn idle(_: idle::Context) -> ! { - // debug::exit(debug::EXIT_SUCCESS); - loop { - // hprintln!("idle"); - cortex_m::asm::wfi(); // put the MCU in sleep mode until interrupt occurs - } - } - - // TODO: This should be the task, that is understood by the `syntax` proc-macro - // #[task(priority = 2)] - async fn task(cx: task_executor::Context<'_>) { - #[allow(unused_imports)] - use rtic::mutex_prelude::*; - - hprintln!("delay long time").ok(); - - let fut = Delay::spawn(2500.millis()); - - hprintln!("we have just created the future").ok(); - fut.await; - hprintln!("long delay done").ok(); - - hprintln!("delay short time").ok(); - sleep(500.millis()).await; - hprintln!("short delay done").ok(); - - hprintln!("test timeout").ok(); - let res = timeout(NeverEndingFuture {}, 1.secs()).await; - hprintln!("timeout done: {:?}", res).ok(); - - hprintln!("test timeout 2").ok(); - let res = timeout(Delay::spawn(500.millis()), 1.secs()).await; - hprintln!("timeout done 2: {:?}", res).ok(); - - debug::exit(debug::EXIT_SUCCESS); - } - - ////////////////////////////////////////////// - // BEGIN BOILERPLATE - ////////////////////////////////////////////// - type F = impl Future + 'static; - static mut TASK: AsyncTaskExecutor = AsyncTaskExecutor::new(); - - // TODO: This should be a special case codegen for the `dispatcher`, which runs - // in the dispatcher. Not as its own task, this is just to make it work - // in this example. - #[task(shared = [s])] - fn task_executor(cx: task_executor::Context) { - let task_storage = unsafe { &mut TASK }; - match task_storage { - AsyncTaskExecutor::Idle => { - // TODO: The context generated for async tasks need 'static lifetime, - // use `mem::transmute` for now until codegen is fixed - // - // TODO: Check if there is some way to not need 'static lifetime - hprintln!(" task_executor spawn").ok(); - task_storage.spawn(|| task(unsafe { mem::transmute(cx) })); - task_executor::spawn().ok(); - } - _ => { - hprintln!(" task_executor run").ok(); - task_storage.poll(|| { - task_executor::spawn().ok(); - }); - } - }; - } - ////////////////////////////////////////////// - // END BOILERPLATE - ////////////////////////////////////////////// - - // TODO: This is generated by the `delay` impl, it needs a capacity equal or grater - // than the number of async tasks in the system. Should more likely be a part - // of the monotonic codegen, not its own task. - #[task(capacity = 12)] - fn delay_handler(_: delay_handler::Context, waker: Waker) { - waker.wake(); - } -} - -//============= -// Waker - -static WAKER_VTABLE: RawWakerVTable = - RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop); - -unsafe fn waker_clone(p: *const ()) -> RawWaker { - RawWaker::new(p, &WAKER_VTABLE) -} - -unsafe fn waker_wake(p: *const ()) { - // The only thing we need from a waker is the function to call to pend the async - // dispatcher. - let f: fn() = mem::transmute(p); - f(); -} - -unsafe fn waker_drop(_: *const ()) { - // nop -} - -//============ -// AsyncTaskExecutor - -enum AsyncTaskExecutor { - Idle, - Running(F), -} - -impl AsyncTaskExecutor { - const fn new() -> Self { - Self::Idle - } - - fn spawn(&mut self, future: impl FnOnce() -> F) { - *self = AsyncTaskExecutor::Running(future()); - } - - fn poll(&mut self, wake: fn()) { - match self { - AsyncTaskExecutor::Idle => {} - AsyncTaskExecutor::Running(future) => unsafe { - let waker_data: *const () = mem::transmute(wake); - let waker = Waker::from_raw(RawWaker::new(waker_data, &WAKER_VTABLE)); - let mut cx = Context::from_waker(&waker); - let future = Pin::new_unchecked(future); - - match future.poll(&mut cx) { - Poll::Ready(_) => { - *self = AsyncTaskExecutor::Idle; - hprintln!(" task_executor idle").ok(); - } - Poll::Pending => {} - }; - }, - } - } -} - -//============= -// Delay - -pub struct Delay { - until: crate::app::AppInstant, -} - -impl Delay { - pub fn spawn(duration: crate::app::AppDuration) -> Self { - let until = crate::app::monotonics::now() + duration; - - Delay { until } - } -} - -#[inline(always)] -pub fn sleep(duration: crate::app::AppDuration) -> Delay { - Delay::spawn(duration) -} - -impl Future for Delay { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let s = self.as_mut(); - let now = crate::app::monotonics::now(); - - hprintln!(" poll Delay").ok(); - - if now >= s.until { - Poll::Ready(()) - } else { - let waker = cx.waker().clone(); - crate::app::delay_handler::spawn_after(s.until - now, waker).ok(); - - Poll::Pending - } - } -} - -//============= -// Timeout future - -#[derive(Copy, Clone, Debug)] -pub struct TimeoutError; - -pub struct Timeout { - future: F, - until: crate::app::AppInstant, - cancel_handle: Option, -} - -impl Timeout -where - F: Future, -{ - pub fn timeout(future: F, duration: crate::app::AppDuration) -> Self { - let until = crate::app::monotonics::now() + duration; - Self { - future, - until, - cancel_handle: None, - } - } -} - -#[inline(always)] -pub fn timeout(future: F, duration: crate::app::AppDuration) -> Timeout { - Timeout::timeout(future, duration) -} - -impl Future for Timeout -where - F: Future, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let now = crate::app::monotonics::now(); - - // SAFETY: We don't move the underlying pinned value. - let mut s = unsafe { self.get_unchecked_mut() }; - let future = unsafe { Pin::new_unchecked(&mut s.future) }; - - hprintln!(" poll Timeout").ok(); - - match future.poll(cx) { - Poll::Ready(r) => { - if let Some(ch) = s.cancel_handle.take() { - ch.cancel().ok(); - } - - Poll::Ready(Ok(r)) - } - Poll::Pending => { - if now >= s.until { - Poll::Ready(Err(TimeoutError)) - } else if s.cancel_handle.is_none() { - let waker = cx.waker().clone(); - let sh = crate::app::delay_handler::spawn_after(s.until - now, waker) - .expect("Internal RTIC bug, this should never fail"); - s.cancel_handle = Some(sh); - - Poll::Pending - } else { - Poll::Pending - } - } - } - } -} - -pub struct NeverEndingFuture {} - -impl Future for NeverEndingFuture { - type Output = (); - - fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { - // Never finish - hprintln!(" polling NeverEndingFuture").ok(); - Poll::Pending - } -} - -//============= -// Async SPI driver - -// #[task] -async fn test_spi(async_spi_driver: &mut AsyncSpi) { - let transfer = Transaction { - buf: [0; 16], - n_write: 1, - n_read: 5, - }; - - let ret = async_spi_driver.transfer(transfer).await; - - // do_something(ret); -} - -/// A DMA transaction. -/// -/// NOTE: Don't leak this `Future`, if you do there is immediate UB! -struct Transaction { - pub buf: [u8; 16], - pub n_write: usize, - pub n_read: usize, -} - -struct AsyncSpi { - transaction: Option, - queue: heapless::spsc::Queue, -} - -impl AsyncSpi { - pub fn transfer(&mut self, transfer: Transaction) -> AsyncSpiTransaction { - todo!() - } -} - -struct AsyncSpiTransaction { - // ... -} - -impl Future for AsyncSpiTransaction { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - todo!() - } -} diff --git a/macros/src/codegen.rs b/macros/src/codegen.rs index 01be1d5787..d7711b638b 100644 --- a/macros/src/codegen.rs +++ b/macros/src/codegen.rs @@ -12,6 +12,7 @@ mod init; mod local_resources; mod local_resources_struct; mod module; +mod monotonic; mod post_init; mod pre_init; mod shared_resources; @@ -95,6 +96,8 @@ pub fn app(app: &App, analysis: &Analysis, extra: &Extra) -> TokenStream2 { let (mod_app_software_tasks, root_software_tasks, user_software_tasks) = software_tasks::codegen(app, analysis, extra); + let monotonics = monotonic::codegen(app, analysis, extra); + let mod_app_dispatchers = dispatchers::codegen(app, analysis, extra); let mod_app_timer_queue = timer_queue::codegen(app, analysis, extra); let user_imports = &app.user_imports; @@ -102,59 +105,6 @@ pub fn app(app: &App, analysis: &Analysis, extra: &Extra) -> TokenStream2 { let name = &app.name; let device = &extra.device; - let monotonic_parts: Vec<_> = app - .monotonics - .iter() - .map(|(_, monotonic)| { - let name = &monotonic.ident; - let name_str = &name.to_string(); - let ident = util::monotonic_ident(name_str); - let doc = &format!( - "This module holds the static implementation for `{}::now()`", - name_str - ); - - let default_monotonic = if monotonic.args.default { - quote!(pub use #name::now;) - } else { - quote!() - }; - - quote! { - #default_monotonic - - #[doc = #doc] - #[allow(non_snake_case)] - pub mod #name { - - /// Read the current time from this monotonic - pub fn now() -> ::Instant { - rtic::export::interrupt::free(|_| { - use rtic::Monotonic as _; - if let Some(m) = unsafe{ &mut *super::super::#ident.get_mut() } { - m.now() - } else { - ::zero() - } - }) - } - } - } - }) - .collect(); - - let monotonics = if monotonic_parts.is_empty() { - quote!() - } else { - quote!( - pub use rtic::Monotonic as _; - - /// Holds static methods for each monotonic. - pub mod monotonics { - #(#monotonic_parts)* - } - ) - }; let rt_err = util::rt_err_ident(); quote!( diff --git a/macros/src/codegen/dispatchers.rs b/macros/src/codegen/dispatchers.rs index a90a97c773..e6caa78196 100644 --- a/macros/src/codegen/dispatchers.rs +++ b/macros/src/codegen/dispatchers.rs @@ -5,7 +5,7 @@ use rtic_syntax::ast::App; use crate::{analyze::Analysis, check::Extra, codegen::util}; /// Generates task dispatchers -pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec { +pub fn codegen(app: &App, analysis: &Analysis, extra: &Extra) -> Vec { let mut items = vec![]; let interrupts = &analysis.interrupts; @@ -64,6 +64,9 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec = rtic::RacyCell::new(#rq_expr); )); + let device = &extra.device; + let enum_ = util::interrupt_ident(); + let interrupt = util::suffixed(&interrupts[&level].0.to_string()); let arms = channel .tasks .iter() @@ -73,37 +76,124 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec { - let #tupled = - (&*#inputs - .get()) - .get_unchecked(usize::from(index)) - .as_ptr() - .read(); - (&mut *#fq.get_mut()).split().0.enqueue_unchecked(index); - let priority = &rtic::export::Priority::new(PRIORITY); - #name( - #name::Context::new(priority) - #(,#pats)* - ) - } - ) + if task.is_async { + let executor_run_ident = util::executor_run_ident(name); + + quote!( + #(#cfgs)* + #t::#name => { + if !(&mut *#exec_name.get_mut()).is_running() { + let #tupled = + (&*#inputs + .get()) + .get_unchecked(usize::from(index)) + .as_ptr() + .read(); + (&mut *#fq.get_mut()).split().0.enqueue_unchecked(index); + + let priority = &rtic::export::Priority::new(PRIORITY); + (&mut *#exec_name.get_mut()).spawn(#name(#name::Context::new(priority), #(,#pats)*)); + #executor_run_ident.store(true, core::sync::atomic::Ordering::Relaxed); + } else { + retry_queue.push_unchecked((#t::#name, index)); + } + } + ) + } else { + quote!( + #(#cfgs)* + #t::#name => { + let #tupled = + (&*#inputs + .get()) + .get_unchecked(usize::from(index)) + .as_ptr() + .read(); + (&mut *#fq.get_mut()).split().0.enqueue_unchecked(index); + let priority = &rtic::export::Priority::new(PRIORITY); + #name( + #name::Context::new(priority) + #(,#pats)* + ) + } + ) + } }) .collect::>(); + for (name, task) in app.software_tasks.iter() { + if task.is_async { + let type_name = util::internal_task_ident(name, "F"); + let exec_name = util::internal_task_ident(name, "EXEC"); + + stmts.push(quote!( + type #type_name = impl core::future::Future + 'static; + static #exec_name: + rtic::RacyCell> = + rtic::RacyCell::new(rtic::export::executor::AsyncTaskExecutor::new()); + )); + } + } + + let n_executors: usize = app + .software_tasks + .iter() + .map(|(_, task)| if task.is_async { 1 } else { 0 }) + .sum(); + + // TODO: This `retry_queue` comes from the current design of the dispatcher queue handling. + // To remove this we would need to redesign how the dispatcher handles queues, and this can + // be done as an optimization later. + // + // The core issue is that we should only dequeue the ready queue if the exexutor associated + // to the task is not running. As it is today this queue is blindly dequeued, see the + // `while let Some(...) = (&mut *#rq.get_mut())...` a few lines down. The current "hack" is + // to just requeue the executor run if it should not have been dequeued. This needs however + // to be done after the ready queue has been exhausted. + if n_executors > 0 { + stmts.push(quote!( + let mut retry_queue: rtic::export::Vec<_, #n_executors> = rtic::export::Vec::new(); + )); + } + stmts.push(quote!( while let Some((task, index)) = (&mut *#rq.get_mut()).split().1.dequeue() { match task { #(#arms)* } } + + while let Some((task, index)) = retry_queue.pop() { + rtic::export::interrupt::free(|_| { + (&mut *#rq.get_mut()).enqueue_unchecked((task, index)); + }); + } )); + for (name, _task) in app.software_tasks.iter().filter_map(|(name, task)| { + if task.is_async { + Some((name, task)) + } else { + None + } + }) { + let exec_name = util::internal_task_ident(name, "EXEC"); + + let executor_run_ident = util::executor_run_ident(name); + stmts.push(quote!( + if #executor_run_ident.load(core::sync::atomic::Ordering::Relaxed) { + #executor_run_ident.store(false, core::sync::atomic::Ordering::Relaxed); + (&mut *#exec_name.get_mut()).poll(|| { + #executor_run_ident.store(true, core::sync::atomic::Ordering::Release); + rtic::pend(#device::#enum_::#interrupt); + }); + } + )); + } + let doc = format!("Interrupt handler to dispatch tasks at priority {}", level); - let interrupt = util::suffixed(&interrupts[&level].0.to_string()); let attribute = &interrupts[&level].1.attrs; items.push(quote!( #[allow(non_snake_case)] diff --git a/macros/src/codegen/module.rs b/macros/src/codegen/module.rs index fd8137fad4..29f276625e 100644 --- a/macros/src/codegen/module.rs +++ b/macros/src/codegen/module.rs @@ -54,14 +54,6 @@ pub fn codegen( Context::Idle | Context::HardwareTask(_) | Context::SoftwareTask(_) => {} } - // if ctxt.has_locals(app) { - // let ident = util::locals_ident(ctxt, app); - // module_items.push(quote!( - // #[doc(inline)] - // pub use super::#ident as Locals; - // )); - // } - if ctxt.has_local_resources(app) { let ident = util::local_resources_ident(ctxt, app); let lt = if local_resources_tick { @@ -133,6 +125,7 @@ pub fn codegen( )); module_items.push(quote!( + #[doc(inline)] pub use super::#internal_monotonics_ident as Monotonics; )); } @@ -193,6 +186,7 @@ pub fn codegen( module_items.push(quote!( #(#cfgs)* + #[doc(inline)] pub use super::#internal_context_name as Context; )); @@ -225,6 +219,8 @@ pub fn codegen( #(#cfgs)* /// Spawns the task directly + #[allow(non_snake_case)] + #[doc(hidden)] pub fn #internal_spawn_ident(#(#args,)*) -> Result<(), #ty> { let input = #tupled; @@ -239,7 +235,6 @@ pub fn codegen( rtic::export::interrupt::free(|_| { (&mut *#rq.get_mut()).enqueue_unchecked((#t::#name, index)); }); - rtic::pend(#device::#enum_::#interrupt); Ok(()) @@ -252,6 +247,7 @@ pub fn codegen( module_items.push(quote!( #(#cfgs)* + #[doc(inline)] pub use super::#internal_spawn_ident as spawn; )); @@ -294,15 +290,21 @@ pub fn codegen( if monotonic.args.default { module_items.push(quote!( + #[doc(inline)] pub use #m::spawn_after; + #[doc(inline)] pub use #m::spawn_at; + #[doc(inline)] pub use #m::SpawnHandle; )); } module_items.push(quote!( pub mod #m { + #[doc(inline)] pub use super::super::#internal_spawn_after_ident as spawn_after; + #[doc(inline)] pub use super::super::#internal_spawn_at_ident as spawn_at; + #[doc(inline)] pub use super::super::#internal_spawn_handle_ident as SpawnHandle; } )); @@ -316,6 +318,7 @@ pub fn codegen( marker: u32, } + #(#cfgs)* impl core::fmt::Debug for #internal_spawn_handle_ident { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct(#spawn_handle_string).finish() @@ -327,7 +330,7 @@ pub fn codegen( pub fn cancel(self) -> Result<#ty, ()> { rtic::export::interrupt::free(|_| unsafe { let tq = &mut *#tq.get_mut(); - if let Some((_task, index)) = tq.cancel_marker(self.marker) { + if let Some((_task, index)) = tq.cancel_task_marker(self.marker) { // Get the message let msg = (&*#inputs .get()) @@ -362,11 +365,12 @@ pub fn codegen( let tq = (&mut *#tq.get_mut()); - tq.update_marker(self.marker, marker, instant, || #pend).map(|_| #name::#m::SpawnHandle { marker }) + tq.update_task_marker(self.marker, marker, instant, || #pend).map(|_| #name::#m::SpawnHandle { marker }) }) } } + #(#cfgs)* /// Spawns the task after a set duration relative to the current time /// @@ -407,10 +411,10 @@ pub fn codegen( rtic::export::interrupt::free(|_| { let marker = #tq_marker.get().read(); - let nr = rtic::export::NotReady { - instant, - index, + let nr = rtic::export::TaskNotReady { task: #t::#name, + index, + instant, marker, }; @@ -418,7 +422,7 @@ pub fn codegen( let tq = &mut *#tq.get_mut(); - tq.enqueue_unchecked( + tq.enqueue_task_unchecked( nr, || #enable_interrupt, || #pend, diff --git a/macros/src/codegen/monotonic.rs b/macros/src/codegen/monotonic.rs new file mode 100644 index 0000000000..685502edbb --- /dev/null +++ b/macros/src/codegen/monotonic.rs @@ -0,0 +1,251 @@ +use proc_macro2::TokenStream as TokenStream2; +use quote::quote; +use rtic_syntax::ast::App; + +use crate::{analyze::Analysis, check::Extra, codegen::util}; + +/// Generates monotonic module dispatchers +pub fn codegen(app: &App, _analysis: &Analysis, _extra: &Extra) -> TokenStream2 { + let mut monotonic_parts: Vec<_> = Vec::new(); + + let tq_marker = util::timer_queue_marker_ident(); + + for (_, monotonic) in &app.monotonics { + // let instants = util::monotonic_instants_ident(name, &monotonic.ident); + let monotonic_name = monotonic.ident.to_string(); + + let tq = util::tq_ident(&monotonic_name); + let m = &monotonic.ident; + let m_ident = util::monotonic_ident(&monotonic_name); + let m_isr = &monotonic.args.binds; + let enum_ = util::interrupt_ident(); + let name_str = &m.to_string(); + let ident = util::monotonic_ident(name_str); + let doc = &format!( + "This module holds the static implementation for `{}::now()`", + name_str + ); + + let (enable_interrupt, pend) = if &*m_isr.to_string() == "SysTick" { + ( + quote!(core::mem::transmute::<_, rtic::export::SYST>(()).enable_interrupt()), + quote!(rtic::export::SCB::set_pendst()), + ) + } else { + let rt_err = util::rt_err_ident(); + ( + quote!(rtic::export::NVIC::unmask(#rt_err::#enum_::#m_isr)), + quote!(rtic::pend(#rt_err::#enum_::#m_isr)), + ) + }; + + let default_monotonic = if monotonic.args.default { + quote!( + #[doc(inline)] + pub use #m::now; + #[doc(inline)] + pub use #m::delay; + #[doc(inline)] + pub use #m::timeout_at; + #[doc(inline)] + pub use #m::timeout_after; + ) + } else { + quote!() + }; + + monotonic_parts.push(quote! { + #default_monotonic + + #[doc = #doc] + #[allow(non_snake_case)] + pub mod #m { + + /// Read the current time from this monotonic + pub fn now() -> ::Instant { + rtic::export::interrupt::free(|_| { + use rtic::Monotonic as _; + if let Some(m) = unsafe{ &mut *super::super::#ident.get_mut() } { + m.now() + } else { + ::zero() + } + }) + } + + fn enqueue_waker( + instant: ::Instant, + waker: core::task::Waker + ) -> Result { + unsafe { + rtic::export::interrupt::free(|_| { + let marker = super::super::#tq_marker.get().read(); + super::super::#tq_marker.get_mut().write(marker.wrapping_add(1)); + + let nr = rtic::export::WakerNotReady { + waker, + instant, + marker, + }; + + let tq = &mut *super::super::#tq.get_mut(); + + tq.enqueue_waker( + nr, + || #enable_interrupt, + || #pend, + (&mut *super::super::#m_ident.get_mut()).as_mut()).map(|_| marker) + }) + } + } + + /// Delay + #[inline(always)] + #[allow(non_snake_case)] + pub fn delay(duration: ::Duration) + -> DelayFuture { + let until = now() + duration; + DelayFuture { until, tq_marker: None } + } + + /// Delay future. + #[allow(non_snake_case)] + #[allow(non_camel_case_types)] + pub struct DelayFuture { + until: ::Instant, + tq_marker: Option, + } + + impl core::future::Future for DelayFuture { + type Output = Result<(), ()>; + + fn poll( + mut self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_> + ) -> core::task::Poll { + let mut s = self.as_mut(); + let now = now(); + + if now >= s.until { + core::task::Poll::Ready(Ok(())) + } else { + if s.tq_marker.is_some() { + core::task::Poll::Pending + } else { + match enqueue_waker(s.until, cx.waker().clone()) { + Ok(marker) => { + s.tq_marker = Some(marker); + core::task::Poll::Pending + }, + Err(()) => core::task::Poll::Ready(Err(())), + } + } + } + } + } + + /// Timeout future. + #[allow(non_snake_case)] + #[allow(non_camel_case_types)] + pub struct TimeoutFuture { + future: F, + until: ::Instant, + tq_marker: Option, + } + + /// Timeout after + #[allow(non_snake_case)] + #[inline(always)] + pub fn timeout_after( + future: F, + duration: ::Duration + ) -> TimeoutFuture { + let until = now() + duration; + TimeoutFuture { + future, + until, + tq_marker: None, + } + } + + /// Timeout at + #[allow(non_snake_case)] + #[inline(always)] + pub fn timeout_at( + future: F, + instant: ::Instant + ) -> TimeoutFuture { + TimeoutFuture { + future, + until: instant, + tq_marker: None, + } + } + + impl core::future::Future for TimeoutFuture + where + F: core::future::Future, + { + type Output = Result, ()>; + + fn poll( + self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_> + ) -> core::task::Poll { + let now = now(); + + // SAFETY: We don't move the underlying pinned value. + let mut s = unsafe { self.get_unchecked_mut() }; + let future = unsafe { core::pin::Pin::new_unchecked(&mut s.future) }; + + match future.poll(cx) { + core::task::Poll::Ready(r) => { + if let Some(marker) = s.tq_marker { + rtic::export::interrupt::free(|_| unsafe { + let tq = &mut *super::super::#tq.get_mut(); + tq.cancel_waker_marker(marker); + }); + } + + core::task::Poll::Ready(Ok(Ok(r))) + } + core::task::Poll::Pending => { + if now >= s.until { + // Timeout + core::task::Poll::Ready(Ok(Err(super::TimeoutError))) + } else if s.tq_marker.is_none() { + match enqueue_waker(s.until, cx.waker().clone()) { + Ok(marker) => { + s.tq_marker = Some(marker); + core::task::Poll::Pending + }, + Err(()) => core::task::Poll::Ready(Err(())), // TQ full + } + } else { + core::task::Poll::Pending + } + } + } + } + } + } + }); + } + + if monotonic_parts.is_empty() { + quote!() + } else { + quote!( + pub use rtic::Monotonic as _; + + /// Holds static methods for each monotonic. + pub mod monotonics { + /// A timeout error. + #[derive(Debug)] + pub struct TimeoutError; + + #(#monotonic_parts)* + } + ) + } +} diff --git a/macros/src/codegen/software_tasks.rs b/macros/src/codegen/software_tasks.rs index 78f6c96128..6d08a221a0 100644 --- a/macros/src/codegen/software_tasks.rs +++ b/macros/src/codegen/software_tasks.rs @@ -27,13 +27,8 @@ pub fn codegen( let mut root = vec![]; let mut user_tasks = vec![]; - // Async tasks - for (name, task) in app.software_tasks.iter().filter(|(_, task)| task.is_async) { - // todo - } - - // Non-async tasks - for (name, task) in app.software_tasks.iter().filter(|(_, task)| !task.is_async) { + // Any task + for (name, task) in app.software_tasks.iter() { let inputs = &task.inputs; let (_, _, _, input_ty) = util::regroup_inputs(inputs); @@ -87,6 +82,7 @@ pub fn codegen( let uninit = mk_uninit(); let inputs_ident = util::inputs_ident(name); + mod_app.push(quote!( #uninit // /// Buffer that holds the inputs of a task @@ -96,6 +92,18 @@ pub fn codegen( static #inputs_ident: rtic::RacyCell<[core::mem::MaybeUninit<#input_ty>; #cap_lit]> = rtic::RacyCell::new([#(#elems,)*]); )); + if task.is_async { + let executor_ident = util::executor_run_ident(name); + mod_app.push(quote!( + #[allow(non_camel_case_types)] + #[allow(non_upper_case_globals)] + #[doc(hidden)] + static #executor_ident: core::sync::atomic::AtomicBool = + core::sync::atomic::AtomicBool::new(false); + )); + } + + let inputs = &task.inputs; // `${task}Resources` let mut shared_needs_lt = false; @@ -131,11 +139,17 @@ pub fn codegen( let attrs = &task.attrs; let cfgs = &task.cfgs; let stmts = &task.stmts; + let async_marker = if task.is_async { + quote!(async) + } else { + quote!() + }; + user_tasks.push(quote!( #(#attrs)* #(#cfgs)* #[allow(non_snake_case)] - fn #name(#context: #name::Context #(,#inputs)*) { + #async_marker fn #name(#context: #name::Context #(,#inputs)*) { use rtic::Mutex as _; use rtic::mutex::prelude::*; diff --git a/macros/src/codegen/timer_queue.rs b/macros/src/codegen/timer_queue.rs index 32e288c56d..513f78af8d 100644 --- a/macros/src/codegen/timer_queue.rs +++ b/macros/src/codegen/timer_queue.rs @@ -1,9 +1,8 @@ +use crate::{analyze::Analysis, check::Extra, codegen::util}; use proc_macro2::TokenStream as TokenStream2; use quote::quote; use rtic_syntax::ast::App; -use crate::{analyze::Analysis, check::Extra, codegen::util}; - /// Generates timer queues and timer queue handlers #[allow(clippy::too_many_lines)] pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec { @@ -67,8 +66,14 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec); + let n_task = util::capacity_literal(cap); + let n_worker: usize = app + .software_tasks + .iter() + .map(|(_name, task)| task.is_async as usize) + .sum(); + let n_worker = util::capacity_literal(n_worker); + let tq_ty = quote!(rtic::export::TimerQueue<#mono_type, #t, #n_task, #n_worker>); // For future use // let doc = format!(" RTIC internal: {}:{}", file!(), line!()); @@ -76,8 +81,12 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec = - rtic::RacyCell::new(rtic::export::TimerQueue(rtic::export::SortedLinkedList::new_u16())); + static #tq: rtic::RacyCell<#tq_ty> = rtic::RacyCell::new( + rtic::export::TimerQueue { + task_queue: rtic::export::SortedLinkedList::new_u16(), + waker_queue: rtic::export::SortedLinkedList::new_u16(), + } + ); )); let mono = util::monotonic_ident(&monotonic_name); @@ -118,7 +127,9 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec { - rtic::export::interrupt::free(|_| (&mut *#rq.get_mut()).split().0.enqueue_unchecked((#rqt::#name, index))); + rtic::export::interrupt::free(|_| + (&mut *#rq.get_mut()).split().0.enqueue_unchecked((#rqt::#name, index)) + ); #pend } @@ -137,7 +148,7 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec Vec waker.wake(), + rtic::export::TaskOrWaker::Task((task, index)) => { + match task { + #(#arms)* + } + } } } diff --git a/macros/src/codegen/util.rs b/macros/src/codegen/util.rs index 0a3edc20d2..a0a5909034 100644 --- a/macros/src/codegen/util.rs +++ b/macros/src/codegen/util.rs @@ -67,6 +67,11 @@ pub fn inputs_ident(task: &Ident) -> Ident { mark_internal_name(&format!("{}_INPUTS", task)) } +/// Generates an identifier for the `EXECUTOR_RUN` atomics (`async` API) +pub fn executor_run_ident(task: &Ident) -> Ident { + mark_internal_name(&format!("{}_EXECUTOR_RUN", task)) +} + /// Generates an identifier for the `INSTANTS` buffer (`schedule` API) pub fn monotonic_instants_ident(task: &Ident, monotonic: &Ident) -> Ident { mark_internal_name(&format!("{}_{}_INSTANTS", task, monotonic)) @@ -222,7 +227,7 @@ pub fn rq_ident(priority: u8) -> Ident { /// Generates an identifier for the `enum` of `schedule`-able tasks pub fn schedule_t_ident() -> Ident { - Ident::new("SCHED_T", Span::call_site()) + mark_internal_name("SCHED_T") } /// Generates an identifier for the `enum` of `spawn`-able tasks @@ -230,7 +235,7 @@ pub fn schedule_t_ident() -> Ident { /// This identifier needs the same structure as the `RQ` identifier because there's one ready queue /// for each of these `T` enums pub fn spawn_t_ident(priority: u8) -> Ident { - Ident::new(&format!("P{}_T", priority), Span::call_site()) + mark_internal_name(&format!("P{}_T", priority)) } /// Suffixed identifier diff --git a/src/export.rs b/src/export.rs index 6f2a1b63c1..703c0e04d8 100644 --- a/src/export.rs +++ b/src/export.rs @@ -4,7 +4,7 @@ use core::{ sync::atomic::{AtomicBool, Ordering}, }; -pub use crate::tq::{NotReady, TimerQueue}; +pub use crate::tq::{TaskNotReady, TaskOrWaker, TimerQueue, WakerNotReady}; pub use bare_metal::CriticalSection; pub use cortex_m::{ asm::nop, @@ -16,8 +16,74 @@ pub use cortex_m::{ pub use heapless::sorted_linked_list::SortedLinkedList; pub use heapless::spsc::Queue; pub use heapless::BinaryHeap; +pub use heapless::Vec; pub use rtic_monotonic as monotonic; +pub mod executor { + use core::{ + future::Future, + mem, + pin::Pin, + task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, + }; + + static WAKER_VTABLE: RawWakerVTable = + RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop); + + unsafe fn waker_clone(p: *const ()) -> RawWaker { + RawWaker::new(p, &WAKER_VTABLE) + } + + unsafe fn waker_wake(p: *const ()) { + // The only thing we need from a waker is the function to call to pend the async + // dispatcher. + let f: fn() = mem::transmute(p); + f(); + } + + unsafe fn waker_drop(_: *const ()) { + // nop + } + + //============ + // AsyncTaskExecutor + + pub struct AsyncTaskExecutor { + task: Option, + } + + impl AsyncTaskExecutor { + pub const fn new() -> Self { + Self { task: None } + } + + pub fn is_running(&self) -> bool { + self.task.is_some() + } + + pub fn spawn(&mut self, future: F) { + self.task = Some(future); + } + + pub fn poll(&mut self, wake: fn()) { + if let Some(future) = &mut self.task { + unsafe { + let waker = Waker::from_raw(RawWaker::new(wake as *const (), &WAKER_VTABLE)); + let mut cx = Context::from_waker(&waker); + let future = Pin::new_unchecked(future); + + match future.poll(&mut cx) { + Poll::Ready(_) => { + self.task = None; + } + Poll::Pending => {} + }; + } + } + } + } +} + pub type SCFQ = Queue; pub type SCRQ = Queue<(T, u8), N>; diff --git a/src/tq.rs b/src/tq.rs index 0f585ba4dd..90542e7308 100644 --- a/src/tq.rs +++ b/src/tq.rs @@ -1,29 +1,25 @@ use crate::Monotonic; use core::cmp::Ordering; +use core::task::Waker; use heapless::sorted_linked_list::{LinkedIndexU16, Min, SortedLinkedList}; -pub struct TimerQueue( - pub SortedLinkedList, LinkedIndexU16, Min, N>, -) -where - Mono: Monotonic, - Task: Copy; - -impl TimerQueue +pub struct TimerQueue where Mono: Monotonic, Task: Copy, { - /// # Safety - /// - /// Writing to memory with a transmute in order to enable - /// interrupts of the ``SysTick`` timer - /// - /// Enqueue a task without checking if it is full - #[inline] - pub unsafe fn enqueue_unchecked( - &mut self, - nr: NotReady, + pub task_queue: SortedLinkedList, LinkedIndexU16, Min, N_TASK>, + pub waker_queue: SortedLinkedList, LinkedIndexU16, Min, N_WAKER>, +} + +impl TimerQueue +where + Mono: Monotonic, + Task: Copy, +{ + fn check_if_enable( + &self, + instant: Mono::Instant, enable_interrupt: F1, pend_handler: F2, mono: Option<&mut Mono>, @@ -33,11 +29,17 @@ where { // Check if the top contains a non-empty element and if that element is // greater than nr - let if_heap_max_greater_than_nr = - self.0.peek().map_or(true, |head| nr.instant < head.instant); + let if_task_heap_max_greater_than_nr = self + .task_queue + .peek() + .map_or(true, |head| instant < head.instant); + let if_waker_heap_max_greater_than_nr = self + .waker_queue + .peek() + .map_or(true, |head| instant < head.instant); - if if_heap_max_greater_than_nr { - if Mono::DISABLE_INTERRUPT_ON_EMPTY_QUEUE && self.0.is_empty() { + if if_task_heap_max_greater_than_nr || if_waker_heap_max_greater_than_nr { + if Mono::DISABLE_INTERRUPT_ON_EMPTY_QUEUE && self.is_empty() { if let Some(mono) = mono { mono.enable_timer(); } @@ -46,19 +48,50 @@ where pend_handler(); } - - self.0.push_unchecked(nr); } - /// Check if the timer queue is empty. + /// Enqueue a task without checking if it is full + #[inline] + pub unsafe fn enqueue_task_unchecked( + &mut self, + nr: TaskNotReady, + enable_interrupt: F1, + pend_handler: F2, + mono: Option<&mut Mono>, + ) where + F1: FnOnce(), + F2: FnOnce(), + { + self.check_if_enable(nr.instant, enable_interrupt, pend_handler, mono); + self.task_queue.push_unchecked(nr); + } + + /// Enqueue a waker + #[inline] + pub fn enqueue_waker( + &mut self, + nr: WakerNotReady, + enable_interrupt: F1, + pend_handler: F2, + mono: Option<&mut Mono>, + ) -> Result<(), ()> + where + F1: FnOnce(), + F2: FnOnce(), + { + self.check_if_enable(nr.instant, enable_interrupt, pend_handler, mono); + self.waker_queue.push(nr).map_err(|_| ()) + } + + /// Check if all the timer queue is empty. #[inline] pub fn is_empty(&self) -> bool { - self.0.is_empty() + self.task_queue.is_empty() && self.waker_queue.is_empty() } - /// Cancel the marker value - pub fn cancel_marker(&mut self, marker: u32) -> Option<(Task, u8)> { - if let Some(val) = self.0.find_mut(|nr| nr.marker == marker) { + /// Cancel the marker value for a task + pub fn cancel_task_marker(&mut self, marker: u32) -> Option<(Task, u8)> { + if let Some(val) = self.task_queue.find_mut(|nr| nr.marker == marker) { let nr = val.pop(); Some((nr.task, nr.index)) @@ -67,16 +100,23 @@ where } } - /// Update the instant at an marker value to a new instant + /// Cancel the marker value for a waker + pub fn cancel_waker_marker(&mut self, marker: u32) { + if let Some(val) = self.waker_queue.find_mut(|nr| nr.marker == marker) { + let _ = val.pop(); + } + } + + /// Update the instant at an marker value for a task to a new instant #[allow(clippy::result_unit_err)] - pub fn update_marker( + pub fn update_task_marker( &mut self, marker: u32, new_marker: u32, instant: Mono::Instant, pend_handler: F, ) -> Result<(), ()> { - if let Some(mut val) = self.0.find_mut(|nr| nr.marker == marker) { + if let Some(mut val) = self.task_queue.find_mut(|nr| nr.marker == marker) { val.instant = instant; val.marker = new_marker; @@ -89,66 +129,134 @@ where } } + fn dequeue_task_queue( + &mut self, + instant: Mono::Instant, + mono: &mut Mono, + ) -> Option> { + let now = mono.now(); + if instant <= now { + // task became ready + let nr = unsafe { self.task_queue.pop_unchecked() }; + Some(TaskOrWaker::Task((nr.task, nr.index))) + } else { + // Set compare + mono.set_compare(instant); + + // Double check that the instant we set is really in the future, else + // dequeue. If the monotonic is fast enough it can happen that from the + // read of now to the set of the compare, the time can overflow. This is to + // guard against this. + if instant <= now { + let nr = unsafe { self.task_queue.pop_unchecked() }; + Some(TaskOrWaker::Task((nr.task, nr.index))) + } else { + None + } + } + } + + fn dequeue_waker_queue( + &mut self, + instant: Mono::Instant, + mono: &mut Mono, + ) -> Option> { + let now = mono.now(); + if instant <= now { + // task became ready + let nr = unsafe { self.waker_queue.pop_unchecked() }; + Some(TaskOrWaker::Waker(nr.waker)) + } else { + // Set compare + mono.set_compare(instant); + + // Double check that the instant we set is really in the future, else + // dequeue. If the monotonic is fast enough it can happen that from the + // read of now to the set of the compare, the time can overflow. This is to + // guard against this. + if instant <= now { + let nr = unsafe { self.waker_queue.pop_unchecked() }; + Some(TaskOrWaker::Waker(nr.waker)) + } else { + None + } + } + } + /// Dequeue a task from the ``TimerQueue`` - pub fn dequeue(&mut self, disable_interrupt: F, mono: &mut Mono) -> Option<(Task, u8)> + pub fn dequeue(&mut self, disable_interrupt: F, mono: &mut Mono) -> Option> where F: FnOnce(), { mono.clear_compare_flag(); - if let Some(instant) = self.0.peek().map(|p| p.instant) { - if instant <= mono.now() { - // task became ready - let nr = unsafe { self.0.pop_unchecked() }; + let tq = self.task_queue.peek().map(|p| p.instant); + let wq = self.waker_queue.peek().map(|p| p.instant); - Some((nr.task, nr.index)) - } else { - // Set compare - mono.set_compare(instant); + let dequeue_task; + let instant; - // Double check that the instant we set is really in the future, else - // dequeue. If the monotonic is fast enough it can happen that from the - // read of now to the set of the compare, the time can overflow. This is to - // guard against this. - if instant <= mono.now() { - let nr = unsafe { self.0.pop_unchecked() }; - - Some((nr.task, nr.index)) + match (tq, wq) { + (Some(tq_instant), Some(wq_instant)) => { + if tq_instant <= wq_instant { + dequeue_task = true; + instant = tq_instant; } else { - None + dequeue_task = false; + instant = wq_instant; } } - } else { - // The queue is empty, disable the interrupt. - if Mono::DISABLE_INTERRUPT_ON_EMPTY_QUEUE { - disable_interrupt(); - mono.disable_timer(); + (Some(tq_instant), None) => { + dequeue_task = true; + instant = tq_instant; } + (None, Some(wq_instant)) => { + dequeue_task = false; + instant = wq_instant; + } + (None, None) => { + // The queue is empty, disable the interrupt. + if Mono::DISABLE_INTERRUPT_ON_EMPTY_QUEUE { + disable_interrupt(); + mono.disable_timer(); + } - None + return None; + } + } + + if dequeue_task { + self.dequeue_task_queue(instant, mono) + } else { + self.dequeue_waker_queue(instant, mono) } } } -pub struct NotReady +pub enum TaskOrWaker { + Task((Task, u8)), + Waker(Waker), +} + +pub struct TaskNotReady where Task: Copy, Mono: Monotonic, { + pub task: Task, pub index: u8, pub instant: Mono::Instant, - pub task: Task, pub marker: u32, } -impl Eq for NotReady +impl Eq for TaskNotReady where Task: Copy, Mono: Monotonic, { } -impl Ord for NotReady +impl Ord for TaskNotReady where Task: Copy, Mono: Monotonic, @@ -158,7 +266,7 @@ where } } -impl PartialEq for NotReady +impl PartialEq for TaskNotReady where Task: Copy, Mono: Monotonic, @@ -168,7 +276,7 @@ where } } -impl PartialOrd for NotReady +impl PartialOrd for TaskNotReady where Task: Copy, Mono: Monotonic, @@ -177,3 +285,41 @@ where Some(self.cmp(other)) } } + +pub struct WakerNotReady +where + Mono: Monotonic, +{ + pub waker: Waker, + pub instant: Mono::Instant, + pub marker: u32, +} + +impl Eq for WakerNotReady where Mono: Monotonic {} + +impl Ord for WakerNotReady +where + Mono: Monotonic, +{ + fn cmp(&self, other: &Self) -> Ordering { + self.instant.cmp(&other.instant) + } +} + +impl PartialEq for WakerNotReady +where + Mono: Monotonic, +{ + fn eq(&self, other: &Self) -> bool { + self.instant == other.instant + } +} + +impl PartialOrd for WakerNotReady +where + Mono: Monotonic, +{ + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +}