ars/src/stream.rs
2023-11-06 18:38:00 +01:00

181 lines
4.8 KiB
Rust

use std::{
collections::HashMap,
sync::Arc,
task::{ready, Poll},
};
use axum::response::sse::Event;
use futures_util::Stream;
use tokio::{
select,
sync::{
broadcast::{error::RecvError, Receiver},
RwLock,
},
};
use tokio_util::sync::ReusableBoxFuture;
use crate::{
error::Error,
game::{Game, GameId, PlayerId},
};
/// Stream of SSE [`Event`]s rendered for one player of one game.
///
/// Every `()` notification received on the broadcast channel triggers a fresh
/// lookup of the game in the shared map and a new rendering of that player's
/// view, which is then yielded as the next stream item.
pub struct PlayerBroadcastStream {
    /// Key of the game inside `games`.
    id: GameId,
    /// Player whose view is rendered (passed to `Game::player_view`).
    player_id: PlayerId,
    /// Shared map of all games, read-locked while a view is rendered.
    games: Arc<RwLock<HashMap<GameId, Game>>>,
    /// In-flight "receive, then render" round. It resolves to the outcome of
    /// that round together with the receiver, which is handed back so it can
    /// be moved into the next round.
    inner: ReusableBoxFuture<'static, (Result<Result<Event, Error>, RecvError>, Receiver<()>)>,
}
impl PlayerBroadcastStream {
async fn make_future(
mut rx: Receiver<()>,
games: Arc<RwLock<HashMap<GameId, Game>>>,
id: GameId,
player_id: PlayerId,
) -> (Result<Result<Event, Error>, RecvError>, Receiver<()>) {
let result = match rx.recv().await {
Ok(_) => Ok(Self::build_template(games, &id, &player_id).await),
Err(e) => Err(e),
};
(result, rx)
}
async fn build_template(
games: Arc<RwLock<HashMap<GameId, Game>>>,
id: &GameId,
player_id: &PlayerId,
) -> Result<Event, Error> {
let games = games.read().await;
let game = games.get(id).ok_or(Error::NotFound)?;
Ok(Event::default().data(game.player_view(player_id, true).await?))
}
pub fn new(
recv: Receiver<()>,
games: Arc<RwLock<HashMap<GameId, Game>>>,
id: GameId,
player_id: PlayerId,
) -> Self {
Self {
inner: ReusableBoxFuture::new(Self::make_future(
recv,
games.clone(),
id.clone(),
player_id.clone(),
)),
games,
id,
player_id,
}
}
}
impl Stream for PlayerBroadcastStream {
type Item = Result<Event, Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let (result, rx) = ready!(self.inner.poll(cx));
let future = Self::make_future(
rx,
self.games.clone(),
self.id.clone(),
self.player_id.clone(),
);
self.inner.set(future);
match result {
Ok(item) => Poll::Ready(Some(item)),
Err(RecvError::Closed) => Poll::Ready(None),
Err(RecvError::Lagged(n)) => Poll::Ready(Some(Err(Error::StreamLagged(n)))),
}
}
}
/// Stream of SSE [`Event`]s rendered for a viewer of one game.
///
/// A `()` notification on either of two broadcast channels triggers a fresh
/// lookup of the game in the shared map and a new rendering of the viewer
/// view, which is then yielded as the next stream item.
pub struct ViewerBroadcastStream {
    /// Key of the game inside `games`.
    id: GameId,
    /// Shared map of all games, read-locked while a view is rendered.
    games: Arc<RwLock<HashMap<GameId, Game>>>,
    /// In-flight "receive, then render" round. It resolves to the outcome of
    /// that round together with both receivers, which are handed back so they
    /// can be moved into the next round.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<Result<Event, Error>, RecvError>,
            Receiver<()>,
            Receiver<()>,
        ),
    >,
    /// Base URL forwarded to `Game::viewer_view` on every rendering.
    base_url: &'static str,
}
impl ViewerBroadcastStream {
async fn make_future(
mut rx1: Receiver<()>,
mut rx2: Receiver<()>,
games: Arc<RwLock<HashMap<GameId, Game>>>,
id: GameId,
base_url: &'static str,
) -> (
Result<Result<Event, Error>, RecvError>,
Receiver<()>,
Receiver<()>,
) {
let result = match select! {
a = rx1.recv() => a,
b = rx2.recv() => b
} {
Ok(_) => Ok(Self::build_template(games, &id, base_url).await),
Err(e) => Err(e),
};
(result, rx1, rx2)
}
async fn build_template(
games: Arc<RwLock<HashMap<GameId, Game>>>,
id: &GameId,
base_url: &'static str,
) -> Result<Event, Error> {
let games = games.read().await;
let game = games.get(id).ok_or(Error::NotFound)?;
Ok(Event::default().data(game.viewer_view(true, base_url).await?))
}
pub fn new(
rx1: Receiver<()>,
rx2: Receiver<()>,
games: Arc<RwLock<HashMap<GameId, Game>>>,
id: GameId,
base_url: &'static str,
) -> Self {
Self {
inner: ReusableBoxFuture::new(Self::make_future(
rx1,
rx2,
games.clone(),
id.clone(),
base_url,
)),
games,
id,
base_url,
}
}
}
impl Stream for ViewerBroadcastStream {
type Item = Result<Event, Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let (result, rx1, rx2) = ready!(self.inner.poll(cx));
let future =
Self::make_future(rx1, rx2, self.games.clone(), self.id.clone(), self.base_url);
self.inner.set(future);
match result {
Ok(item) => Poll::Ready(Some(item)),
Err(RecvError::Closed) => Poll::Ready(None),
Err(RecvError::Lagged(n)) => Poll::Ready(Some(Err(Error::StreamLagged(n)))),
}
}
}