Example running, timeout and delay futures available

This commit is contained in:
Emil Fresk 2022-06-10 20:08:46 +02:00
parent 13ccd92e63
commit b2ec1fa651
12 changed files with 724 additions and 974 deletions

View file

@ -2,10 +2,6 @@
#![no_std]
#![feature(type_alias_impl_trait)]
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll, Waker};
use cortex_m_semihosting::{debug, hprintln};
use panic_semihosting as _;
use systick_monotonic::*;
@ -25,9 +21,7 @@ mod app {
pub type AppDuration = <Systick<100> as rtic::Monotonic>::Duration;
#[shared]
struct Shared {
s: u32,
}
struct Shared {}
#[local]
struct Local {}
@ -39,8 +33,11 @@ mod app {
fn init(cx: init::Context) -> (Shared, Local, init::Monotonics) {
hprintln!("init").unwrap();
normal_task::spawn().ok();
async_task::spawn().ok();
(
Shared { s: 0 },
Shared {},
Local {},
init::Monotonics(Systick::new(cx.core.SYST, 12_000_000)),
)
@ -55,156 +52,15 @@ mod app {
}
}
#[task(priority = 2)]
async fn task(cx: task::Context) {
hprintln!("delay long time").ok();
#[task]
fn normal_task(_cx: normal_task::Context) {
hprintln!("hello from normal").ok();
}
let fut = Delay::spawn(2500.millis());
hprintln!("we have just created the future").ok();
fut.await;
hprintln!("long delay done").ok();
hprintln!("delay short time").ok();
delay(500.millis()).await;
hprintln!("short delay done").ok();
hprintln!("test timeout").ok();
let res = timeout(NeverEndingFuture {}, 1.secs()).await;
hprintln!("timeout done: {:?}", res).ok();
hprintln!("test timeout 2").ok();
let res = timeout(delay(500.millis()), 1.secs()).await;
hprintln!("timeout done 2: {:?}", res).ok();
#[task]
async fn async_task(_cx: async_task::Context) {
hprintln!("hello from async").ok();
debug::exit(debug::EXIT_SUCCESS);
}
#[task(capacity = 12)]
fn delay_handler(_: delay_handler::Context, waker: Waker) {
waker.wake();
}
}
// Delay
pub struct Delay {
until: crate::app::AppInstant,
}
impl Delay {
pub fn spawn(duration: crate::app::AppDuration) -> Self {
let until = crate::app::monotonics::now() + duration;
Delay { until }
}
}
#[inline(always)]
pub fn delay(duration: crate::app::AppDuration) -> Delay {
Delay::spawn(duration)
}
impl Future for Delay {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let s = self.as_mut();
let now = crate::app::monotonics::now();
hprintln!(" poll Delay").ok();
if now >= s.until {
Poll::Ready(())
} else {
let waker = cx.waker().clone();
crate::app::delay_handler::spawn_after(s.until - now, waker).ok();
Poll::Pending
}
}
}
//=============
// Timeout future
#[derive(Copy, Clone, Debug)]
pub struct TimeoutError;
pub struct Timeout<F: Future> {
future: F,
until: crate::app::AppInstant,
cancel_handle: Option<crate::app::delay_handler::SpawnHandle>,
}
impl<F> Timeout<F>
where
F: Future,
{
pub fn timeout(future: F, duration: crate::app::AppDuration) -> Self {
let until = crate::app::monotonics::now() + duration;
Self {
future,
until,
cancel_handle: None,
}
}
}
#[inline(always)]
pub fn timeout<F: Future>(future: F, duration: crate::app::AppDuration) -> Timeout<F> {
Timeout::timeout(future, duration)
}
impl<F> Future for Timeout<F>
where
F: Future,
{
type Output = Result<F::Output, TimeoutError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let now = crate::app::monotonics::now();
// SAFETY: We don't move the underlying pinned value.
let mut s = unsafe { self.get_unchecked_mut() };
let future = unsafe { Pin::new_unchecked(&mut s.future) };
hprintln!(" poll Timeout").ok();
match future.poll(cx) {
Poll::Ready(r) => {
if let Some(ch) = s.cancel_handle.take() {
ch.cancel().ok();
}
Poll::Ready(Ok(r))
}
Poll::Pending => {
if now >= s.until {
Poll::Ready(Err(TimeoutError))
} else if s.cancel_handle.is_none() {
let waker = cx.waker().clone();
let sh = crate::app::delay_handler::spawn_after(s.until - now, waker)
.expect("Internal RTIC bug, this should never fail");
s.cancel_handle = Some(sh);
Poll::Pending
} else {
Poll::Pending
}
}
}
}
}
pub struct NeverEndingFuture {}
impl Future for NeverEndingFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
// Never finish
hprintln!(" polling NeverEndingFuture").ok();
Poll::Pending
}
}

View file

