From e2e7948411217dcf37078825180c4ff6c3182b3a Mon Sep 17 00:00:00 2001 From: Emil Fresk Date: Mon, 21 Mar 2022 14:56:56 +0100 Subject: [PATCH] Async examples working, manual codegen --- examples/async.rs | 287 +++++++++++++++++++++++++++++++++++ examples/async2.rs | 361 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 648 insertions(+) create mode 100644 examples/async.rs create mode 100644 examples/async2.rs diff --git a/examples/async.rs b/examples/async.rs new file mode 100644 index 0000000000..6abbbad861 --- /dev/null +++ b/examples/async.rs @@ -0,0 +1,287 @@ +#![no_main] +#![no_std] +#![feature(type_alias_impl_trait)] + +use core::future::Future; +use core::mem; +use core::pin::Pin; +use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + +use cortex_m_semihosting::{debug, hprintln}; +use panic_semihosting as _; + +#[rtic::app(device = lm3s6965, dispatchers = [SSI0], peripherals = true)] +mod app { + use crate::Timer; + use crate::*; + + #[shared] + struct Shared { + syst: cortex_m::peripheral::SYST, + } + + #[local] + struct Local {} + + #[init] + fn init(cx: init::Context) -> (Shared, Local, init::Monotonics) { + hprintln!("init").unwrap(); + foo::spawn().unwrap(); + foo2::spawn().unwrap(); + (Shared { syst: cx.core.SYST }, Local {}, init::Monotonics()) + } + + #[idle] + fn idle(_: idle::Context) -> ! { + // debug::exit(debug::EXIT_SUCCESS); + loop { + hprintln!("idle"); + cortex_m::asm::wfi(); // put the MCU in sleep mode until interrupt occurs + } + } + + type F = impl Future + 'static; + static mut TASK: Task = Task::new(); + + #[task(shared = [syst])] + fn foo(mut cx: foo::Context) { + // BEGIN BOILERPLATE + fn create(cx: foo::Context<'static>) -> F { + task(cx) + } + + hprintln!("foo trampoline").ok(); + unsafe { + match TASK { + Task::Idle | Task::Done(_) => { + hprintln!("foo spawn task").ok(); + TASK.spawn(|| create(mem::transmute(cx))); + // Per: + // I think transmute could be removed as in: + // TASK.spawn(|| create(cx)); + // + // This could be done if spawn for async tasks would be passed + // a 'static reference by the generated code. + // + // Soundness: + // Check if lifetime for async context is correct. + } + _ => {} + }; + + foo_poll::spawn(); + } + // END BOILERPLATE + + async fn task(mut cx: foo::Context<'static>) { + hprintln!("foo task").ok(); + + hprintln!("delay long time").ok(); + let fut = cx.shared.syst.lock(|syst| timer_delay(syst, 5000000)); + + hprintln!("we have just created the future"); + fut.await; // this calls poll on the timer future + hprintln!("foo task resumed").ok(); + + hprintln!("delay short time").ok(); + cx.shared.syst.lock(|syst| timer_delay(syst, 1000000)).await; + hprintln!("foo task resumed").ok(); + debug::exit(debug::EXIT_SUCCESS); + } + } + + #[task(shared = [syst])] + fn foo_poll(mut cx: foo_poll::Context) { + // BEGIN BOILERPLATE + + hprintln!("foo poll trampoline").ok(); + unsafe { + hprintln!("foo trampoline poll").ok(); + TASK.poll(|| { + hprintln!("foo poll closure").ok(); + }); + + match TASK { + Task::Done(ref r) => { + hprintln!("foo trampoline done").ok(); + // hprintln!("r = {:?}", mem::transmute::<_, &u32>(r)).ok(); + } + _ => { + hprintln!("foo trampoline running").ok(); + } + } + } + // END BOILERPLATE + } + + type F2 = impl Future + 'static; + static mut TASK2: Task = Task::new(); + + #[task(shared = [syst])] + fn foo2(mut cx: foo2::Context) { + // BEGIN BOILERPLATE + fn create(cx: foo2::Context<'static>) -> F2 { + task(cx) + } + + hprintln!("foo2 trampoline").ok(); + unsafe { + match TASK2 { + Task::Idle | Task::Done(_) => { + hprintln!("foo2 spawn task").ok(); + TASK2.spawn(|| create(mem::transmute(cx))); + // Per: + // I think transmute could be removed as in: + // TASK.spawn(|| create(cx)); + // + // This could be done if spawn for async tasks would be passed + // a 'static reference by the generated code. + // + // Soundness: + // Check if lifetime for async context is correct. + } + _ => {} + }; + + foo2_poll::spawn(); + } + // END BOILERPLATE + + async fn task(mut cx: foo2::Context<'static>) { + hprintln!("foo2 task").ok(); + + hprintln!("foo2 delay long time").ok(); + let fut = cx.shared.syst.lock(|syst| timer_delay(syst, 10_000_000)); + + hprintln!("we have just created the future"); + fut.await; // this calls poll on the timer future + hprintln!("foo task resumed").ok(); + } + } + + #[task(shared = [syst])] + fn foo2_poll(mut cx: foo2_poll::Context) { + // BEGIN BOILERPLATE + + hprintln!("foo2 poll trampoline").ok(); + unsafe { + hprintln!("foo2 trampoline poll").ok(); + TASK2.poll(|| { + hprintln!("foo2 poll closure").ok(); + }); + + match TASK2 { + Task::Done(ref r) => { + hprintln!("foo2 trampoline done").ok(); + // hprintln!("r = {:?}", mem::transmute::<_, &u32>(r)).ok(); + } + _ => { + hprintln!("foo2 trampoline running").ok(); + } + } + } + // END BOILERPLATE + } + + // This the actual RTIC task, binds to systic. + #[task(binds = SysTick, shared = [syst], priority = 2)] + fn systic(mut cx: systic::Context) { + hprintln!("systic interrupt").ok(); + cx.shared.syst.lock(|syst| syst.disable_interrupt()); + crate::app::foo_poll::spawn(); // this should be from a Queue later + crate::app::foo2_poll::spawn(); // this should be from a Queue later + } +} + +//============= +// Waker + +static WAKER_VTABLE: RawWakerVTable = + RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop); + +unsafe fn waker_clone(p: *const ()) -> RawWaker { + RawWaker::new(p, &WAKER_VTABLE) +} + +unsafe fn waker_wake(p: *const ()) { + let f: fn() = mem::transmute(p); + f(); +} + +unsafe fn waker_drop(_: *const ()) { + // nop +} + +//============ +// Task + +enum Task { + Idle, + Running(F), + Done(F::Output), +} + +impl Task { + const fn new() -> Self { + Self::Idle + } + + fn spawn(&mut self, future: impl FnOnce() -> F) { + *self = Task::Running(future()); + } + + unsafe fn poll(&mut self, wake: fn()) { + match self { + Task::Idle => {} + Task::Running(future) => { + let future = Pin::new_unchecked(future); + let waker_data: *const () = mem::transmute(wake); + let waker = Waker::from_raw(RawWaker::new(waker_data, &WAKER_VTABLE)); + let mut cx = Context::from_waker(&waker); + + match future.poll(&mut cx) { + Poll::Ready(r) => *self = Task::Done(r), + Poll::Pending => {} + }; + } + Task::Done(_) => {} + } + } +} + +//============= +// Timer +// Later we want a proper queue + +use heapless; +pub struct Timer { + pub done: bool, + // pub waker_task: Option Result<(), ()>>, +} + +impl Future for Timer { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.done { + Poll::Ready(()) + } else { + hprintln!("timer polled"); + cx.waker().wake_by_ref(); + hprintln!("after wake_by_ref"); + self.done = true; + Poll::Pending + } + } +} + +fn timer_delay(syst: &mut cortex_m::peripheral::SYST, t: u32) -> Timer { + hprintln!("timer_delay {}", t); + + syst.set_reload(t); + syst.enable_counter(); + syst.enable_interrupt(); + Timer { + done: false, + // waker_task: Some(app::foo::spawn), // we should add waker field to async task context i RTIC + } +} diff --git a/examples/async2.rs b/examples/async2.rs new file mode 100644 index 0000000000..2d82ed3e96 --- /dev/null +++ b/examples/async2.rs @@ -0,0 +1,361 @@ +#![no_main] +#![no_std] +#![feature(type_alias_impl_trait)] + +use core::future::Future; +use core::mem; +use core::pin::Pin; +use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + +use cortex_m_semihosting::{debug, hprintln}; +use panic_semihosting as _; +use systick_monotonic::*; + +// NOTES: +// +// - Async tasks cannot have `#[lock_free]` resources, as they can interleve and each async +// task can have a mutable reference stored. +// - Spawning an async task equates to it being polled at least once. +// - ... + +#[rtic::app(device = lm3s6965, dispatchers = [SSI0], peripherals = true)] +mod app { + use crate::*; + + pub type AppInstant = as rtic::Monotonic>::Instant; + pub type AppDuration = as rtic::Monotonic>::Duration; + + #[shared] + struct Shared { + s: u32, + } + + #[local] + struct Local {} + + #[monotonic(binds = SysTick, default = true)] + type MyMono = Systick<100>; + + #[init] + fn init(cx: init::Context) -> (Shared, Local, init::Monotonics) { + hprintln!("init").unwrap(); + task_executor::spawn().unwrap(); + + ( + Shared { s: 0 }, + Local {}, + init::Monotonics(Systick::new(cx.core.SYST, 12_000_000)), + ) + } + + #[idle] + fn idle(_: idle::Context) -> ! { + // debug::exit(debug::EXIT_SUCCESS); + loop { + // hprintln!("idle"); + cortex_m::asm::wfi(); // put the MCU in sleep mode until interrupt occurs + } + } + + // TODO: This should be the task, that is understood by the `syntax` proc-macro + // #[task(priority = 2)] + async fn task(cx: task_executor::Context<'_>) { + #[allow(unused_imports)] + use rtic::mutex_prelude::*; + + hprintln!("delay long time").ok(); + + let fut = Delay::spawn(2500.millis()); + + hprintln!("we have just created the future").ok(); + fut.await; + hprintln!("long delay done").ok(); + + hprintln!("delay short time").ok(); + sleep(500.millis()).await; + hprintln!("short delay done").ok(); + + hprintln!("test timeout").ok(); + let res = timeout(NeverEndingFuture {}, 1.secs()).await; + hprintln!("timeout done: {:?}", res).ok(); + + hprintln!("test timeout 2").ok(); + let res = timeout(Delay::spawn(500.millis()), 1.secs()).await; + hprintln!("timeout done 2: {:?}", res).ok(); + + debug::exit(debug::EXIT_SUCCESS); + } + + ////////////////////////////////////////////// + // BEGIN BOILERPLATE + ////////////////////////////////////////////// + type F = impl Future + 'static; + static mut TASK: AsyncTaskExecutor = AsyncTaskExecutor::new(); + + // TODO: This should be a special case codegen for the `dispatcher`, which runs + // in the dispatcher. Not as its own task, this is just to make it work + // in this example. + #[task(shared = [s])] + fn task_executor(cx: task_executor::Context) { + let task_storage = unsafe { &mut TASK }; + match task_storage { + AsyncTaskExecutor::Idle => { + // TODO: The context generated for async tasks need 'static lifetime, + // use `mem::transmute` for now until codegen is fixed + // + // TODO: Check if there is some way to not need 'static lifetime + hprintln!(" task_executor spawn").ok(); + task_storage.spawn(|| task(unsafe { mem::transmute(cx) })); + task_executor::spawn().ok(); + } + _ => { + hprintln!(" task_executor run").ok(); + task_storage.poll(|| { + task_executor::spawn().ok(); + }); + } + }; + } + ////////////////////////////////////////////// + // END BOILERPLATE + ////////////////////////////////////////////// + + // TODO: This is generated by the `delay` impl, it needs a capacity equal or grater + // than the number of async tasks in the system. Should more likely be a part + // of the monotonic codegen, not its own task. + #[task(capacity = 12)] + fn delay_handler(_: delay_handler::Context, waker: Waker) { + waker.wake(); + } +} + +//============= +// Waker + +static WAKER_VTABLE: RawWakerVTable = + RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop); + +unsafe fn waker_clone(p: *const ()) -> RawWaker { + RawWaker::new(p, &WAKER_VTABLE) +} + +unsafe fn waker_wake(p: *const ()) { + // The only thing we need from a waker is the function to call to pend the async + // dispatcher. + let f: fn() = mem::transmute(p); + f(); +} + +unsafe fn waker_drop(_: *const ()) { + // nop +} + +//============ +// AsyncTaskExecutor + +enum AsyncTaskExecutor { + Idle, + Running(F), +} + +impl AsyncTaskExecutor { + const fn new() -> Self { + Self::Idle + } + + fn spawn(&mut self, future: impl FnOnce() -> F) { + *self = AsyncTaskExecutor::Running(future()); + } + + fn poll(&mut self, wake: fn()) { + match self { + AsyncTaskExecutor::Idle => {} + AsyncTaskExecutor::Running(future) => unsafe { + let waker_data: *const () = mem::transmute(wake); + let waker = Waker::from_raw(RawWaker::new(waker_data, &WAKER_VTABLE)); + let mut cx = Context::from_waker(&waker); + let future = Pin::new_unchecked(future); + + match future.poll(&mut cx) { + Poll::Ready(_) => { + *self = AsyncTaskExecutor::Idle; + hprintln!(" task_executor idle").ok(); + } + Poll::Pending => {} + }; + }, + } + } +} + +//============= +// Delay + +pub struct Delay { + until: crate::app::AppInstant, +} + +impl Delay { + pub fn spawn(duration: crate::app::AppDuration) -> Self { + let until = crate::app::monotonics::now() + duration; + + Delay { until } + } +} + +#[inline(always)] +pub fn sleep(duration: crate::app::AppDuration) -> Delay { + Delay::spawn(duration) +} + +impl Future for Delay { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let s = self.as_mut(); + let now = crate::app::monotonics::now(); + + hprintln!(" poll Delay").ok(); + + if now >= s.until { + Poll::Ready(()) + } else { + let waker = cx.waker().clone(); + crate::app::delay_handler::spawn_after(s.until - now, waker).ok(); + + Poll::Pending + } + } +} + +//============= +// Timeout future + +#[derive(Copy, Clone, Debug)] +pub struct TimeoutError; + +pub struct Timeout { + future: F, + until: crate::app::AppInstant, + cancel_handle: Option, +} + +impl Timeout +where + F: Future, +{ + pub fn timeout(future: F, duration: crate::app::AppDuration) -> Self { + let until = crate::app::monotonics::now() + duration; + Self { + future, + until, + cancel_handle: None, + } + } +} + +#[inline(always)] +pub fn timeout(future: F, duration: crate::app::AppDuration) -> Timeout { + Timeout::timeout(future, duration) +} + +impl Future for Timeout +where + F: Future, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let now = crate::app::monotonics::now(); + + // SAFETY: We don't move the underlying pinned value. + let mut s = unsafe { self.get_unchecked_mut() }; + let future = unsafe { Pin::new_unchecked(&mut s.future) }; + + hprintln!(" poll Timeout").ok(); + + match future.poll(cx) { + Poll::Ready(r) => { + if let Some(ch) = s.cancel_handle.take() { + ch.cancel().ok(); + } + + Poll::Ready(Ok(r)) + } + Poll::Pending => { + if now >= s.until { + Poll::Ready(Err(TimeoutError)) + } else if s.cancel_handle.is_none() { + let waker = cx.waker().clone(); + let sh = crate::app::delay_handler::spawn_after(s.until - now, waker) + .expect("Internal RTIC bug, this should never fail"); + s.cancel_handle = Some(sh); + + Poll::Pending + } else { + Poll::Pending + } + } + } + } +} + +pub struct NeverEndingFuture {} + +impl Future for NeverEndingFuture { + type Output = (); + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { + // Never finish + hprintln!(" polling NeverEndingFuture").ok(); + Poll::Pending + } +} + +//============= +// Async SPI driver + +// #[task] +async fn test_spi(async_spi_driver: &mut AsyncSpi) { + let transfer = Transaction { + buf: [0; 16], + n_write: 1, + n_read: 5, + }; + + let ret = async_spi_driver.transfer(transfer).await; + + // do_something(ret); +} + +/// A DMA transaction. +/// +/// NOTE: Don't leak this `Future`, if you do there is immediate UB! +struct Transaction { + pub buf: [u8; 16], + pub n_write: usize, + pub n_read: usize, +} + +struct AsyncSpi { + transaction: Option, + queue: heapless::spsc::Queue, +} + +impl AsyncSpi { + pub fn transfer(&mut self, transfer: Transaction) -> AsyncSpiTransaction { + todo!() + } +} + +struct AsyncSpiTransaction { + // ... +} + +impl Future for AsyncSpiTransaction { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + todo!() + } +}