memory optimization

This commit is contained in:
Paul Zinselmeyer 2023-11-06 18:38:00 +01:00
parent 5ae2a2d4e2
commit a6a4d68651
6 changed files with 204 additions and 147 deletions

View file

@ -1,11 +1,14 @@
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
ops::Deref,
sync::Arc, sync::Arc,
time::Duration, time::Duration,
}; };
use qrcode::{render::svg, QrCode}; use qrcode::{render::svg, QrCode};
use rand::{distributions, Rng};
use sailfish::TemplateOnce; use sailfish::TemplateOnce;
use serde::Deserialize;
use tokio::{ use tokio::{
select, select,
sync::{broadcast, RwLock}, sync::{broadcast, RwLock},
@ -16,6 +19,65 @@ use crate::{
ViewerState, ViewerState,
}; };
#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)]
pub struct GameId(Arc<str>);
#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)]
pub struct PlayerId(Arc<str>);
impl Deref for GameId {
type Target = str;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl From<String> for GameId {
fn from(value: String) -> Self {
Self(value.into())
}
}
impl GameId {
pub fn random() -> Self {
Self(
rand::thread_rng()
.sample_iter(distributions::Alphanumeric)
.take(8)
.map(char::from)
.collect::<String>()
.into(),
)
}
}
impl Deref for PlayerId {
type Target = str;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl From<String> for PlayerId {
fn from(value: String) -> Self {
Self(value.into())
}
}
impl PlayerId {
pub fn random() -> Self {
Self(
rand::thread_rng()
.sample_iter(distributions::Alphanumeric)
.take(32)
.map(char::from)
.collect::<String>()
.into(),
)
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum GameState { pub enum GameState {
NotStarted, NotStarted,
@ -25,18 +87,18 @@ pub enum GameState {
} }
pub struct Game { pub struct Game {
pub id: String, pub id: GameId,
pub owner: String, pub owner: String,
pub state: Arc<RwLock<GameState>>, pub state: Arc<RwLock<GameState>>,
pub quiz: Quiz, pub quiz: Quiz,
pub players: HashSet<String>, pub players: HashSet<PlayerId>,
pub on_state_update: broadcast::Sender<()>, pub on_state_update: broadcast::Sender<()>,
pub on_submission: broadcast::Sender<()>, pub on_submission: broadcast::Sender<()>,
pub questions: Vec<Box<dyn Question>>, pub questions: Vec<Box<dyn Question>>,
} }
impl Game { impl Game {
pub fn new(id: String, owner: String, quiz: Quiz) -> Self { pub fn new(id: GameId, owner: String, quiz: Quiz) -> Self {
Self { Self {
id, id,
owner, owner,
@ -87,7 +149,7 @@ impl Game {
} }
} }
pub async fn player_view(&self, player_id: &str, htmx: bool) -> HandlerResult<String> { pub async fn player_view(&self, player_id: &PlayerId, htmx: bool) -> HandlerResult<String> {
if !self.players.contains(player_id) { if !self.players.contains(player_id) {
return Err(Error::PlayerNotFound); return Err(Error::PlayerNotFound);
} }
@ -134,7 +196,7 @@ impl Game {
pub async fn handle_answer( pub async fn handle_answer(
&mut self, &mut self,
player_id: &str, player_id: &PlayerId,
values: &HashMap<String, String>, values: &HashMap<String, String>,
) -> HandlerResult<()> { ) -> HandlerResult<()> {
if !self.players.contains(player_id) { if !self.players.contains(player_id) {
@ -145,7 +207,7 @@ impl Game {
if let GameState::Answering(i) = *state { if let GameState::Answering(i) = *state {
self.questions[i as usize] self.questions[i as usize]
.handle_answer(player_id, values) .handle_answer(player_id.clone(), values)
.await?; .await?;
self.on_submission.send(()); self.on_submission.send(());
@ -164,7 +226,7 @@ impl Game {
let viewer_state = match *state { let viewer_state = match *state {
GameState::NotStarted => { GameState::NotStarted => {
let url = format!("{}/{}", base_url, &self.id); let url = format!("{}/{}", base_url, &self.id.deref());
let img = QrCode::new(&url).expect(""); let img = QrCode::new(&url).expect("");
let img = img.render::<svg::Color>().build(); let img = img.render::<svg::Color>().build();
ViewerState::NotStarted((self.players.len() as u32, img, url)) ViewerState::NotStarted((self.players.len() as u32, img, url))

View file

@ -6,11 +6,11 @@ use std::{
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::game::Game; use crate::game::{Game, GameId};
pub fn start_gc( pub fn start_gc(
game_expiry: Arc<RwLock<BinaryHeap<GarbageCollectorItem>>>, game_expiry: Arc<RwLock<BinaryHeap<GarbageCollectorItem>>>,
games: Arc<RwLock<HashMap<String, Game>>>, games: Arc<RwLock<HashMap<GameId, Game>>>,
) { ) {
let games = games.clone(); let games = games.clone();
let game_expiry = game_expiry.clone(); let game_expiry = game_expiry.clone();
@ -36,12 +36,12 @@ pub fn start_gc(
#[derive(PartialEq, Eq)] #[derive(PartialEq, Eq)]
pub struct GarbageCollectorItem { pub struct GarbageCollectorItem {
id: String, id: GameId,
expires_at: u64, expires_at: u64,
} }
impl GarbageCollectorItem { impl GarbageCollectorItem {
pub fn new_in(id: String, time: u64) -> Self { pub fn new_in(id: GameId, time: u64) -> Self {
Self { Self {
id, id,
expires_at: SystemTime::now() expires_at: SystemTime::now()

View file

@ -3,15 +3,14 @@
use std::{ use std::{
collections::{BinaryHeap, HashMap}, collections::{BinaryHeap, HashMap},
env, env,
ops::Deref,
sync::Arc, sync::Arc,
}; };
use axum::{ use axum::{
async_trait,
body::HttpBody,
error_handling::HandleErrorLayer, error_handling::HandleErrorLayer,
extract::{FromRef, Multipart, Path, Query, State}, extract::{Multipart, Path, Query, State},
http::{Request, StatusCode, Uri}, http::{StatusCode, Uri},
response::{ response::{
sse::{Event, KeepAlive}, sse::{Event, KeepAlive},
Html, IntoResponse, Redirect, Sse, Html, IntoResponse, Redirect, Sse,
@ -21,21 +20,19 @@ use axum::{
}; };
use axum_htmx::{HxRedirect, HxRequest}; use axum_htmx::{HxRedirect, HxRequest};
use axum_oidc::{ use axum_oidc::{
error::MiddlewareError, EmptyAdditionalClaims, OidcAuthLayer, OidcClaims, OidcClient, error::MiddlewareError, EmptyAdditionalClaims, OidcAuthLayer, OidcClaims, OidcLoginLayer,
OidcLoginLayer,
}; };
use futures_util::Stream; use futures_util::Stream;
use game::Game; use game::{Game, GameId, PlayerId};
use garbage_collector::{start_gc, GarbageCollectorItem}; use garbage_collector::{start_gc, GarbageCollectorItem};
use question::single_choice::SingleChoiceQuestion; use question::{single_choice::SingleChoiceQuestion, Question};
use rand::{distributions, Rng};
use sailfish::TemplateOnce; use sailfish::TemplateOnce;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use stream::{PlayerBroadcastStream, ViewerBroadcastStream}; use stream::{PlayerBroadcastStream, ViewerBroadcastStream};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tower::{Layer, ServiceBuilder}; use tower::ServiceBuilder;
use tower_http::services::ServeDir; use tower_http::services::ServeDir;
use tower_sessions::{cookie::SameSite, Expiry, MemoryStore, SessionManagerLayer}; use tower_sessions::{cookie::SameSite, MemoryStore, SessionManagerLayer};
use crate::error::Error; use crate::error::Error;
@ -50,9 +47,9 @@ mod question;
#[derive(Clone)] #[derive(Clone)]
pub struct AppState { pub struct AppState {
games: Arc<RwLock<HashMap<String, Game>>>, games: Arc<RwLock<HashMap<GameId, Game>>>,
game_expiry: Arc<RwLock<BinaryHeap<GarbageCollectorItem>>>, game_expiry: Arc<RwLock<BinaryHeap<GarbageCollectorItem>>>,
application_base: String, application_base: &'static str,
} }
#[tokio::main] #[tokio::main]
@ -108,7 +105,7 @@ pub async fn main() {
let app_state = AppState { let app_state = AppState {
games, games,
game_expiry, game_expiry,
application_base, application_base: Box::leak(application_base.into()),
}; };
let app = Router::new() let app = Router::new()
@ -147,11 +144,7 @@ pub async fn handle_create(
let quiz = quiz.ok_or(Error::QuizFileNotFound)?; let quiz = quiz.ok_or(Error::QuizFileNotFound)?;
let game_id: String = rand::thread_rng() let game_id = GameId::random();
.sample_iter(distributions::Alphanumeric)
.take(8)
.map(char::from)
.collect();
let game = Game::new(game_id.clone(), claims.subject().to_string(), quiz); let game = Game::new(game_id.clone(), claims.subject().to_string(), quiz);
@ -159,7 +152,7 @@ pub async fn handle_create(
games.insert(game_id.clone(), game); games.insert(game_id.clone(), game);
let url = format!("{}/{}/view", state.application_base, &game_id); let url = format!("{}/{}/view", state.application_base, &game_id.deref());
let mut game_expiry = state.game_expiry.write().await; let mut game_expiry = state.game_expiry.write().await;
game_expiry.push(GarbageCollectorItem::new_in(game_id, 24 * 3600)); game_expiry.push(GarbageCollectorItem::new_in(game_id, 24 * 3600));
@ -168,7 +161,7 @@ pub async fn handle_create(
} }
pub async fn handle_view( pub async fn handle_view(
Path(id): Path<String>, Path(id): Path<GameId>,
State(state): State<AppState>, State(state): State<AppState>,
HxRequest(htmx): HxRequest, HxRequest(htmx): HxRequest,
OidcClaims(claims): OidcClaims<EmptyAdditionalClaims>, OidcClaims(claims): OidcClaims<EmptyAdditionalClaims>,
@ -180,13 +173,12 @@ pub async fn handle_view(
return Err(Error::Forbidden); return Err(Error::Forbidden);
} }
Ok(Html(game.viewer_view(htmx, &state.application_base).await?)) Ok(Html(game.viewer_view(htmx, state.application_base).await?))
} }
pub async fn handle_view_next( pub async fn handle_view_next(
Path(id): Path<String>, Path(id): Path<GameId>,
State(state): State<AppState>, State(state): State<AppState>,
HxRequest(htmx): HxRequest,
OidcClaims(claims): OidcClaims<EmptyAdditionalClaims>, OidcClaims(claims): OidcClaims<EmptyAdditionalClaims>,
) -> HandlerResult<impl IntoResponse> { ) -> HandlerResult<impl IntoResponse> {
let mut games = state.games.write().await; let mut games = state.games.write().await;
@ -202,7 +194,7 @@ pub async fn handle_view_next(
} }
pub async fn sse_view( pub async fn sse_view(
Path(id): Path<String>, Path(id): Path<GameId>,
State(state): State<AppState>, State(state): State<AppState>,
OidcClaims(claims): OidcClaims<EmptyAdditionalClaims>, OidcClaims(claims): OidcClaims<EmptyAdditionalClaims>,
) -> HandlerResult<Sse<impl Stream<Item = Result<Event, Error>>>> { ) -> HandlerResult<Sse<impl Stream<Item = Result<Event, Error>>>> {
@ -216,13 +208,8 @@ pub async fn sse_view(
let rx1 = game.on_state_update.subscribe(); let rx1 = game.on_state_update.subscribe();
let rx2 = game.on_submission.subscribe(); let rx2 = game.on_submission.subscribe();
let stream = ViewerBroadcastStream::new( let stream =
rx1, ViewerBroadcastStream::new(rx1, rx2, state.games.clone(), id, state.application_base);
rx2,
state.games.clone(),
id,
state.application_base.clone(),
);
Ok(Sse::new(stream).keep_alive(KeepAlive::default())) Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
} }
@ -234,27 +221,25 @@ pub struct PlayerQuery {
pub async fn handle_player( pub async fn handle_player(
Query(query): Query<PlayerQuery>, Query(query): Query<PlayerQuery>,
Path(id): Path<String>, Path(id): Path<GameId>,
State(state): State<AppState>, State(state): State<AppState>,
HxRequest(htmx): HxRequest, HxRequest(htmx): HxRequest,
) -> HandlerResult<impl IntoResponse> { ) -> HandlerResult<impl IntoResponse> {
let mut games = state.games.write().await; let mut games = state.games.write().await;
let game = games.get_mut(&id).ok_or(Error::NotFound)?; let game = games.get_mut(&id).ok_or(Error::NotFound)?;
if let Some(player_id) = query.player { if let Some(player_id) = query.player.map(PlayerId::from) {
Ok(Html(game.player_view(&player_id, htmx).await?).into_response()) Ok(Html(game.player_view(&player_id, htmx).await?).into_response())
} else { } else {
let player_id: String = rand::thread_rng() let player_id = PlayerId::random();
.sample_iter(distributions::Alphanumeric) game.players.insert(player_id.clone());
.take(32)
.map(char::from)
.collect();
game.players.insert(player_id.to_string());
game.on_submission.send(()); game.on_submission.send(());
Ok(Redirect::temporary(&format!( Ok(Redirect::temporary(&format!(
"{}/{}?player={}", "{}/{}?player={}",
state.application_base, id, player_id state.application_base,
id.deref(),
player_id.deref()
)) ))
.into_response()) .into_response())
} }
@ -262,13 +247,13 @@ pub async fn handle_player(
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct SubmissionPayload { pub struct SubmissionPayload {
player_id: String, player_id: PlayerId,
#[serde(flatten)] #[serde(flatten)]
values: HashMap<String, String>, values: HashMap<String, String>,
} }
pub async fn handle_player_answer( pub async fn handle_player_answer(
Path(id): Path<String>, Path(id): Path<GameId>,
State(state): State<AppState>, State(state): State<AppState>,
Form(form): Form<SubmissionPayload>, Form(form): Form<SubmissionPayload>,
) -> HandlerResult<impl IntoResponse> { ) -> HandlerResult<impl IntoResponse> {
@ -282,12 +267,12 @@ pub async fn handle_player_answer(
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct SsePlayerQuery { pub struct SsePlayerQuery {
player: String, player: PlayerId,
} }
pub async fn sse_player( pub async fn sse_player(
Query(query): Query<SsePlayerQuery>, Query(query): Query<SsePlayerQuery>,
Path(id): Path<String>, Path(id): Path<GameId>,
State(state): State<AppState>, State(state): State<AppState>,
) -> HandlerResult<Sse<impl Stream<Item = Result<Event, Error>>>> { ) -> HandlerResult<Sse<impl Stream<Item = Result<Event, Error>>>> {
let games = state.games.read().await; let games = state.games.read().await;
@ -312,7 +297,6 @@ struct PlayTemplate<'a> {
state: PlayerState, state: PlayerState,
} }
#[derive(Clone)]
pub enum PlayerState { pub enum PlayerState {
NotStarted, NotStarted,
Answering { inner_body: String }, Answering { inner_body: String },
@ -329,7 +313,6 @@ struct ViewTemplate<'a> {
state: ViewerState, state: ViewerState,
} }
#[derive(Clone)]
pub enum ViewerState { pub enum ViewerState {
NotStarted((u32, String, String)), NotStarted((u32, String, String)),
Answering { Answering {
@ -357,30 +340,11 @@ pub enum QuizQuestion {
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SingleChoice { pub struct SingleChoice {
name: String, name: Box<str>,
answers: Vec<String>, answers: Box<[Box<str>]>,
correct: u32, correct: u32,
} }
#[async_trait]
pub trait Question: Send + Sync {
async fn render_player(&self, player_id: &str, show_result: bool) -> Result<String, Error>;
async fn handle_answer(
&mut self,
player_id: &str,
values: &HashMap<String, String>,
) -> Result<(), Error>;
async fn has_answered(&self, player_id: &str) -> Result<bool, Error>;
async fn answered_correctly(&self, player_id: &str) -> Result<bool, Error>;
async fn answer_count(&self) -> Result<u32, Error>;
async fn render_viewer(&self, show_result: bool) -> Result<String, Error>;
}
impl From<QuizQuestion> for Box<dyn Question> { impl From<QuizQuestion> for Box<dyn Question> {
fn from(value: QuizQuestion) -> Self { fn from(value: QuizQuestion) -> Self {
match value { match value {

View file

@ -1 +1,27 @@
use std::collections::HashMap;
use axum::async_trait;
use crate::{error::Error, game::PlayerId};
pub mod single_choice; pub mod single_choice;
#[async_trait]
pub trait Question: Send + Sync {
async fn render_player(&self, player_id: &PlayerId, show_result: bool)
-> Result<String, Error>;
async fn handle_answer(
&mut self,
player_id: PlayerId,
values: &HashMap<String, String>,
) -> Result<(), Error>;
async fn has_answered(&self, player_id: &PlayerId) -> Result<bool, Error>;
async fn answered_correctly(&self, player_id: &PlayerId) -> Result<bool, Error>;
async fn answer_count(&self) -> Result<u32, Error>;
async fn render_viewer(&self, show_result: bool) -> Result<String, Error>;
}

View file

@ -3,11 +3,11 @@ use std::collections::HashMap;
use axum::async_trait; use axum::async_trait;
use sailfish::TemplateOnce; use sailfish::TemplateOnce;
use crate::{error::Error, Question, SingleChoice}; use crate::{error::Error, game::PlayerId, Question, SingleChoice};
pub struct SingleChoiceQuestion { pub struct SingleChoiceQuestion {
inner: SingleChoice, inner: SingleChoice,
submissions: HashMap<String, u32>, submissions: HashMap<PlayerId, u32>,
} }
impl SingleChoiceQuestion { impl SingleChoiceQuestion {
@ -21,12 +21,15 @@ impl SingleChoiceQuestion {
#[async_trait] #[async_trait]
impl Question for SingleChoiceQuestion { impl Question for SingleChoiceQuestion {
async fn render_player(&self, player_id: &str, show_result: bool) -> Result<String, Error> { async fn render_player(
&self,
player_id: &PlayerId,
show_result: bool,
) -> Result<String, Error> {
if show_result { if show_result {
let player_sub_index = self.submissions.get(player_id); let player_sub_index = self.submissions.get(player_id);
let player_sub_value = let player_sub_value = player_sub_index.map(|x| &*self.inner.answers[*x as usize]);
player_sub_index.map(|x| self.inner.answers[*x as usize].as_str());
Ok(ResultTemplate { Ok(ResultTemplate {
is_correct: Some(&self.inner.correct) == player_sub_index, is_correct: Some(&self.inner.correct) == player_sub_index,
@ -46,7 +49,7 @@ impl Question for SingleChoiceQuestion {
async fn handle_answer( async fn handle_answer(
&mut self, &mut self,
player_id: &str, player_id: PlayerId,
values: &HashMap<String, String>, values: &HashMap<String, String>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let value = values let value = values
@ -59,20 +62,20 @@ impl Question for SingleChoiceQuestion {
return Err(Error::InvalidInput); return Err(Error::InvalidInput);
} }
if self.submissions.contains_key(player_id) { if self.submissions.contains_key(&player_id) {
return Err(Error::QuestionAlreadySubmitted); return Err(Error::QuestionAlreadySubmitted);
} }
self.submissions.insert(player_id.to_string(), value); self.submissions.insert(player_id, value);
Ok(()) Ok(())
} }
async fn has_answered(&self, player_id: &str) -> Result<bool, Error> { async fn has_answered(&self, player_id: &PlayerId) -> Result<bool, Error> {
Ok(self.submissions.get(player_id).is_some()) Ok(self.submissions.get(player_id).is_some())
} }
async fn answered_correctly(&self, player_id: &str) -> Result<bool, Error> { async fn answered_correctly(&self, player_id: &PlayerId) -> Result<bool, Error> {
Ok(self Ok(self
.submissions .submissions
.get(player_id) .get(player_id)
@ -90,30 +93,34 @@ impl Question for SingleChoiceQuestion {
name: &self.inner.name, name: &self.inner.name,
total_submissions: self.submissions.len() as u32, total_submissions: self.submissions.len() as u32,
submissions: &self.submissions.iter().fold( submissions: &self.submissions.iter().fold(
vec![0; self.inner.answers.len()], vec![0; self.inner.answers.len()].into_boxed_slice(),
|mut acc, (_, v)| { |mut acc, (_, v)| {
acc[*v as usize] += 1; acc[*v as usize] += 1;
acc acc
}, },
), ),
submissions_correct: &self.submissions.iter().fold( submissions_correct: &self
vec![0; self.inner.answers.len()], .submissions
|mut acc, (_, v)| { .iter()
if *v == self.inner.correct { .filter(|(_, v)| **v == self.inner.correct)
.fold(
vec![0; self.inner.answers.len()].into_boxed_slice(),
|mut acc, (_, v)| {
acc[*v as usize] += 1; acc[*v as usize] += 1;
} acc
acc },
}, ),
), submissions_wrong: &self
submissions_wrong: &self.submissions.iter().fold( .submissions
vec![0; self.inner.answers.len()], .iter()
|mut acc, (_, v)| { .filter(|(_, v)| **v != self.inner.correct)
if *v != self.inner.correct { .fold(
vec![0; self.inner.answers.len()].into_boxed_slice(),
|mut acc, (_, v)| {
acc[*v as usize] += 1; acc[*v as usize] += 1;
} acc
acc },
}, ),
),
correct_answer: &self.inner.answers[self.inner.correct as usize], correct_answer: &self.inner.answers[self.inner.correct as usize],
answers: &self.inner.answers, answers: &self.inner.answers,
} }
@ -126,7 +133,7 @@ impl Question for SingleChoiceQuestion {
struct PlayerTemplate<'a> { struct PlayerTemplate<'a> {
name: &'a str, name: &'a str,
player_id: &'a str, player_id: &'a str,
answers: &'a [String], answers: &'a [Box<str>],
} }
#[derive(TemplateOnce)] #[derive(TemplateOnce)]
@ -147,5 +154,5 @@ struct ViewerTemplate<'a> {
submissions_correct: &'a [u32], submissions_correct: &'a [u32],
submissions_wrong: &'a [u32], submissions_wrong: &'a [u32],
correct_answer: &'a str, correct_answer: &'a str,
answers: &'a [String], answers: &'a [Box<str>],
} }

View file

@ -15,43 +15,46 @@ use tokio::{
}; };
use tokio_util::sync::ReusableBoxFuture; use tokio_util::sync::ReusableBoxFuture;
use crate::{error::Error, game::Game}; use crate::{
error::Error,
game::{Game, GameId, PlayerId},
};
pub struct PlayerBroadcastStream { pub struct PlayerBroadcastStream {
id: String, id: GameId,
player_id: String, player_id: PlayerId,
games: Arc<RwLock<HashMap<String, Game>>>, games: Arc<RwLock<HashMap<GameId, Game>>>,
inner: ReusableBoxFuture<'static, (Result<Result<Event, Error>, RecvError>, Receiver<()>)>, inner: ReusableBoxFuture<'static, (Result<Result<Event, Error>, RecvError>, Receiver<()>)>,
} }
impl PlayerBroadcastStream { impl PlayerBroadcastStream {
async fn make_future( async fn make_future(
mut rx: Receiver<()>, mut rx: Receiver<()>,
games: Arc<RwLock<HashMap<String, Game>>>, games: Arc<RwLock<HashMap<GameId, Game>>>,
id: String, id: GameId,
player_id: String, player_id: PlayerId,
) -> (Result<Result<Event, Error>, RecvError>, Receiver<()>) { ) -> (Result<Result<Event, Error>, RecvError>, Receiver<()>) {
let result = match rx.recv().await { let result = match rx.recv().await {
Ok(_) => Ok(Self::build_template(games, id, player_id).await), Ok(_) => Ok(Self::build_template(games, &id, &player_id).await),
Err(e) => Err(e), Err(e) => Err(e),
}; };
(result, rx) (result, rx)
} }
async fn build_template( async fn build_template(
games: Arc<RwLock<HashMap<String, Game>>>, games: Arc<RwLock<HashMap<GameId, Game>>>,
id: String, id: &GameId,
player_id: String, player_id: &PlayerId,
) -> Result<Event, Error> { ) -> Result<Event, Error> {
let games = games.read().await; let games = games.read().await;
let game = games.get(&id).ok_or(Error::NotFound)?; let game = games.get(id).ok_or(Error::NotFound)?;
Ok(Event::default().data(game.player_view(&player_id, true).await?)) Ok(Event::default().data(game.player_view(player_id, true).await?))
} }
pub fn new( pub fn new(
recv: Receiver<()>, recv: Receiver<()>,
games: Arc<RwLock<HashMap<String, Game>>>, games: Arc<RwLock<HashMap<GameId, Game>>>,
id: String, id: GameId,
player_id: String, player_id: PlayerId,
) -> Self { ) -> Self {
Self { Self {
inner: ReusableBoxFuture::new(Self::make_future( inner: ReusableBoxFuture::new(Self::make_future(
@ -91,8 +94,8 @@ impl Stream for PlayerBroadcastStream {
} }
pub struct ViewerBroadcastStream { pub struct ViewerBroadcastStream {
id: String, id: GameId,
games: Arc<RwLock<HashMap<String, Game>>>, games: Arc<RwLock<HashMap<GameId, Game>>>,
inner: ReusableBoxFuture< inner: ReusableBoxFuture<
'static, 'static,
( (
@ -102,16 +105,16 @@ pub struct ViewerBroadcastStream {
), ),
>, >,
base_url: String, base_url: &'static str,
} }
impl ViewerBroadcastStream { impl ViewerBroadcastStream {
async fn make_future( async fn make_future(
mut rx1: Receiver<()>, mut rx1: Receiver<()>,
mut rx2: Receiver<()>, mut rx2: Receiver<()>,
games: Arc<RwLock<HashMap<String, Game>>>, games: Arc<RwLock<HashMap<GameId, Game>>>,
id: String, id: GameId,
base_url: String, base_url: &'static str,
) -> ( ) -> (
Result<Result<Event, Error>, RecvError>, Result<Result<Event, Error>, RecvError>,
Receiver<()>, Receiver<()>,
@ -121,27 +124,27 @@ impl ViewerBroadcastStream {
a = rx1.recv() => a, a = rx1.recv() => a,
b = rx2.recv() => b b = rx2.recv() => b
} { } {
Ok(_) => Ok(Self::build_template(games, id, base_url).await), Ok(_) => Ok(Self::build_template(games, &id, base_url).await),
Err(e) => Err(e), Err(e) => Err(e),
}; };
(result, rx1, rx2) (result, rx1, rx2)
} }
async fn build_template( async fn build_template(
games: Arc<RwLock<HashMap<String, Game>>>, games: Arc<RwLock<HashMap<GameId, Game>>>,
id: String, id: &GameId,
base_url: String, base_url: &'static str,
) -> Result<Event, Error> { ) -> Result<Event, Error> {
let games = games.read().await; let games = games.read().await;
let game = games.get(&id).ok_or(Error::NotFound)?; let game = games.get(id).ok_or(Error::NotFound)?;
Ok(Event::default().data(game.viewer_view(true, &base_url).await?)) Ok(Event::default().data(game.viewer_view(true, base_url).await?))
} }
pub fn new( pub fn new(
rx1: Receiver<()>, rx1: Receiver<()>,
rx2: Receiver<()>, rx2: Receiver<()>,
games: Arc<RwLock<HashMap<String, Game>>>, games: Arc<RwLock<HashMap<GameId, Game>>>,
id: String, id: GameId,
base_url: String, base_url: &'static str,
) -> Self { ) -> Self {
Self { Self {
inner: ReusableBoxFuture::new(Self::make_future( inner: ReusableBoxFuture::new(Self::make_future(
@ -149,7 +152,7 @@ impl ViewerBroadcastStream {
rx2, rx2,
games.clone(), games.clone(),
id.clone(), id.clone(),
base_url.clone(), base_url,
)), )),
games, games,
id, id,
@ -166,13 +169,8 @@ impl Stream for ViewerBroadcastStream {
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> { ) -> std::task::Poll<Option<Self::Item>> {
let (result, rx1, rx2) = ready!(self.inner.poll(cx)); let (result, rx1, rx2) = ready!(self.inner.poll(cx));
let future = Self::make_future( let future =
rx1, Self::make_future(rx1, rx2, self.games.clone(), self.id.clone(), self.base_url);
rx2,
self.games.clone(),
self.id.clone(),
self.base_url.clone(),
);
self.inner.set(future); self.inner.set(future);
match result { match result {
Ok(item) => Poll::Ready(Some(item)), Ok(item) => Poll::Ready(Some(item)),