@ -1,287 +0,0 @@
#![no_main]
#![no_std]
#![feature(type_alias_impl_trait)]
use core::future::Future;
use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use cortex_m_semihosting::{debug, hprintln};
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [SSI0], peripherals = true)]
mod app {
use crate::Timer;
use crate::*;
#[shared]
struct Shared {
syst: cortex_m::peripheral::SYST,
}
#[local]
struct Local {}
#[init]
fn init(cx: init::Context) -> (Shared, Local, init::Monotonics) {
hprintln!("init").unwrap();
foo::spawn().unwrap();
foo2::spawn().unwrap();
(Shared { syst: cx.core.SYST }, Local {}, init::Monotonics())
}
#[idle]
fn idle(_: idle::Context) -> ! {
// debug::exit(debug::EXIT_SUCCESS);
loop {
hprintln!("idle");
cortex_m::asm::wfi(); // put the MCU in sleep mode until interrupt occurs
}
}
type F = impl Future + 'static;
static mut TASK: Task<F> = Task::new();
#[task(shared = [syst])]
fn foo(mut cx: foo::Context) {
// BEGIN BOILERPLATE
fn create(cx: foo::Context<'static>) -> F {
task(cx)
}
hprintln!("foo trampoline").ok();
unsafe {
match TASK {
Task::Idle | Task::Done(_) => {
hprintln!("foo spawn task").ok();
TASK.spawn(|| create(mem::transmute(cx)));
// Per:
// I think transmute could be removed as in:
// TASK.spawn(|| create(cx));
//
// This could be done if spawn for async tasks would be passed
// a 'static reference by the generated code.
//
// Soundness:
// Check if lifetime for async context is correct.
}
_ => {}
};
foo_poll::spawn();
}
// END BOILERPLATE
async fn task(mut cx: foo::Context<'static>) {
hprintln!("foo task").ok();
hprintln!("delay long time").ok();
let fut = cx.shared.syst.lock(|syst| timer_delay(syst, 5000000));
hprintln!("we have just created the future");
fut.await; // this calls poll on the timer future
hprintln!("foo task resumed").ok();
hprintln!("delay short time").ok();
cx.shared.syst.lock(|syst| timer_delay(syst, 1000000)).await;
hprintln!("foo task resumed").ok();
debug::exit(debug::EXIT_SUCCESS);
}
}
#[task(shared = [syst])]
fn foo_poll(mut cx: foo_poll::Context) {
// BEGIN BOILERPLATE
hprintln!("foo poll trampoline").ok();
unsafe {
hprintln!("foo trampoline poll").ok();
TASK.poll(|| {
hprintln!("foo poll closure").ok();
});
match TASK {
Task::Done(ref r) => {
hprintln!("foo trampoline done").ok();
// hprintln!("r = {:?}", mem::transmute::<_, &u32>(r)).ok();
}
_ => {
hprintln!("foo trampoline running").ok();
}
}
}
// END BOILERPLATE
}
type F2 = impl Future + 'static;
static mut TASK2: Task<F2> = Task::new();
#[task(shared = [syst])]
fn foo2(mut cx: foo2::Context) {
// BEGIN BOILERPLATE
fn create(cx: foo2::Context<'static>) -> F2 {
task(cx)
}
hprintln!("foo2 trampoline").ok();
unsafe {
match TASK2 {
Task::Idle | Task::Done(_) => {
hprintln!("foo2 spawn task").ok();
TASK2.spawn(|| create(mem::transmute(cx)));
// Per:
// I think transmute could be removed as in:
// TASK.spawn(|| create(cx));
//
// This could be done if spawn for async tasks would be passed
// a 'static reference by the generated code.
//
// Soundness:
// Check if lifetime for async context is correct.
}
_ => {}
};
foo2_poll::spawn();
}
// END BOILERPLATE
async fn task(mut cx: foo2::Context<'static>) {
hprintln!("foo2 task").ok();
hprintln!("foo2 delay long time").ok();
let fut = cx.shared.syst.lock(|syst| timer_delay(syst, 10_000_000));
hprintln!("we have just created the future");
fut.await; // this calls poll on the timer future
hprintln!("foo task resumed").ok();
}
}
#[task(shared = [syst])]
fn foo2_poll(mut cx: foo2_poll::Context) {
// BEGIN BOILERPLATE
hprintln!("foo2 poll trampoline").ok();
unsafe {
hprintln!("foo2 trampoline poll").ok();
TASK2.poll(|| {
hprintln!("foo2 poll closure").ok();
});
match TASK2 {
Task::Done(ref r) => {
hprintln!("foo2 trampoline done").ok();
// hprintln!("r = {:?}", mem::transmute::<_, &u32>(r)).ok();
}
_ => {
hprintln!("foo2 trampoline running").ok();
}
}
}
// END BOILERPLATE
}
// This the actual RTIC task, binds to systic.
#[task(binds = SysTick, shared = [syst], priority = 2)]
fn systic(mut cx: systic::Context) {
hprintln!("systic interrupt").ok();
cx.shared.syst.lock(|syst| syst.disable_interrupt());
crate::app::foo_poll::spawn(); // this should be from a Queue later
crate::app::foo2_poll::spawn(); // this should be from a Queue later
}
}
//=============
// Waker
static WAKER_VTABLE: RawWakerVTable =
RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop);
unsafe fn waker_clone(p: *const ()) -> RawWaker {
RawWaker::new(p, &WAKER_VTABLE)
}
unsafe fn waker_wake(p: *const ()) {
let f: fn() = mem::transmute(p);
f();
}
unsafe fn waker_drop(_: *const ()) {
// nop
}
//============
// Task
enum Task<F: Future + 'static> {
Idle,
Running(F),
Done(F::Output),
}
impl<F: Future + 'static> Task<F> {
const fn new() -> Self {
Self::Idle
}
fn spawn(&mut self, future: impl FnOnce() -> F) {
*self = Task::Running(future());
}
unsafe fn poll(&mut self, wake: fn()) {
match self {
Task::Idle => {}
Task::Running(future) => {
let future = Pin::new_unchecked(future);
let waker_data: *const () = mem::transmute(wake);
let waker = Waker::from_raw(RawWaker::new(waker_data, &WAKER_VTABLE));
let mut cx = Context::from_waker(&waker);
match future.poll(&mut cx) {
Poll::Ready(r) => *self = Task::Done(r),
Poll::Pending => {}
};
}
Task::Done(_) => {}
}
}
}
//=============
// Timer
// Later we want a proper queue
use heapless;
pub struct Timer {
pub done: bool,
// pub waker_task: Option<fn() -> Result<(), ()>>,
}
impl Future for Timer {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.done {
Poll::Ready(())
} else {
hprintln!("timer polled");
cx.waker().wake_by_ref();
hprintln!("after wake_by_ref");
self.done = true;
Poll::Pending
}
}
}
fn timer_delay(syst: &mut cortex_m::peripheral::SYST, t: u32) -> Timer {
hprintln!("timer_delay {}", t);
syst.set_reload(t);
syst.enable_counter();
syst.enable_interrupt();
Timer {
done: false,
// waker_task: Some(app::foo::spawn), // we should add waker field to async task context i RTIC
}
}

View file

