use std::{ collections::HashMap, sync::Arc, task::{ready, Poll}, }; use axum::response::sse::Event; use futures_util::Stream; use tokio::{ select, sync::{ broadcast::{error::RecvError, Receiver}, RwLock, }, }; use tokio_util::sync::ReusableBoxFuture; use crate::{error::Error, game::Game}; pub struct PlayerBroadcastStream { id: String, player_id: String, games: Arc>>, inner: ReusableBoxFuture<'static, (Result, RecvError>, Receiver<()>)>, } impl PlayerBroadcastStream { async fn make_future( mut rx: Receiver<()>, games: Arc>>, id: String, player_id: String, ) -> (Result, RecvError>, Receiver<()>) { let result = match rx.recv().await { Ok(_) => Ok(Self::build_template(games, id, player_id).await), Err(e) => Err(e), }; (result, rx) } async fn build_template( games: Arc>>, id: String, player_id: String, ) -> Result { let games = games.read().await; let game = games.get(&id).ok_or(Error::NotFound)?; Ok(Event::default().data(game.player_view(&player_id, true).await?)) } pub fn new( recv: Receiver<()>, games: Arc>>, id: String, player_id: String, ) -> Self { Self { inner: ReusableBoxFuture::new(Self::make_future( recv, games.clone(), id.clone(), player_id.clone(), )), games, id, player_id, } } } impl Stream for PlayerBroadcastStream { type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { let (result, rx) = ready!(self.inner.poll(cx)); let future = Self::make_future( rx, self.games.clone(), self.id.clone(), self.player_id.clone(), ); self.inner.set(future); match result { Ok(item) => Poll::Ready(Some(item)), Err(RecvError::Closed) => Poll::Ready(None), Err(RecvError::Lagged(n)) => Poll::Ready(Some(Err(Error::StreamLagged(n)))), } } } pub struct ViewerBroadcastStream { id: String, games: Arc>>, inner: ReusableBoxFuture< 'static, ( Result, RecvError>, Receiver<()>, Receiver<()>, ), >, base_url: String, } impl ViewerBroadcastStream { async fn make_future( mut rx1: Receiver<()>, mut rx2: Receiver<()>, games: Arc>>, id: String, base_url: String, ) -> ( Result, RecvError>, Receiver<()>, Receiver<()>, ) { let result = match select! { a = rx1.recv() => a, b = rx2.recv() => b } { Ok(_) => Ok(Self::build_template(games, id, base_url).await), Err(e) => Err(e), }; (result, rx1, rx2) } async fn build_template( games: Arc>>, id: String, base_url: String, ) -> Result { let games = games.read().await; let game = games.get(&id).ok_or(Error::NotFound)?; Ok(Event::default().data(game.viewer_view(true, &base_url).await?)) } pub fn new( rx1: Receiver<()>, rx2: Receiver<()>, games: Arc>>, id: String, base_url: String, ) -> Self { Self { inner: ReusableBoxFuture::new(Self::make_future( rx1, rx2, games.clone(), id.clone(), base_url.clone(), )), games, id, base_url, } } } impl Stream for ViewerBroadcastStream { type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { let (result, rx1, rx2) = ready!(self.inner.poll(cx)); let future = Self::make_future( rx1, rx2, self.games.clone(), self.id.clone(), self.base_url.clone(), ); self.inner.set(future); match result { Ok(item) => Poll::Ready(Some(item)), Err(RecvError::Closed) => Poll::Ready(None), Err(RecvError::Lagged(n)) => Poll::Ready(Some(Err(Error::StreamLagged(n)))), } } }