This commit is contained in:
Paul Zinselmeyer 2023-11-01 21:12:18 +01:00
commit d866e4faaa
Signed by: pfzetto
GPG key ID: 4EEF46A5B276E648
19 changed files with 4078 additions and 0 deletions

68
src/error.rs Normal file
View file

@ -0,0 +1,68 @@
use axum::{
extract::multipart::MultipartError,
http::{uri::InvalidUri, StatusCode},
response::IntoResponse,
};
use log::error;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("render error: {0:?}")]
Render(#[from] sailfish::RenderError),
#[error("invalid uri: {0:?}")]
InvalidUri(#[from] InvalidUri),
#[error("game not found")]
NotFound,
#[error("game broken")]
GameBroken,
#[error("game already started")]
GameAlreadyStarted,
#[error("field already submitted")]
FieldAlreadySubmitted,
#[error("player not found")]
PlayerNotFound,
#[error("stream lagged")]
StreamLagged(u64),
#[error("multipart: {0:?}")]
Multipart(#[from] MultipartError),
#[error("quizfile not found")]
QuizFileNotFound,
#[error("toml")]
Toml(#[from] toml::de::Error),
#[error("forbidden")]
Forbidden,
}
impl IntoResponse for Error {
fn into_response(self) -> axum::response::Response {
match self {
Self::Forbidden => (StatusCode::FORBIDDEN, "forbidden").into_response(),
Self::Toml(_) => (StatusCode::OK, "invalid toml syntax").into_response(),
Self::QuizFileNotFound => (StatusCode::OK, "quizfile not found").into_response(),
Self::PlayerNotFound => (StatusCode::BAD_REQUEST, "player not found").into_response(),
Self::FieldAlreadySubmitted => {
(StatusCode::BAD_REQUEST, "field already submitted").into_response()
}
Self::GameAlreadyStarted => {
(StatusCode::BAD_REQUEST, "game already started").into_response()
}
Self::NotFound => (StatusCode::NOT_FOUND, "game not found").into_response(),
_ => {
error!("{:?}", self);
(StatusCode::INTERNAL_SERVER_ERROR, "internal server error").into_response()
}
}
}
}

199
src/game.rs Normal file
View file

@ -0,0 +1,199 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use qrcode::{render::svg, QrCode};
use sailfish::TemplateOnce;
use tokio::{
select,
sync::{broadcast, RwLock},
};
use crate::{
error::Error, HandlerResult, PlayTemplate, PlayerState, Quiz, ViewTemplate, ViewerState,
};
pub struct Game {
pub id: String,
pub owner: String,
pub state: Arc<RwLock<GameState>>,
pub quiz: Quiz,
pub players: HashMap<String, Player>,
pub on_state_update: broadcast::Sender<()>,
pub on_submission: broadcast::Sender<()>,
}
impl Game {
pub fn new(id: String, owner: String, quiz: Quiz) -> Self {
Self {
id,
owner,
quiz,
state: Arc::new(RwLock::new(GameState::NotStarted)),
players: HashMap::new(),
on_state_update: broadcast::channel(16).0,
on_submission: broadcast::channel(16).0,
}
}
pub fn submissions(&self, field: u32) -> Vec<u32> {
let field = field as usize;
self.players.iter().fold(
vec![0; self.quiz.fields[field].answers.len()],
|mut pacc, p| {
if p.1.submissions.len() > field {
if let Some(Some(submission)) = p.1.submissions.get(field) {
pacc[*submission as usize] += 1;
}
}
pacc
},
)
}
pub async fn next(&mut self) {
let mut state = self.state.write().await;
// game state machine
*state = match *state {
GameState::NotStarted => GameState::Answering(0),
GameState::Answering(field) => GameState::Result(field),
GameState::Result(field) if (field as usize) + 1 < self.quiz.fields.len() => {
GameState::Answering(field + 1)
}
GameState::Result(_) => GameState::Completed,
GameState::Completed => GameState::Completed,
};
self.on_state_update.send(());
// automatically show results after x seconds, cancel when state is changed otherwise
if let GameState::Answering(i) = *state {
let state = self.state.clone();
let state_update_tx = self.on_state_update.clone();
let mut state_update_rx = self.on_state_update.subscribe();
let wait_for = self.quiz.wait_for;
tokio::spawn(async move {
select! {
_ = tokio::time::sleep(Duration::from_secs(wait_for)) => {
let mut state = state.write().await;
*state = GameState::Result(i);
state_update_tx.send(());
},
_ = state_update_rx.recv() => {}
}
});
}
}
pub async fn player_view(&self, player_id: &str, htmx: bool) -> HandlerResult<String> {
let player = self.players.get(player_id).ok_or(Error::PlayerNotFound)?;
let player_position: u32 = player.submissions.len() as u32;
let state = self.state.read().await;
let player_state = match *state {
GameState::NotStarted => PlayerState::NotStarted,
GameState::Answering(i) if player_position <= i => {
PlayerState::Answering((i, &self.quiz.fields[i as usize]))
}
GameState::Answering(i) => PlayerState::Waiting(i),
GameState::Result(i) => PlayerState::Result((
&self.quiz.fields[i as usize],
player.submissions.get(i as usize).and_then(|x| *x),
)),
GameState::Completed => PlayerState::Completed(
player
.submissions
.iter()
.enumerate()
.fold(0, |acc, (i, e)| {
match *e == Some(self.quiz.fields[i].correct) {
true => acc + 1,
false => acc,
}
}) as f32
/ (self.quiz.fields.len() as f32),
),
};
Ok(PlayTemplate {
htmx,
id: &self.id,
player_id,
state: player_state,
}
.render_once()?)
}
pub async fn handle_submission(&mut self, player_id: &str, value: u32) -> HandlerResult<()> {
let player = self
.players
.get_mut(player_id)
.ok_or(Error::PlayerNotFound)?;
let player_position = player.submissions.len();
let state = self.state.read().await;
match *state {
GameState::Answering(i) if (player_position as u32) <= i => {
if (value as usize) < self.quiz.fields[i as usize].answers.len() {
player
.submissions
.append(&mut vec![None; (i as usize) - player_position]);
player.submissions.push(Some(value));
self.on_submission.send(());
if self.submissions(i).iter().sum::<u32>() as usize == self.players.len() {
drop(state);
self.next().await;
}
}
}
_ => {}
}
Ok(())
}
pub async fn viewer_view(&self, htmx: bool, base_url: &str) -> HandlerResult<String> {
let state = self.state.read().await;
let viewer_state = match *state {
GameState::NotStarted => {
let url = format!("{}/{}", base_url, &self.id);
let img = QrCode::new(&url).expect("");
let img = img.render::<svg::Color>().build();
ViewerState::NotStarted((self.players.len() as u32, img, url))
}
GameState::Answering(i) => {
ViewerState::Answering((i, &self.quiz.fields[i as usize], self.submissions(i)))
}
GameState::Result(i) => {
ViewerState::Result((i, &self.quiz.fields[i as usize], self.submissions(i)))
}
GameState::Completed => ViewerState::Completed,
};
Ok(ViewTemplate {
htmx,
id: &self.id,
quiz: &self.quiz,
state: viewer_state,
}
.render_once()?)
}
}
#[derive(Debug, Default)]
pub struct Player {
pub submissions: Vec<Option<u32>>,
}
#[derive(Clone, Debug)]
pub enum GameState {
NotStarted,
Answering(u32),
Result(u32),
Completed,
}

65
src/garbage_collector.rs Normal file
View file

@ -0,0 +1,65 @@
use std::{
collections::{BinaryHeap, HashMap},
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::sync::RwLock;
use crate::game::Game;
pub fn start_gc(
game_expiry: Arc<RwLock<BinaryHeap<GarbageCollectorItem>>>,
games: Arc<RwLock<HashMap<String, Game>>>,
) {
let games = games.clone();
let game_expiry = game_expiry.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(3600)).await;
if let Ok(now) = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|y| y.as_secs())
{
let mut game_expiry = game_expiry.write().await;
while let Some(gc) = game_expiry.peek() {
if gc.expires_at > now {
break;
} else if let Some(gc) = game_expiry.pop() {
let mut games = games.write().await;
games.remove(&gc.id);
}
}
}
});
}
#[derive(PartialEq, Eq)]
pub struct GarbageCollectorItem {
id: String,
expires_at: u64,
}
impl GarbageCollectorItem {
pub fn new_in(id: String, time: u64) -> Self {
Self {
id,
expires_at: SystemTime::now()
.checked_add(Duration::from_secs(time))
.and_then(|x| x.duration_since(UNIX_EPOCH).map(|y| y.as_secs()).ok())
.unwrap_or_default(),
}
}
}
impl PartialOrd for GarbageCollectorItem {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for GarbageCollectorItem {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.expires_at.cmp(&other.expires_at).reverse()
}
}