@ -1,361 +0,0 @@
#![no_main]
#![no_std]
#![feature(type_alias_impl_trait)]
use core::future::Future;
use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use cortex_m_semihosting::{debug, hprintln};
use panic_semihosting as _;
use systick_monotonic::*;
// NOTES:
//
// - Async tasks cannot have `#[lock_free]` resources, as they can interleve and each async
// task can have a mutable reference stored.
// - Spawning an async task equates to it being polled at least once.
// - ...
#[rtic::app(device = lm3s6965, dispatchers = [SSI0], peripherals = true)]
mod app {
use crate::*;
pub type AppInstant = <Systick<100> as rtic::Monotonic>::Instant;
pub type AppDuration = <Systick<100> as rtic::Monotonic>::Duration;
#[shared]
struct Shared {
s: u32,
}
#[local]
struct Local {}
#[monotonic(binds = SysTick, default = true)]
type MyMono = Systick<100>;
#[init]
fn init(cx: init::Context) -> (Shared, Local, init::Monotonics) {
hprintln!("init").unwrap();
task_executor::spawn().unwrap();
(
Shared { s: 0 },
Local {},
init::Monotonics(Systick::new(cx.core.SYST, 12_000_000)),
)
}
#[idle]
fn idle(_: idle::Context) -> ! {
// debug::exit(debug::EXIT_SUCCESS);
loop {
// hprintln!("idle");
cortex_m::asm::wfi(); // put the MCU in sleep mode until interrupt occurs
}
}
// TODO: This should be the task, that is understood by the `syntax` proc-macro
// #[task(priority = 2)]
async fn task(cx: task_executor::Context<'_>) {
#[allow(unused_imports)]
use rtic::mutex_prelude::*;
hprintln!("delay long time").ok();
let fut = Delay::spawn(2500.millis());
hprintln!("we have just created the future").ok();
fut.await;
hprintln!("long delay done").ok();
hprintln!("delay short time").ok();
sleep(500.millis()).await;
hprintln!("short delay done").ok();
hprintln!("test timeout").ok();
let res = timeout(NeverEndingFuture {}, 1.secs()).await;
hprintln!("timeout done: {:?}", res).ok();
hprintln!("test timeout 2").ok();
let res = timeout(Delay::spawn(500.millis()), 1.secs()).await;
hprintln!("timeout done 2: {:?}", res).ok();
debug::exit(debug::EXIT_SUCCESS);
}
//////////////////////////////////////////////
// BEGIN BOILERPLATE
//////////////////////////////////////////////
type F = impl Future + 'static;
static mut TASK: AsyncTaskExecutor<F> = AsyncTaskExecutor::new();
// TODO: This should be a special case codegen for the `dispatcher`, which runs
// in the dispatcher. Not as its own task, this is just to make it work
// in this example.
#[task(shared = [s])]
fn task_executor(cx: task_executor::Context) {
let task_storage = unsafe { &mut TASK };
match task_storage {
AsyncTaskExecutor::Idle => {
// TODO: The context generated for async tasks need 'static lifetime,
// use `mem::transmute` for now until codegen is fixed
//
// TODO: Check if there is some way to not need 'static lifetime
hprintln!(" task_executor spawn").ok();
task_storage.spawn(|| task(unsafe { mem::transmute(cx) }));
task_executor::spawn().ok();
}
_ => {
hprintln!(" task_executor run").ok();
task_storage.poll(|| {
task_executor::spawn().ok();
});
}
};
}
//////////////////////////////////////////////
// END BOILERPLATE
//////////////////////////////////////////////
// TODO: This is generated by the `delay` impl, it needs a capacity equal or grater
// than the number of async tasks in the system. Should more likely be a part
// of the monotonic codegen, not its own task.
#[task(capacity = 12)]
fn delay_handler(_: delay_handler::Context, waker: Waker) {
waker.wake();
}
}
//=============
// Waker
static WAKER_VTABLE: RawWakerVTable =
RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop);
unsafe fn waker_clone(p: *const ()) -> RawWaker {
RawWaker::new(p, &WAKER_VTABLE)
}
unsafe fn waker_wake(p: *const ()) {
// The only thing we need from a waker is the function to call to pend the async
// dispatcher.
let f: fn() = mem::transmute(p);
f();
}
unsafe fn waker_drop(_: *const ()) {
// nop
}
//============
// AsyncTaskExecutor
enum AsyncTaskExecutor<F: Future + 'static> {
Idle,
Running(F),
}
impl<F: Future + 'static> AsyncTaskExecutor<F> {
const fn new() -> Self {
Self::Idle
}
fn spawn(&mut self, future: impl FnOnce() -> F) {
*self = AsyncTaskExecutor::Running(future());
}
fn poll(&mut self, wake: fn()) {
match self {
AsyncTaskExecutor::Idle => {}
AsyncTaskExecutor::Running(future) => unsafe {
let waker_data: *const () = mem::transmute(wake);
let waker = Waker::from_raw(RawWaker::new(waker_data, &WAKER_VTABLE));
let mut cx = Context::from_waker(&waker);
let future = Pin::new_unchecked(future);
match future.poll(&mut cx) {
Poll::Ready(_) => {
*self = AsyncTaskExecutor::Idle;
hprintln!(" task_executor idle").ok();
}
Poll::Pending => {}
};
},
}
}
}
//=============
// Delay
pub struct Delay {
until: crate::app::AppInstant,
}
impl Delay {
pub fn spawn(duration: crate::app::AppDuration) -> Self {
let until = crate::app::monotonics::now() + duration;
Delay { until }
}
}
#[inline(always)]
pub fn sleep(duration: crate::app::AppDuration) -> Delay {
Delay::spawn(duration)
}
impl Future for Delay {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let s = self.as_mut();
let now = crate::app::monotonics::now();
hprintln!(" poll Delay").ok();
if now >= s.until {
Poll::Ready(())
} else {
let waker = cx.waker().clone();
crate::app::delay_handler::spawn_after(s.until - now, waker).ok();
Poll::Pending
}
}
}
//=============
// Timeout future
#[derive(Copy, Clone, Debug)]
pub struct TimeoutError;
pub struct Timeout<F: Future> {
future: F,
until: crate::app::AppInstant,
cancel_handle: Option<crate::app::delay_handler::SpawnHandle>,
}
impl<F> Timeout<F>
where
F: Future,
{
pub fn timeout(future: F, duration: crate::app::AppDuration) -> Self {
let until = crate::app::monotonics::now() + duration;
Self {
future,
until,
cancel_handle: None,
}
}
}
#[inline(always)]
pub fn timeout<F: Future>(future: F, duration: crate::app::AppDuration) -> Timeout<F> {
Timeout::timeout(future, duration)
}
impl<F> Future for Timeout<F>
where
F: Future,
{
type Output = Result<F::Output, TimeoutError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let now = crate::app::monotonics::now();
// SAFETY: We don't move the underlying pinned value.
let mut s = unsafe { self.get_unchecked_mut() };
let future = unsafe { Pin::new_unchecked(&mut s.future) };
hprintln!(" poll Timeout").ok();
match future.poll(cx) {
Poll::Ready(r) => {
if let Some(ch) = s.cancel_handle.take() {
ch.cancel().ok();
}
Poll::Ready(Ok(r))
}
Poll::Pending => {
if now >= s.until {
Poll::Ready(Err(TimeoutError))
} else if s.cancel_handle.is_none() {
let waker = cx.waker().clone();
let sh = crate::app::delay_handler::spawn_after(s.until - now, waker)
.expect("Internal RTIC bug, this should never fail");
s.cancel_handle = Some(sh);
Poll::Pending
} else {
Poll::Pending
}
}
}
}
}
pub struct NeverEndingFuture {}
impl Future for NeverEndingFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
// Never finish
hprintln!(" polling NeverEndingFuture").ok();
Poll::Pending
}
}
//=============
// Async SPI driver
// #[task]
async fn test_spi(async_spi_driver: &mut AsyncSpi) {
let transfer = Transaction {
buf: [0; 16],
n_write: 1,
n_read: 5,
};
let ret = async_spi_driver.transfer(transfer).await;
// do_something(ret);
}
/// A DMA transaction.
///
/// NOTE: Don't leak this `Future`, if you do there is immediate UB!
struct Transaction {
pub buf: [u8; 16],
pub n_write: usize,
pub n_read: usize,
}
struct AsyncSpi {
transaction: Option<Transaction>,
queue: heapless::spsc::Queue<Waker, 8>,
}
impl AsyncSpi {
pub fn transfer(&mut self, transfer: Transaction) -> AsyncSpiTransaction {
todo!()
}
}
struct AsyncSpiTransaction {
// ...
}
impl Future for AsyncSpiTransaction {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
todo!()
}
}

View file

