mirror of
https://github.com/rtic-rs/rtic.git
synced 2024-11-23 20:22:51 +01:00
rtic-channel: try_* APIs now error if Sender/Receiver does not exist
This commit is contained in:
parent
fd03e7500d
commit
bad222b5a3
1 changed files with 69 additions and 27 deletions
|
@ -119,6 +119,14 @@ macro_rules! make_channel {
|
|||
/// Error state for when the receiver has been dropped.
|
||||
pub struct NoReceiver<T>(pub T);
|
||||
|
||||
/// Errors that 'try_send` can have.
|
||||
pub enum TrySendError<T> {
|
||||
/// Error state for when the receiver has been dropped.
|
||||
NoReceiver(T),
|
||||
/// Error state when the queue is full.
|
||||
Full(T),
|
||||
}
|
||||
|
||||
impl<T> core::fmt::Debug for NoReceiver<T>
|
||||
where
|
||||
T: core::fmt::Debug,
|
||||
|
@ -128,6 +136,32 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> core::fmt::Debug for TrySendError<T>
|
||||
where
|
||||
T: core::fmt::Debug,
|
||||
{
|
||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||
match self {
|
||||
TrySendError::NoReceiver(v) => write!(f, "NoReceiver({:?})", v),
|
||||
TrySendError::Full(v) => write!(f, "Full({:?})", v),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> PartialEq for TrySendError<T>
|
||||
where
|
||||
T: PartialEq,
|
||||
{
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
match (self, other) {
|
||||
(TrySendError::NoReceiver(v1), TrySendError::NoReceiver(v2)) => v1.eq(v2),
|
||||
(TrySendError::NoReceiver(_), TrySendError::Full(_)) => false,
|
||||
(TrySendError::Full(_), TrySendError::NoReceiver(_)) => false,
|
||||
(TrySendError::Full(v1), TrySendError::Full(v2)) => v1.eq(v2),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A `Sender` can send to the channel and can be cloned.
|
||||
pub struct Sender<'a, T, const N: usize>(&'a Channel<T, N>);
|
||||
|
||||
|
@ -178,18 +212,22 @@ impl<'a, T, const N: usize> Sender<'a, T, N> {
|
|||
}
|
||||
|
||||
/// Try to send a value, non-blocking. If the channel is full this will return an error.
|
||||
/// Note; this does not check if the channel is closed.
|
||||
pub fn try_send(&mut self, val: T) -> Result<(), T> {
|
||||
pub fn try_send(&mut self, val: T) -> Result<(), TrySendError<T>> {
|
||||
// If the wait queue is not empty, we can't try to push into the queue.
|
||||
if !self.0.wait_queue.is_empty() {
|
||||
return Err(val);
|
||||
return Err(TrySendError::Full(val));
|
||||
}
|
||||
|
||||
// No receiver available.
|
||||
if self.is_closed() {
|
||||
return Err(TrySendError::NoReceiver(val));
|
||||
}
|
||||
|
||||
let idx =
|
||||
if let Some(idx) = critical_section::with(|cs| self.0.access(cs).freeq.pop_front()) {
|
||||
idx
|
||||
} else {
|
||||
return Err(val);
|
||||
return Err(TrySendError::Full(val));
|
||||
};
|
||||
|
||||
self.send_footer(idx, val);
|
||||
|
@ -330,19 +368,18 @@ impl<'a, T, const N: usize> core::fmt::Debug for Receiver<'a, T, N> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Error state for when all senders has been dropped.
|
||||
pub struct NoSender;
|
||||
|
||||
impl core::fmt::Debug for NoSender {
|
||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||
write!(f, "NoSender")
|
||||
}
|
||||
/// Possible receive errors.
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub enum ReceiveError {
|
||||
/// Error state for when all senders has been dropped.
|
||||
NoSender,
|
||||
/// Error state for when the queue is empty.
|
||||
Empty,
|
||||
}
|
||||
|
||||
impl<'a, T, const N: usize> Receiver<'a, T, N> {
|
||||
/// Receives a value if there is one in the channel, non-blocking.
|
||||
/// Note; this does not check if the channel is closed.
|
||||
pub fn try_recv(&mut self) -> Option<T> {
|
||||
pub fn try_recv(&mut self) -> Result<T, ReceiveError> {
|
||||
// Try to get a ready slot.
|
||||
let ready_slot = critical_section::with(|cs| self.0.access(cs).readyq.pop_front());
|
||||
|
||||
|
@ -363,15 +400,19 @@ impl<'a, T, const N: usize> Receiver<'a, T, N> {
|
|||
wait_head.wake();
|
||||
}
|
||||
|
||||
Some(r)
|
||||
Ok(r)
|
||||
} else {
|
||||
None
|
||||
if self.is_closed() {
|
||||
Err(ReceiveError::NoSender)
|
||||
} else {
|
||||
Err(ReceiveError::Empty)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Receives a value, waiting if the queue is empty.
|
||||
/// If all senders are dropped this will error with `NoSender`.
|
||||
pub async fn recv(&mut self) -> Result<T, NoSender> {
|
||||
pub async fn recv(&mut self) -> Result<T, ReceiveError> {
|
||||
// There was nothing in the queue, setup the waiting.
|
||||
poll_fn(|cx| {
|
||||
// Register waker.
|
||||
|
@ -379,13 +420,14 @@ impl<'a, T, const N: usize> Receiver<'a, T, N> {
|
|||
self.0.receiver_waker.register(cx.waker());
|
||||
|
||||
// Try to dequeue.
|
||||
if let Some(val) = self.try_recv() {
|
||||
return Poll::Ready(Ok(val));
|
||||
}
|
||||
|
||||
// If the queue is empty and there is no sender, return the error.
|
||||
if self.is_closed() {
|
||||
return Poll::Ready(Err(NoSender));
|
||||
match self.try_recv() {
|
||||
Ok(val) => {
|
||||
return Poll::Ready(Ok(val));
|
||||
}
|
||||
Err(ReceiveError::NoSender) => {
|
||||
return Poll::Ready(Err(ReceiveError::NoSender));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
|
@ -476,13 +518,13 @@ mod tests {
|
|||
s.try_send(i).unwrap();
|
||||
}
|
||||
|
||||
assert_eq!(s.try_send(11), Err(11));
|
||||
assert_eq!(s.try_send(11), Err(TrySendError::Full(11)));
|
||||
|
||||
for i in 0..10 {
|
||||
assert_eq!(r.try_recv().unwrap(), i);
|
||||
}
|
||||
|
||||
assert_eq!(r.try_recv(), None);
|
||||
assert_eq!(r.try_recv(), Err(ReceiveError::Empty));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -493,7 +535,7 @@ mod tests {
|
|||
|
||||
assert!(r.is_closed());
|
||||
|
||||
assert_eq!(r.try_recv(), None);
|
||||
assert_eq!(r.try_recv(), Err(ReceiveError::NoSender));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -504,7 +546,7 @@ mod tests {
|
|||
|
||||
assert!(s.is_closed());
|
||||
|
||||
assert_eq!(s.try_send(11), Ok(()));
|
||||
assert_eq!(s.try_send(11), Err(TrySendError::NoReceiver(11)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
Loading…
Reference in a new issue