337
src/main.rs Normal file
View file

@ -0,0 +1,337 @@
#![deny(clippy::unwrap_used)]
use std::{
collections::{BinaryHeap, HashMap},
env,
sync::Arc,
};
use axum::{
extract::{FromRef, Multipart, Path, Query, State},
http::Uri,
response::{
sse::{Event, KeepAlive},
Html, IntoResponse, Redirect, Sse,
},
routing::get,
Form, Router,
};
use axum_htmx::{HxRedirect, HxRequest};
use axum_oidc::oidc::{self, EmptyAdditionalClaims, OidcApplication, OidcExtractor};
use futures_util::Stream;
use game::{Game, Player};
use garbage_collector::{start_gc, GarbageCollectorItem};
use rand::{distributions, Rng};
use sailfish::TemplateOnce;
use serde::{Deserialize, Serialize};
use stream::{PlayerBroadcastStream, ViewerBroadcastStream};
use tokio::sync::RwLock;
use tower_http::services::ServeDir;
use crate::error::Error;
type HandlerResult<T> = Result<T, Error>;
mod error;
mod game;
mod garbage_collector;
mod stream;
#[derive(Clone)]
pub struct AppState {
games: Arc<RwLock<HashMap<String, Game>>>,
game_expiry: Arc<RwLock<BinaryHeap<GarbageCollectorItem>>>,
oidc_application: OidcApplication<EmptyAdditionalClaims>,
application_base: String,
}
impl FromRef<AppState> for OidcApplication<EmptyAdditionalClaims> {
fn from_ref(input: &AppState) -> Self {
input.oidc_application.clone()
}
}
#[tokio::main]
pub async fn main() {
dotenvy::dotenv().ok();
env_logger::init();
let application_base = env::var("APPLICATION_BASE").expect("APPLICATION_BASE env var");
let issuer = env::var("ISSUER").expect("ISSUER env var");
let client_id = env::var("CLIENT_ID").expect("CLIENT_ID env var");
let client_secret = env::var("CLIENT_SECRET").ok();
let scopes = env::var("SCOPES")
.expect("SCOPES env var")
.split(' ')
.map(|x| x.to_owned())
.collect::<Vec<_>>();
let oidc_application = OidcApplication::<EmptyAdditionalClaims>::create(
application_base
.parse()
.expect("valid APPLICATION_BASE url"),
issuer.to_string(),
client_id.to_string(),
client_secret.to_owned(),
scopes.clone(),
oidc::Key::generate(),
)
.await
.expect("Oidc Authentication Client");
let game_expiry: Arc<RwLock<BinaryHeap<GarbageCollectorItem>>> =
Arc::new(RwLock::new(BinaryHeap::new()));
let games = Arc::new(RwLock::new(HashMap::new()));
start_gc(game_expiry.clone(), games.clone());
let app_state = AppState {
games,
game_expiry,
oidc_application,
application_base,
};
let app = Router::new()
.route("/", get(handle_index).post(handle_create))
.route("/:id", get(handle_play).post(handle_play_submission))
.route("/:id/events", get(sse_play))
.route("/:id/view", get(handle_view).post(handle_view_next))
.route("/:id/view/events", get(sse_view))
.nest_service("/static", ServeDir::new("static"))
.with_state(app_state);
axum::Server::bind(&"[::]:8080".parse().expect("valid listen address"))
.serve(app.into_make_service())
.await
.expect("axum server");
}
pub async fn handle_index(
oidc_extractor: OidcExtractor<EmptyAdditionalClaims>,
) -> HandlerResult<impl IntoResponse> {
Ok(Html(IndexTemplate {}.render_once()?))
}
pub async fn handle_create(
State(state): State<AppState>,
oidc_extractor: OidcExtractor<EmptyAdditionalClaims>,
mut body: Multipart,
) -> HandlerResult<impl IntoResponse> {
let mut quiz: Option<Quiz> = None;
while let Some(field) = body.next_field().await? {
if field.name() == Some("quizfile") {
quiz = Some(toml::from_str::<Quiz>(&field.text().await?)?);
}
}
let quiz = quiz.ok_or(Error::QuizFileNotFound)?;
let game_id: String = rand::thread_rng()
.sample_iter(distributions::Alphanumeric)
.take(16)
.map(char::from)
.collect();
let game = Game::new(
game_id.clone(),
oidc_extractor.claims.subject().to_string(),
quiz,
);
let mut games = state.games.write().await;
games.insert(game_id.clone(), game);
let url = format!("{}/{}/view", state.application_base, &game_id);
let mut game_expiry = state.game_expiry.write().await;
game_expiry.push(GarbageCollectorItem::new_in(game_id, 24 * 3600));
Ok((HxRedirect(Uri::from_maybe_shared(url.clone())?), "Ok"))
}
pub async fn handle_view(
Path(id): Path<String>,
State(state): State<AppState>,
HxRequest(htmx): HxRequest,
oidc_extractor: OidcExtractor<EmptyAdditionalClaims>,
) -> HandlerResult<impl IntoResponse> {
let games = state.games.read().await;
let game = games.get(&id).ok_or(Error::NotFound)?;
if game.owner != oidc_extractor.claims.subject().to_string() {
return Err(Error::Forbidden);
}
Ok(Html(game.viewer_view(htmx, &state.application_base).await?))
}
pub async fn handle_view_next(
Path(id): Path<String>,
State(state): State<AppState>,
HxRequest(htmx): HxRequest,
oidc_extractor: OidcExtractor<EmptyAdditionalClaims>,
) -> HandlerResult<impl IntoResponse> {
let mut games = state.games.write().await;
let game = games.get_mut(&id).ok_or(Error::NotFound)?;
if game.owner != oidc_extractor.claims.subject().to_string() {
return Err(Error::Forbidden);
}
game.next().await;
Ok("Ok".into_response())
}
pub async fn sse_view(
Path(id): Path<String>,
State(state): State<AppState>,
oidc_extractor: OidcExtractor<EmptyAdditionalClaims>,
) -> HandlerResult<Sse<impl Stream<Item = Result<Event, Error>>>> {
let games = state.games.read().await;
let game = games.get(&id).ok_or(Error::NotFound)?;
if game.owner != oidc_extractor.claims.subject().to_string() {
return Err(Error::Forbidden);
}
let rx1 = game.on_state_update.subscribe();
let rx2 = game.on_submission.subscribe();
let stream = ViewerBroadcastStream::new(
rx1,
rx2,
state.games.clone(),
id,
state.application_base.clone(),
);
Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
}
#[derive(Deserialize)]
pub struct PlayerQuery {
player: Option<String>,
}
pub async fn handle_play(
Query(query): Query<PlayerQuery>,
Path(id): Path<String>,
State(state): State<AppState>,
HxRequest(htmx): HxRequest,
) -> HandlerResult<impl IntoResponse> {
let mut games = state.games.write().await;
let game = games.get_mut(&id).ok_or(Error::NotFound)?;
if let Some(player_id) = query.player {
Ok(Html(game.player_view(&player_id, htmx).await?).into_response())
} else {
let player_id: String = rand::thread_rng()
.sample_iter(distributions::Alphanumeric)
.take(32)
.map(char::from)
.collect();
game.players
.insert(player_id.to_string(), Player::default());
game.on_submission.send(());
Ok(Redirect::temporary(&format!(
"{}/{}?player={}",
state.application_base, id, player_id
))
.into_response())
}
}
#[derive(Deserialize)]
pub struct SubmissionPayload {
selected: u32,
player_id: String,
}
pub async fn handle_play_submission(
Path(id): Path<String>,
State(state): State<AppState>,
Form(form): Form<SubmissionPayload>,
) -> HandlerResult<impl IntoResponse> {
let mut games = state.games.write().await;
let game = games.get_mut(&id).ok_or(Error::NotFound)?;
game.handle_submission(&form.player_id, form.selected)
.await?;
Ok(Html(game.player_view(&form.player_id, true).await?))
}
#[derive(Deserialize)]
pub struct SsePlayerQuery {
player: String,
}
pub async fn sse_play(
Query(query): Query<SsePlayerQuery>,
Path(id): Path<String>,
State(state): State<AppState>,
) -> HandlerResult<Sse<impl Stream<Item = Result<Event, Error>>>> {
let games = state.games.read().await;
let game = games.get(&id).ok_or(Error::NotFound)?;
let rx = game.on_state_update.subscribe();
let stream = PlayerBroadcastStream::new(rx, state.games.clone(), id, query.player);
Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
}
#[derive(TemplateOnce)]
#[template(path = "index.stpl")]
struct IndexTemplate {}
#[derive(TemplateOnce)]
#[template(path = "play.stpl")]
struct PlayTemplate<'a> {
htmx: bool,
id: &'a str,
player_id: &'a str,
state: PlayerState<'a>,
}
#[derive(Clone)]
pub enum PlayerState<'a> {
NotStarted,
Answering((u32, &'a SingleChoice)),
Waiting(u32),
Result((&'a SingleChoice, Option<u32>)),
Completed(f32),
}
#[derive(TemplateOnce)]
#[template(path = "view.stpl")]
struct ViewTemplate<'a> {
htmx: bool,
id: &'a str,
quiz: &'a Quiz,
state: ViewerState<'a>,
}
#[derive(Clone)]
pub enum ViewerState<'a> {
NotStarted((u32, String, String)),
Answering((u32, &'a SingleChoice, Vec<u32>)),
Result((u32, &'a SingleChoice, Vec<u32>)),
Completed,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Quiz {
pub wait_for: u64,
pub fields: Vec<SingleChoice>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SingleChoice {
name: String,
answers: Vec<String>,
correct: u32,
}

183
src/stream.rs Normal file
View file

@ -0,0 +1,183 @@
use std::{
collections::HashMap,
sync::Arc,
task::{ready, Poll},
};
use axum::response::sse::Event;
use futures_util::Stream;
use tokio::{
select,
sync::{
broadcast::{error::RecvError, Receiver},
RwLock,
},
};
use tokio_util::sync::ReusableBoxFuture;
use crate::{error::Error, game::Game};
pub struct PlayerBroadcastStream {
id: String,
player_id: String,
games: Arc<RwLock<HashMap<String, Game>>>,
inner: ReusableBoxFuture<'static, (Result<Result<Event, Error>, RecvError>, Receiver<()>)>,
}
impl PlayerBroadcastStream {
async fn make_future(
mut rx: Receiver<()>,
games: Arc<RwLock<HashMap<String, Game>>>,
id: String,
player_id: String,
) -> (Result<Result<Event, Error>, RecvError>, Receiver<()>) {
let result = match rx.recv().await {
Ok(_) => Ok(Self::build_template(games, id, player_id).await),
Err(e) => Err(e),
};
(result, rx)
}
async fn build_template(
games: Arc<RwLock<HashMap<String, Game>>>,
id: String,
player_id: String,
) -> Result<Event, Error> {
let games = games.read().await;
let game = games.get(&id).ok_or(Error::NotFound)?;
Ok(Event::default().data(game.player_view(&player_id, true).await?))
}
pub fn new(
recv: Receiver<()>,
games: Arc<RwLock<HashMap<String, Game>>>,
id: String,
player_id: String,
) -> Self {
Self {
inner: ReusableBoxFuture::new(Self::make_future(
recv,
games.clone(),
id.clone(),
player_id.clone(),
)),
games,
id,
player_id,
}
}
}
impl Stream for PlayerBroadcastStream {
type Item = Result<Event, Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let (result, rx) = ready!(self.inner.poll(cx));
let future = Self::make_future(
rx,
self.games.clone(),
self.id.clone(),
self.player_id.clone(),
);
self.inner.set(future);
match result {
Ok(item) => Poll::Ready(Some(item)),
Err(RecvError::Closed) => Poll::Ready(None),
Err(RecvError::Lagged(n)) => Poll::Ready(Some(Err(Error::StreamLagged(n)))),
}
}
}
pub struct ViewerBroadcastStream {
id: String,
games: Arc<RwLock<HashMap<String, Game>>>,
inner: ReusableBoxFuture<
'static,
(
Result<Result<Event, Error>, RecvError>,
Receiver<()>,
Receiver<()>,
),
>,
base_url: String,
}
impl ViewerBroadcastStream {
async fn make_future(
mut rx1: Receiver<()>,
mut rx2: Receiver<()>,
games: Arc<RwLock<HashMap<String, Game>>>,
id: String,
base_url: String,
) -> (
Result<Result<Event, Error>, RecvError>,
Receiver<()>,
Receiver<()>,
) {
let result = match select! {
a = rx1.recv() => a,
b = rx2.recv() => b
} {
Ok(_) => Ok(Self::build_template(games, id, base_url).await),
Err(e) => Err(e),
};
(result, rx1, rx2)
}
async fn build_template(
games: Arc<RwLock<HashMap<String, Game>>>,
id: String,
base_url: String,
) -> Result<Event, Error> {
let games = games.read().await;
let game = games.get(&id).ok_or(Error::NotFound)?;
Ok(Event::default().data(game.viewer_view(true, &base_url).await?))
}
pub fn new(
rx1: Receiver<()>,
rx2: Receiver<()>,
games: Arc<RwLock<HashMap<String, Game>>>,
id: String,
base_url: String,
) -> Self {
Self {
inner: ReusableBoxFuture::new(Self::make_future(
rx1,
rx2,
games.clone(),
id.clone(),
base_url.clone(),
)),
games,
id,
base_url,
}
}
}
impl Stream for ViewerBroadcastStream {
type Item = Result<Event, Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let (result, rx1, rx2) = ready!(self.inner.poll(cx));
let future = Self::make_future(
rx1,
rx2,
self.games.clone(),
self.id.clone(),
self.base_url.clone(),
);
self.inner.set(future);
match result {
Ok(item) => Poll::Ready(Some(item)),
Err(RecvError::Closed) => Poll::Ready(None),
Err(RecvError::Lagged(n)) => Poll::Ready(Some(Err(Error::StreamLagged(n)))),
}
}
}