@ -12,6 +12,7 @@ mod init;
mod local_resources;
mod local_resources_struct;
mod module;
mod monotonic;
mod post_init;
mod pre_init;
mod shared_resources;
@ -95,6 +96,8 @@ pub fn app(app: &App, analysis: &Analysis, extra: &Extra) -> TokenStream2 {
let (mod_app_software_tasks, root_software_tasks, user_software_tasks) =
software_tasks::codegen(app, analysis, extra);
let monotonics = monotonic::codegen(app, analysis, extra);
let mod_app_dispatchers = dispatchers::codegen(app, analysis, extra);
let mod_app_timer_queue = timer_queue::codegen(app, analysis, extra);
let user_imports = &app.user_imports;
@ -102,59 +105,6 @@ pub fn app(app: &App, analysis: &Analysis, extra: &Extra) -> TokenStream2 {
let name = &app.name;
let device = &extra.device;
let monotonic_parts: Vec<_> = app
.monotonics
.iter()
.map(|(_, monotonic)| {
let name = &monotonic.ident;
let name_str = &name.to_string();
let ident = util::monotonic_ident(name_str);
let doc = &format!(
"This module holds the static implementation for `{}::now()`",
name_str
);
let default_monotonic = if monotonic.args.default {
quote!(pub use #name::now;)
} else {
quote!()
};
quote! {
#default_monotonic
#[doc = #doc]
#[allow(non_snake_case)]
pub mod #name {
/// Read the current time from this monotonic
pub fn now() -> <super::super::#name as rtic::Monotonic>::Instant {
rtic::export::interrupt::free(|_| {
use rtic::Monotonic as _;
if let Some(m) = unsafe{ &mut *super::super::#ident.get_mut() } {
m.now()
} else {
<super::super::#name as rtic::Monotonic>::zero()
}
})
}
}
}
})
.collect();
let monotonics = if monotonic_parts.is_empty() {
quote!()
} else {
quote!(
pub use rtic::Monotonic as _;
/// Holds static methods for each monotonic.
pub mod monotonics {
#(#monotonic_parts)*
}
)
};
let rt_err = util::rt_err_ident();
quote!(

View file

@ -5,7 +5,7 @@ use rtic_syntax::ast::App;
use crate::{analyze::Analysis, check::Extra, codegen::util};
/// Generates task dispatchers
pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStream2> {
pub fn codegen(app: &App, analysis: &Analysis, extra: &Extra) -> Vec<TokenStream2> {
let mut items = vec![];
let interrupts = &analysis.interrupts;
@ -64,6 +64,9 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
static #rq: rtic::RacyCell<#rq_ty> = rtic::RacyCell::new(#rq_expr);
));
let device = &extra.device;
let enum_ = util::interrupt_ident();
let interrupt = util::suffixed(&interrupts[&level].0.to_string());
let arms = channel
.tasks
.iter()
@ -73,37 +76,124 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
let fq = util::fq_ident(name);
let inputs = util::inputs_ident(name);
let (_, tupled, pats, _) = util::regroup_inputs(&task.inputs);
let exec_name = util::internal_task_ident(name, "EXEC");
quote!(
#(#cfgs)*
#t::#name => {
let #tupled =
(&*#inputs
.get())
.get_unchecked(usize::from(index))
.as_ptr()
.read();
(&mut *#fq.get_mut()).split().0.enqueue_unchecked(index);
let priority = &rtic::export::Priority::new(PRIORITY);
#name(
#name::Context::new(priority)
#(,#pats)*
)
}
)
if task.is_async {
let executor_run_ident = util::executor_run_ident(name);
quote!(
#(#cfgs)*
#t::#name => {
if !(&mut *#exec_name.get_mut()).is_running() {
let #tupled =
(&*#inputs
.get())
.get_unchecked(usize::from(index))
.as_ptr()
.read();
(&mut *#fq.get_mut()).split().0.enqueue_unchecked(index);
let priority = &rtic::export::Priority::new(PRIORITY);
(&mut *#exec_name.get_mut()).spawn(#name(#name::Context::new(priority), #(,#pats)*));
#executor_run_ident.store(true, core::sync::atomic::Ordering::Relaxed);
} else {
retry_queue.push_unchecked((#t::#name, index));
}
}
)
} else {
quote!(
#(#cfgs)*
#t::#name => {
let #tupled =
(&*#inputs
.get())
.get_unchecked(usize::from(index))
.as_ptr()
.read();
(&mut *#fq.get_mut()).split().0.enqueue_unchecked(index);
let priority = &rtic::export::Priority::new(PRIORITY);
#name(
#name::Context::new(priority)
#(,#pats)*
)
}
)
}
})
.collect::<Vec<_>>();
for (name, task) in app.software_tasks.iter() {
if task.is_async {
let type_name = util::internal_task_ident(name, "F");
let exec_name = util::internal_task_ident(name, "EXEC");
stmts.push(quote!(
type #type_name = impl core::future::Future + 'static;
static #exec_name:
rtic::RacyCell<rtic::export::executor::AsyncTaskExecutor<#type_name>> =
rtic::RacyCell::new(rtic::export::executor::AsyncTaskExecutor::new());
));
}
}
let n_executors: usize = app
.software_tasks
.iter()
.map(|(_, task)| if task.is_async { 1 } else { 0 })
.sum();
// TODO: This `retry_queue` comes from the current design of the dispatcher queue handling.
// To remove this we would need to redesign how the dispatcher handles queues, and this can
// be done as an optimization later.
//
// The core issue is that we should only dequeue the ready queue if the exexutor associated
// to the task is not running. As it is today this queue is blindly dequeued, see the
// `while let Some(...) = (&mut *#rq.get_mut())...` a few lines down. The current "hack" is
// to just requeue the executor run if it should not have been dequeued. This needs however
// to be done after the ready queue has been exhausted.
if n_executors > 0 {
stmts.push(quote!(
let mut retry_queue: rtic::export::Vec<_, #n_executors> = rtic::export::Vec::new();
));
}
stmts.push(quote!(
while let Some((task, index)) = (&mut *#rq.get_mut()).split().1.dequeue() {
match task {
#(#arms)*
}
}
while let Some((task, index)) = retry_queue.pop() {
rtic::export::interrupt::free(|_| {
(&mut *#rq.get_mut()).enqueue_unchecked((task, index));
});
}
));
for (name, _task) in app.software_tasks.iter().filter_map(|(name, task)| {
if task.is_async {
Some((name, task))
} else {
None
}
}) {
let exec_name = util::internal_task_ident(name, "EXEC");
let executor_run_ident = util::executor_run_ident(name);
stmts.push(quote!(
if #executor_run_ident.load(core::sync::atomic::Ordering::Relaxed) {
#executor_run_ident.store(false, core::sync::atomic::Ordering::Relaxed);
(&mut *#exec_name.get_mut()).poll(|| {
#executor_run_ident.store(true, core::sync::atomic::Ordering::Release);
rtic::pend(#device::#enum_::#interrupt);
});
}
));
}
let doc = format!("Interrupt handler to dispatch tasks at priority {}", level);
let interrupt = util::suffixed(&interrupts[&level].0.to_string());
let attribute = &interrupts[&level].1.attrs;
items.push(quote!(
#[allow(non_snake_case)]

View file

@ -54,14 +54,6 @@ pub fn codegen(
Context::Idle | Context::HardwareTask(_) | Context::SoftwareTask(_) => {}
}
// if ctxt.has_locals(app) {
// let ident = util::locals_ident(ctxt, app);
// module_items.push(quote!(
// #[doc(inline)]
// pub use super::#ident as Locals;
// ));
// }
if ctxt.has_local_resources(app) {
let ident = util::local_resources_ident(ctxt, app);
let lt = if local_resources_tick {
@ -133,6 +125,7 @@ pub fn codegen(
));
module_items.push(quote!(
#[doc(inline)]
pub use super::#internal_monotonics_ident as Monotonics;
));
}
@ -193,6 +186,7 @@ pub fn codegen(
module_items.push(quote!(
#(#cfgs)*
#[doc(inline)]
pub use super::#internal_context_name as Context;
));
@ -225,6 +219,8 @@ pub fn codegen(
#(#cfgs)*
/// Spawns the task directly
#[allow(non_snake_case)]
#[doc(hidden)]
pub fn #internal_spawn_ident(#(#args,)*) -> Result<(), #ty> {
let input = #tupled;
@ -239,7 +235,6 @@ pub fn codegen(
rtic::export::interrupt::free(|_| {
(&mut *#rq.get_mut()).enqueue_unchecked((#t::#name, index));
});
rtic::pend(#device::#enum_::#interrupt);
Ok(())
@ -252,6 +247,7 @@ pub fn codegen(
module_items.push(quote!(
#(#cfgs)*
#[doc(inline)]
pub use super::#internal_spawn_ident as spawn;
));
@ -294,15 +290,21 @@ pub fn codegen(
if monotonic.args.default {
module_items.push(quote!(
#[doc(inline)]
pub use #m::spawn_after;
#[doc(inline)]
pub use #m::spawn_at;
#[doc(inline)]
pub use #m::SpawnHandle;
));
}
module_items.push(quote!(
pub mod #m {
#[doc(inline)]
pub use super::super::#internal_spawn_after_ident as spawn_after;
#[doc(inline)]
pub use super::super::#internal_spawn_at_ident as spawn_at;
#[doc(inline)]
pub use super::super::#internal_spawn_handle_ident as SpawnHandle;
}
));
@ -316,6 +318,7 @@ pub fn codegen(
marker: u32,
}
#(#cfgs)*
impl core::fmt::Debug for #internal_spawn_handle_ident {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct(#spawn_handle_string).finish()
@ -327,7 +330,7 @@ pub fn codegen(
pub fn cancel(self) -> Result<#ty, ()> {
rtic::export::interrupt::free(|_| unsafe {
let tq = &mut *#tq.get_mut();
if let Some((_task, index)) = tq.cancel_marker(self.marker) {
if let Some((_task, index)) = tq.cancel_task_marker(self.marker) {
// Get the message
let msg = (&*#inputs
.get())
@ -362,11 +365,12 @@ pub fn codegen(
let tq = (&mut *#tq.get_mut());
tq.update_marker(self.marker, marker, instant, || #pend).map(|_| #name::#m::SpawnHandle { marker })
tq.update_task_marker(self.marker, marker, instant, || #pend).map(|_| #name::#m::SpawnHandle { marker })
})
}
}
#(#cfgs)*
/// Spawns the task after a set duration relative to the current time
///
@ -407,10 +411,10 @@ pub fn codegen(
rtic::export::interrupt::free(|_| {
let marker = #tq_marker.get().read();
let nr = rtic::export::NotReady {
instant,
index,
let nr = rtic::export::TaskNotReady {
task: #t::#name,
index,
instant,
marker,
};
@ -418,7 +422,7 @@ pub fn codegen(
let tq = &mut *#tq.get_mut();
tq.enqueue_unchecked(
tq.enqueue_task_unchecked(
nr,
|| #enable_interrupt,
|| #pend,

View file

@ -0,0 +1,251 @@
use proc_macro2::TokenStream as TokenStream2;
use quote::quote;
use rtic_syntax::ast::App;
use crate::{analyze::Analysis, check::Extra, codegen::util};
/// Generates monotonic module dispatchers
pub fn codegen(app: &App, _analysis: &Analysis, _extra: &Extra) -> TokenStream2 {
let mut monotonic_parts: Vec<_> = Vec::new();
let tq_marker = util::timer_queue_marker_ident();
for (_, monotonic) in &app.monotonics {
// let instants = util::monotonic_instants_ident(name, &monotonic.ident);
let monotonic_name = monotonic.ident.to_string();
let tq = util::tq_ident(&monotonic_name);
let m = &monotonic.ident;
let m_ident = util::monotonic_ident(&monotonic_name);
let m_isr = &monotonic.args.binds;
let enum_ = util::interrupt_ident();
let name_str = &m.to_string();
let ident = util::monotonic_ident(name_str);
let doc = &format!(
"This module holds the static implementation for `{}::now()`",
name_str
);
let (enable_interrupt, pend) = if &*m_isr.to_string() == "SysTick" {
(
quote!(core::mem::transmute::<_, rtic::export::SYST>(()).enable_interrupt()),
quote!(rtic::export::SCB::set_pendst()),
)
} else {
let rt_err = util::rt_err_ident();
(
quote!(rtic::export::NVIC::unmask(#rt_err::#enum_::#m_isr)),
quote!(rtic::pend(#rt_err::#enum_::#m_isr)),
)
};
let default_monotonic = if monotonic.args.default {
quote!(
#[doc(inline)]
pub use #m::now;
#[doc(inline)]
pub use #m::delay;
#[doc(inline)]
pub use #m::timeout_at;
#[doc(inline)]
pub use #m::timeout_after;
)
} else {
quote!()
};
monotonic_parts.push(quote! {
#default_monotonic
#[doc = #doc]
#[allow(non_snake_case)]
pub mod #m {
/// Read the current time from this monotonic
pub fn now() -> <super::super::#m as rtic::Monotonic>::Instant {
rtic::export::interrupt::free(|_| {
use rtic::Monotonic as _;
if let Some(m) = unsafe{ &mut *super::super::#ident.get_mut() } {
m.now()
} else {
<super::super::#m as rtic::Monotonic>::zero()
}
})
}
fn enqueue_waker(
instant: <super::super::#m as rtic::Monotonic>::Instant,
waker: core::task::Waker
) -> Result<u32, ()> {
unsafe {
rtic::export::interrupt::free(|_| {
let marker = super::super::#tq_marker.get().read();
super::super::#tq_marker.get_mut().write(marker.wrapping_add(1));
let nr = rtic::export::WakerNotReady {
waker,
instant,
marker,
};
let tq = &mut *super::super::#tq.get_mut();
tq.enqueue_waker(
nr,
|| #enable_interrupt,
|| #pend,
(&mut *super::super::#m_ident.get_mut()).as_mut()).map(|_| marker)
})
}
}
/// Delay
#[inline(always)]
#[allow(non_snake_case)]
pub fn delay(duration: <super::super::#m as rtic::Monotonic>::Duration)
-> DelayFuture {
let until = now() + duration;
DelayFuture { until, tq_marker: None }
}
/// Delay future.
#[allow(non_snake_case)]
#[allow(non_camel_case_types)]
pub struct DelayFuture {
until: <super::super::#m as rtic::Monotonic>::Instant,
tq_marker: Option<u32>,
}
impl core::future::Future for DelayFuture {
type Output = Result<(), ()>;
fn poll(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>
) -> core::task::Poll<Self::Output> {
let mut s = self.as_mut();
let now = now();
if now >= s.until {
core::task::Poll::Ready(Ok(()))
} else {
if s.tq_marker.is_some() {
core::task::Poll::Pending
} else {
match enqueue_waker(s.until, cx.waker().clone()) {
Ok(marker) => {
s.tq_marker = Some(marker);
core::task::Poll::Pending
},
Err(()) => core::task::Poll::Ready(Err(())),
}
}
}
}
}
/// Timeout future.
#[allow(non_snake_case)]
#[allow(non_camel_case_types)]
pub struct TimeoutFuture<F: core::future::Future> {
future: F,
until: <super::super::#m as rtic::Monotonic>::Instant,
tq_marker: Option<u32>,
}
/// Timeout after
#[allow(non_snake_case)]
#[inline(always)]
pub fn timeout_after<F: core::future::Future>(
future: F,
duration: <super::super::#m as rtic::Monotonic>::Duration
) -> TimeoutFuture<F> {
let until = now() + duration;
TimeoutFuture {
future,
until,
tq_marker: None,
}
}
/// Timeout at
#[allow(non_snake_case)]
#[inline(always)]
pub fn timeout_at<F: core::future::Future>(
future: F,
instant: <super::super::#m as rtic::Monotonic>::Instant
) -> TimeoutFuture<F> {
TimeoutFuture {
future,
until: instant,
tq_marker: None,
}
}
impl<F> core::future::Future for TimeoutFuture<F>
where
F: core::future::Future,
{
type Output = Result<Result<F::Output, super::TimeoutError>, ()>;
fn poll(
self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>
) -> core::task::Poll<Self::Output> {
let now = now();
// SAFETY: We don't move the underlying pinned value.
let mut s = unsafe { self.get_unchecked_mut() };
let future = unsafe { core::pin::Pin::new_unchecked(&mut s.future) };
match future.poll(cx) {
core::task::Poll::Ready(r) => {
if let Some(marker) = s.tq_marker {
rtic::export::interrupt::free(|_| unsafe {
let tq = &mut *super::super::#tq.get_mut();
tq.cancel_waker_marker(marker);
});
}
core::task::Poll::Ready(Ok(Ok(r)))
}
core::task::Poll::Pending => {
if now >= s.until {
// Timeout
core::task::Poll::Ready(Ok(Err(super::TimeoutError)))
} else if s.tq_marker.is_none() {
match enqueue_waker(s.until, cx.waker().clone()) {
Ok(marker) => {
s.tq_marker = Some(marker);
core::task::Poll::Pending
},
Err(()) => core::task::Poll::Ready(Err(())), // TQ full
}
} else {
core::task::Poll::Pending
}
}
}
}
}
}
});
}
if monotonic_parts.is_empty() {
quote!()
} else {
quote!(
pub use rtic::Monotonic as _;
/// Holds static methods for each monotonic.
pub mod monotonics {
/// A timeout error.
#[derive(Debug)]
pub struct TimeoutError;
#(#monotonic_parts)*
}
)
}
}

View file

@ -27,13 +27,8 @@ pub fn codegen(
let mut root = vec![];
let mut user_tasks = vec![];
// Async tasks
for (name, task) in app.software_tasks.iter().filter(|(_, task)| task.is_async) {
// todo
}
// Non-async tasks
for (name, task) in app.software_tasks.iter().filter(|(_, task)| !task.is_async) {
// Any task
for (name, task) in app.software_tasks.iter() {
let inputs = &task.inputs;
let (_, _, _, input_ty) = util::regroup_inputs(inputs);
@ -87,6 +82,7 @@ pub fn codegen(
let uninit = mk_uninit();
let inputs_ident = util::inputs_ident(name);
mod_app.push(quote!(
#uninit
// /// Buffer that holds the inputs of a task
@ -96,6 +92,18 @@ pub fn codegen(
static #inputs_ident: rtic::RacyCell<[core::mem::MaybeUninit<#input_ty>; #cap_lit]> =
rtic::RacyCell::new([#(#elems,)*]);
));
if task.is_async {
let executor_ident = util::executor_run_ident(name);
mod_app.push(quote!(
#[allow(non_camel_case_types)]
#[allow(non_upper_case_globals)]
#[doc(hidden)]
static #executor_ident: core::sync::atomic::AtomicBool =
core::sync::atomic::AtomicBool::new(false);
));
}
let inputs = &task.inputs;
// `${task}Resources`
let mut shared_needs_lt = false;
@ -131,11 +139,17 @@ pub fn codegen(
let attrs = &task.attrs;
let cfgs = &task.cfgs;
let stmts = &task.stmts;
let async_marker = if task.is_async {
quote!(async)
} else {
quote!()
};
user_tasks.push(quote!(
#(#attrs)*
#(#cfgs)*
#[allow(non_snake_case)]
fn #name(#context: #name::Context #(,#inputs)*) {
#async_marker fn #name(#context: #name::Context #(,#inputs)*) {
use rtic::Mutex as _;
use rtic::mutex::prelude::*;

View file

@ -1,9 +1,8 @@
use crate::{analyze::Analysis, check::Extra, codegen::util};
use proc_macro2::TokenStream as TokenStream2;
use quote::quote;
use rtic_syntax::ast::App;
use crate::{analyze::Analysis, check::Extra, codegen::util};
/// Generates timer queues and timer queue handlers
#[allow(clippy::too_many_lines)]
pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStream2> {
@ -67,8 +66,14 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
.iter()
.map(|(_name, task)| task.args.capacity as usize)
.sum();
let n = util::capacity_literal(cap);
let tq_ty = quote!(rtic::export::TimerQueue<#mono_type, #t, #n>);
let n_task = util::capacity_literal(cap);
let n_worker: usize = app
.software_tasks
.iter()
.map(|(_name, task)| task.is_async as usize)
.sum();
let n_worker = util::capacity_literal(n_worker);
let tq_ty = quote!(rtic::export::TimerQueue<#mono_type, #t, #n_task, #n_worker>);
// For future use
// let doc = format!(" RTIC internal: {}:{}", file!(), line!());
@ -76,8 +81,12 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
#[doc(hidden)]
#[allow(non_camel_case_types)]
#[allow(non_upper_case_globals)]
static #tq: rtic::RacyCell<#tq_ty> =
rtic::RacyCell::new(rtic::export::TimerQueue(rtic::export::SortedLinkedList::new_u16()));
static #tq: rtic::RacyCell<#tq_ty> = rtic::RacyCell::new(
rtic::export::TimerQueue {
task_queue: rtic::export::SortedLinkedList::new_u16(),
waker_queue: rtic::export::SortedLinkedList::new_u16(),
}
);
));
let mono = util::monotonic_ident(&monotonic_name);
@ -118,7 +127,9 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
quote!(
#(#cfgs)*
#t::#name => {
rtic::export::interrupt::free(|_| (&mut *#rq.get_mut()).split().0.enqueue_unchecked((#rqt::#name, index)));
rtic::export::interrupt::free(|_|
(&mut *#rq.get_mut()).split().0.enqueue_unchecked((#rqt::#name, index))
);
#pend
}
@ -137,7 +148,7 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
#[no_mangle]
#[allow(non_snake_case)]
unsafe fn #bound_interrupt() {
while let Some((task, index)) = rtic::export::interrupt::free(|_|
while let Some(task_or_waker) = rtic::export::interrupt::free(|_|
if let Some(mono) = (&mut *#m_ident.get_mut()).as_mut() {
(&mut *#tq.get_mut()).dequeue(|| #disable_isr, mono)
} else {
@ -146,8 +157,13 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
core::hint::unreachable_unchecked()
})
{
match task {
#(#arms)*
match task_or_waker {
rtic::export::TaskOrWaker::Waker(waker) => waker.wake(),
rtic::export::TaskOrWaker::Task((task, index)) => {
match task {
#(#arms)*
}
}
}
}

View file

@ -67,6 +67,11 @@ pub fn inputs_ident(task: &Ident) -> Ident {
mark_internal_name(&format!("{}_INPUTS", task))
}
/// Generates an identifier for the `EXECUTOR_RUN` atomics (`async` API)
pub fn executor_run_ident(task: &Ident) -> Ident {
mark_internal_name(&format!("{}_EXECUTOR_RUN", task))
}
/// Generates an identifier for the `INSTANTS` buffer (`schedule` API)
pub fn monotonic_instants_ident(task: &Ident, monotonic: &Ident) -> Ident {
mark_internal_name(&format!("{}_{}_INSTANTS", task, monotonic))
@ -222,7 +227,7 @@ pub fn rq_ident(priority: u8) -> Ident {
/// Generates an identifier for the `enum` of `schedule`-able tasks
pub fn schedule_t_ident() -> Ident {
Ident::new("SCHED_T", Span::call_site())
mark_internal_name("SCHED_T")
}
/// Generates an identifier for the `enum` of `spawn`-able tasks
@ -230,7 +235,7 @@ pub fn schedule_t_ident() -> Ident {
/// This identifier needs the same structure as the `RQ` identifier because there's one ready queue
/// for each of these `T` enums
pub fn spawn_t_ident(priority: u8) -> Ident {
Ident::new(&format!("P{}_T", priority), Span::call_site())
mark_internal_name(&format!("P{}_T", priority))
}
/// Suffixed identifier

View file

@ -4,7 +4,7 @@ use core::{
sync::atomic::{AtomicBool, Ordering},
};
pub use crate::tq::{NotReady, TimerQueue};
pub use crate::tq::{TaskNotReady, TaskOrWaker, TimerQueue, WakerNotReady};
pub use bare_metal::CriticalSection;
pub use cortex_m::{
asm::nop,
@ -16,8 +16,74 @@ pub use cortex_m::{
pub use heapless::sorted_linked_list::SortedLinkedList;
pub use heapless::spsc::Queue;
pub use heapless::BinaryHeap;
pub use heapless::Vec;
pub use rtic_monotonic as monotonic;
pub mod executor {
use core::{
future::Future,
mem,
pin::Pin,
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
};
static WAKER_VTABLE: RawWakerVTable =
RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop);
unsafe fn waker_clone(p: *const ()) -> RawWaker {
RawWaker::new(p, &WAKER_VTABLE)
}
unsafe fn waker_wake(p: *const ()) {
// The only thing we need from a waker is the function to call to pend the async
// dispatcher.
let f: fn() = mem::transmute(p);
f();
}
unsafe fn waker_drop(_: *const ()) {
// nop
}
//============
// AsyncTaskExecutor
pub struct AsyncTaskExecutor<F: Future + 'static> {
task: Option<F>,
}
impl<F: Future + 'static> AsyncTaskExecutor<F> {
pub const fn new() -> Self {
Self { task: None }
}
pub fn is_running(&self) -> bool {
self.task.is_some()
}
pub fn spawn(&mut self, future: F) {
self.task = Some(future);
}
pub fn poll(&mut self, wake: fn()) {
if let Some(future) = &mut self.task {
unsafe {
let waker = Waker::from_raw(RawWaker::new(wake as *const (), &WAKER_VTABLE));
let mut cx = Context::from_waker(&waker);
let future = Pin::new_unchecked(future);
match future.poll(&mut cx) {
Poll::Ready(_) => {
self.task = None;
}
Poll::Pending => {}
};
}
}
}
}
}
pub type SCFQ<const N: usize> = Queue<u8, N>;
pub type SCRQ<T, const N: usize> = Queue<(T, u8), N>;

270
src/tq.rs
View file

@ -1,29 +1,25 @@
use crate::Monotonic;
use core::cmp::Ordering;
use core::task::Waker;
use heapless::sorted_linked_list::{LinkedIndexU16, Min, SortedLinkedList};
pub struct TimerQueue<Mono, Task, const N: usize>(
pub SortedLinkedList<NotReady<Mono, Task>, LinkedIndexU16, Min, N>,
)
where
Mono: Monotonic,
Task: Copy;
impl<Mono, Task, const N: usize> TimerQueue<Mono, Task, N>
pub struct TimerQueue<Mono, Task, const N_TASK: usize, const N_WAKER: usize>
where
Mono: Monotonic,
Task: Copy,
{
/// # Safety
///
/// Writing to memory with a transmute in order to enable
/// interrupts of the ``SysTick`` timer
///
/// Enqueue a task without checking if it is full
#[inline]
pub unsafe fn enqueue_unchecked<F1, F2>(
&mut self,
nr: NotReady<Mono, Task>,
pub task_queue: SortedLinkedList<TaskNotReady<Mono, Task>, LinkedIndexU16, Min, N_TASK>,
pub waker_queue: SortedLinkedList<WakerNotReady<Mono>, LinkedIndexU16, Min, N_WAKER>,
}
impl<Mono, Task, const N_TASK: usize, const N_WAKER: usize> TimerQueue<Mono, Task, N_TASK, N_WAKER>
where
Mono: Monotonic,
Task: Copy,
{
fn check_if_enable<F1, F2>(
&self,
instant: Mono::Instant,
enable_interrupt: F1,
pend_handler: F2,
mono: Option<&mut Mono>,
@ -33,11 +29,17 @@ where
{
// Check if the top contains a non-empty element and if that element is
// greater than nr
let if_heap_max_greater_than_nr =
self.0.peek().map_or(true, |head| nr.instant < head.instant);
let if_task_heap_max_greater_than_nr = self
.task_queue
.peek()
.map_or(true, |head| instant < head.instant);
let if_waker_heap_max_greater_than_nr = self
.waker_queue
.peek()
.map_or(true, |head| instant < head.instant);
if if_heap_max_greater_than_nr {
if Mono::DISABLE_INTERRUPT_ON_EMPTY_QUEUE && self.0.is_empty() {
if if_task_heap_max_greater_than_nr || if_waker_heap_max_greater_than_nr {
if Mono::DISABLE_INTERRUPT_ON_EMPTY_QUEUE && self.is_empty() {
if let Some(mono) = mono {
mono.enable_timer();
}
@ -46,19 +48,50 @@ where
pend_handler();
}
self.0.push_unchecked(nr);
}
/// Check if the timer queue is empty.
/// Enqueue a task without checking if it is full
#[inline]
pub unsafe fn enqueue_task_unchecked<F1, F2>(
&mut self,
nr: TaskNotReady<Mono, Task>,
enable_interrupt: F1,
pend_handler: F2,
mono: Option<&mut Mono>,
) where
F1: FnOnce(),
F2: FnOnce(),
{
self.check_if_enable(nr.instant, enable_interrupt, pend_handler, mono);
self.task_queue.push_unchecked(nr);
}
/// Enqueue a waker
#[inline]
pub fn enqueue_waker<F1, F2>(
&mut self,
nr: WakerNotReady<Mono>,
enable_interrupt: F1,
pend_handler: F2,
mono: Option<&mut Mono>,
) -> Result<(), ()>
where
F1: FnOnce(),
F2: FnOnce(),
{
self.check_if_enable(nr.instant, enable_interrupt, pend_handler, mono);
self.waker_queue.push(nr).map_err(|_| ())
}
/// Check if all the timer queue is empty.
#[inline]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
self.task_queue.is_empty() && self.waker_queue.is_empty()
}
/// Cancel the marker value
pub fn cancel_marker(&mut self, marker: u32) -> Option<(Task, u8)> {
if let Some(val) = self.0.find_mut(|nr| nr.marker == marker) {
/// Cancel the marker value for a task
pub fn cancel_task_marker(&mut self, marker: u32) -> Option<(Task, u8)> {
if let Some(val) = self.task_queue.find_mut(|nr| nr.marker == marker) {
let nr = val.pop();
Some((nr.task, nr.index))
@ -67,16 +100,23 @@ where
}
}
/// Update the instant at an marker value to a new instant
/// Cancel the marker value for a waker
pub fn cancel_waker_marker(&mut self, marker: u32) {
if let Some(val) = self.waker_queue.find_mut(|nr| nr.marker == marker) {
let _ = val.pop();
}
}
/// Update the instant at an marker value for a task to a new instant
#[allow(clippy::result_unit_err)]
pub fn update_marker<F: FnOnce()>(
pub fn update_task_marker<F: FnOnce()>(
&mut self,
marker: u32,
new_marker: u32,
instant: Mono::Instant,
pend_handler: F,
) -> Result<(), ()> {
if let Some(mut val) = self.0.find_mut(|nr| nr.marker == marker) {
if let Some(mut val) = self.task_queue.find_mut(|nr| nr.marker == marker) {
val.instant = instant;
val.marker = new_marker;
@ -89,66 +129,134 @@ where
}
}
fn dequeue_task_queue(
&mut self,
instant: Mono::Instant,
mono: &mut Mono,
) -> Option<TaskOrWaker<Task>> {
let now = mono.now();
if instant <= now {
// task became ready
let nr = unsafe { self.task_queue.pop_unchecked() };
Some(TaskOrWaker::Task((nr.task, nr.index)))
} else {
// Set compare
mono.set_compare(instant);
// Double check that the instant we set is really in the future, else
// dequeue. If the monotonic is fast enough it can happen that from the
// read of now to the set of the compare, the time can overflow. This is to
// guard against this.
if instant <= now {
let nr = unsafe { self.task_queue.pop_unchecked() };
Some(TaskOrWaker::Task((nr.task, nr.index)))
} else {
None
}
}
}
fn dequeue_waker_queue(
&mut self,
instant: Mono::Instant,
mono: &mut Mono,
) -> Option<TaskOrWaker<Task>> {
let now = mono.now();
if instant <= now {
// task became ready
let nr = unsafe { self.waker_queue.pop_unchecked() };
Some(TaskOrWaker::Waker(nr.waker))
} else {
// Set compare
mono.set_compare(instant);
// Double check that the instant we set is really in the future, else
// dequeue. If the monotonic is fast enough it can happen that from the
// read of now to the set of the compare, the time can overflow. This is to
// guard against this.
if instant <= now {
let nr = unsafe { self.waker_queue.pop_unchecked() };
Some(TaskOrWaker::Waker(nr.waker))
} else {
None
}
}
}
/// Dequeue a task from the ``TimerQueue``
pub fn dequeue<F>(&mut self, disable_interrupt: F, mono: &mut Mono) -> Option<(Task, u8)>
pub fn dequeue<F>(&mut self, disable_interrupt: F, mono: &mut Mono) -> Option<TaskOrWaker<Task>>
where
F: FnOnce(),
{
mono.clear_compare_flag();
if let Some(instant) = self.0.peek().map(|p| p.instant) {
if instant <= mono.now() {
// task became ready
let nr = unsafe { self.0.pop_unchecked() };
let tq = self.task_queue.peek().map(|p| p.instant);
let wq = self.waker_queue.peek().map(|p| p.instant);
Some((nr.task, nr.index))
} else {
// Set compare
mono.set_compare(instant);
let dequeue_task;
let instant;
// Double check that the instant we set is really in the future, else
// dequeue. If the monotonic is fast enough it can happen that from the
// read of now to the set of the compare, the time can overflow. This is to
// guard against this.
if instant <= mono.now() {
let nr = unsafe { self.0.pop_unchecked() };
Some((nr.task, nr.index))
match (tq, wq) {
(Some(tq_instant), Some(wq_instant)) => {
if tq_instant <= wq_instant {
dequeue_task = true;
instant = tq_instant;
} else {
None
dequeue_task = false;
instant = wq_instant;
}
}
} else {
// The queue is empty, disable the interrupt.
if Mono::DISABLE_INTERRUPT_ON_EMPTY_QUEUE {
disable_interrupt();
mono.disable_timer();
(Some(tq_instant), None) => {
dequeue_task = true;
instant = tq_instant;
}
(None, Some(wq_instant)) => {
dequeue_task = false;
instant = wq_instant;
}
(None, None) => {
// The queue is empty, disable the interrupt.
if Mono::DISABLE_INTERRUPT_ON_EMPTY_QUEUE {
disable_interrupt();
mono.disable_timer();
}
None
return None;
}
}
if dequeue_task {
self.dequeue_task_queue(instant, mono)
} else {
self.dequeue_waker_queue(instant, mono)
}
}
}
pub struct NotReady<Mono, Task>
pub enum TaskOrWaker<Task> {
Task((Task, u8)),
Waker(Waker),
}
pub struct TaskNotReady<Mono, Task>
where
Task: Copy,
Mono: Monotonic,
{
pub task: Task,
pub index: u8,
pub instant: Mono::Instant,
pub task: Task,
pub marker: u32,
}
impl<Mono, Task> Eq for NotReady<Mono, Task>
impl<Mono, Task> Eq for TaskNotReady<Mono, Task>
where
Task: Copy,
Mono: Monotonic,
{
}
impl<Mono, Task> Ord for NotReady<Mono, Task>
impl<Mono, Task> Ord for TaskNotReady<Mono, Task>
where
Task: Copy,
Mono: Monotonic,
@ -158,7 +266,7 @@ where
}
}
impl<Mono, Task> PartialEq for NotReady<Mono, Task>
impl<Mono, Task> PartialEq for TaskNotReady<Mono, Task>
where
Task: Copy,
Mono: Monotonic,
@ -168,7 +276,7 @@ where
}
}
impl<Mono, Task> PartialOrd for NotReady<Mono, Task>
impl<Mono, Task> PartialOrd for TaskNotReady<Mono, Task>
where
Task: Copy,
Mono: Monotonic,
@ -177,3 +285,41 @@ where
Some(self.cmp(other))
}
}
pub struct WakerNotReady<Mono>
where
Mono: Monotonic,
{
pub waker: Waker,
pub instant: Mono::Instant,
pub marker: u32,
}
impl<Mono> Eq for WakerNotReady<Mono> where Mono: Monotonic {}
impl<Mono> Ord for WakerNotReady<Mono>
where
Mono: Monotonic,
{
fn cmp(&self, other: &Self) -> Ordering {
self.instant.cmp(&other.instant)
}
}
impl<Mono> PartialEq for WakerNotReady<Mono>
where
Mono: Monotonic,
{
fn eq(&self, other: &Self) -> bool {
self.instant == other.instant
}
}
impl<Mono> PartialOrd for WakerNotReady<Mono>
where
Mono: Monotonic,
{
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}