add prometheus metrics endpoint

This commit is contained in:
Paul Zinselmeyer 2023-11-09 22:28:04 +01:00
parent 8e7ad058f6
commit 585a8a40b8
5 changed files with 140 additions and 28 deletions

View file

@ -23,6 +23,7 @@ log = "0.4"
env_logger = "0.10"
sailfish = "0.8.3"
tower-http = { version="0.4.4", features=["fs"], default-features=false }
prometheus-client = "0.22.0"
chacha20 = "0.9"
sha3 = "0.10"

View file

@ -40,7 +40,10 @@ pub enum Error {
Forbidden,
#[error("render error: {0:?}")]
Tempalte(#[from] sailfish::RenderError),
Template(#[from] sailfish::RenderError),
#[error("prometheus: {0:?}")]
Prometheus(std::fmt::Error),
}
impl IntoResponse for Error {

View file

@ -7,16 +7,17 @@ use std::{
use log::{debug, info, warn};
use tokio::{fs, sync::Mutex};
use crate::{metadata::Metadata, util::Id};
use crate::{metadata::Metadata, util::Id, AppMetrics, BinDownloadLabels};
#[derive(Clone)]
pub(crate) struct GarbageCollector {
heap: Arc<Mutex<BinaryHeap<GarbageCollectorItem>>>,
path: String,
metrics: Arc<AppMetrics>,
}
impl GarbageCollector {
pub(crate) async fn new(path: String) -> Self {
pub(crate) async fn new(path: String, metrics: Arc<AppMetrics>) -> Self {
let mut heap = BinaryHeap::new();
let mut dir = fs::read_dir(&path).await.expect("readable data dir");
while let Ok(Some(entry)) = dir.next_entry().await {
@ -24,6 +25,13 @@ impl GarbageCollector {
let file_name = file_name.to_string_lossy();
if let Some(id) = file_name.strip_suffix(".toml") {
if let Ok(metadata) = Metadata::from_file(&format!("./data/{}.toml", id)).await {
metrics.bin_total.inc();
metrics
.bin_data_size
.inc_by(metadata.size.map(|x| x as i64).unwrap_or_default());
let _ = metrics.bin_downloads.get_or_create(&BinDownloadLabels {
bin: Id::from_str(id),
});
heap.push(GarbageCollectorItem {
expires_at: metadata.expires_at,
id: Id::from_str(id),
@ -34,6 +42,7 @@ impl GarbageCollector {
Self {
heap: Arc::new(Mutex::new(heap)),
path,
metrics,
}
}
@ -71,6 +80,11 @@ impl GarbageCollector {
if res_meta.is_err() || res_data.is_err() {
warn!("failed to delete bin {} for gc", item.id);
}
self.metrics.bin_total.dec();
self.metrics
.bin_data_size
.dec_by(metadata.size.map(|x| x as i64).unwrap_or_default());
}
} else {
info!("cant open metadata file for bin {} for gc", item.id);

View file

@ -1,10 +1,16 @@
#![deny(clippy::unwrap_used)]
use duration_str::{deserialize_option_duration, parse_std};
use prometheus_client::{
encoding::EncodeLabelSet,
metrics::{counter::Counter, family::Family, gauge::Gauge, histogram::Histogram},
registry::{Registry, Unit},
};
use sailfish::TemplateOnce;
use std::{
borrow::Borrow,
env,
str::FromStr,
sync::{atomic::AtomicU64, Arc},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tower_http::services::ServeDir;
@ -12,12 +18,11 @@ use tower_http::services::ServeDir;
use axum::{
async_trait,
body::{HttpBody, StreamBody},
debug_handler,
extract::{BodyStream, FromRef, FromRequest, Multipart, Path, Query, State},
headers::{ContentType, Range},
http::{
header::{self, CONTENT_TYPE},
HeaderMap, Request, StatusCode,
HeaderMap, HeaderValue, Request, StatusCode,
},
response::{Html, IntoResponse, Redirect, Response},
routing::get,
@ -35,7 +40,6 @@ use chacha20::{
use futures_util::StreamExt;
use garbage_collector::GarbageCollector;
use log::{debug, warn};
use render::{html, raw};
use serde::Deserialize;
use sha3::{Digest, Sha3_256};
use tokio::{
@ -71,6 +75,20 @@ pub struct AppState {
key_salt: KeySalt,
id_salt: IdSalt,
garbage_collector: GarbageCollector,
prometheus_registry: Arc<Registry>,
metrics: Arc<AppMetrics>,
}
#[derive(Clone, Default)]
pub struct AppMetrics {
bin_total: Gauge,
bin_data_size: Gauge,
bin_downloads: Family<BinDownloadLabels, Counter>,
}
#[derive(EncodeLabelSet, PartialEq, Eq, Hash, Clone, Debug)]
pub struct BinDownloadLabels {
bin: Id,
}
impl FromRef<AppState> for OidcApplication<EmptyAdditionalClaims> {
@ -128,7 +146,30 @@ async fn main() {
let data_path = env::var("DATA_PATH").expect("DATA_PATH env var");
let garbage_collector = GarbageCollector::new(data_path.clone()).await;
let mut registry = Registry::default();
let app_metrics = Arc::new(AppMetrics::default());
registry.register(
"bin_total",
"total number of stored bins",
app_metrics.bin_total.clone(),
);
registry.register(
"bin_downloads",
"number of times bins got accessed",
app_metrics.bin_downloads.clone(),
);
registry.register_with_unit(
"bin_data_size",
"combined size of all bins",
Unit::Bytes,
app_metrics.bin_data_size.clone(),
);
let garbage_collector = GarbageCollector::new(data_path.clone(), app_metrics.clone()).await;
garbage_collector.spawn();
let state: AppState = AppState {
@ -141,6 +182,8 @@ async fn main() {
id_salt: IdSalt::from_str(&env::var("ID_SALT").expect("ID_SALT env var"))
.expect("ID_SALT valid hex"),
garbage_collector,
prometheus_registry: Arc::new(registry),
metrics: app_metrics,
};
// when the two salts are identical, the bin id and the bin key are also identical, this would
@ -157,6 +200,7 @@ async fn main() {
.delete(delete_bin),
)
.route("/:id/delete", get(delete_bin_interactive).post(delete_bin))
.route("/metrics", get(metrics))
.nest_service("/static", ServeDir::new("static"))
.with_state(state);
axum::Server::bind(&"[::]:8080".parse().expect("valid listen address"))
@ -235,6 +279,12 @@ async fn delete_bin(
warn!("failed to delete bin {} for manual deletion", id);
}
app_state.metrics.bin_total.dec();
app_state
.metrics
.bin_data_size
.dec_by(metadata.size.map(|x| x as i64).unwrap_or_default());
Ok("ok\n")
}
@ -352,6 +402,12 @@ async fn upload_bin(
debug!("bin {id} got filled");
app_state.metrics.bin_total.inc();
app_state
.metrics
.bin_data_size
.inc_by(metadata.size.map(|x| x as i64).unwrap_or_default());
Ok((StatusCode::OK, "ok\n"))
} else {
Err(Error::DataFileExists)
@ -388,7 +444,13 @@ async fn get_item(
let file = File::open(&path).await?;
let reader = BufReader::new(file);
let body = StreamBody::new(DecryptingStream::new(reader, id, &metadata, &key, &nonce));
let body = StreamBody::new(DecryptingStream::new(
reader,
id.clone(),
&metadata,
&key,
&nonce,
));
let mut headers = HeaderMap::new();
headers.insert(
@ -412,10 +474,30 @@ async fn get_item(
headers.insert("Digest", digest);
}
app_state
.metrics
.bin_downloads
.get_or_create(&BinDownloadLabels { bin: id })
.inc();
Ok((StatusCode::OK, headers, body).into_response())
}
}
async fn metrics(State(app_state): State<AppState>) -> HandlerResult<impl IntoResponse> {
let mut buffer = String::new();
prometheus_client::encoding::text::encode(&mut buffer, &app_state.prometheus_registry)
.map_err(Error::Prometheus)?;
let mut headers = HeaderMap::new();
headers.insert(
CONTENT_TYPE,
HeaderValue::from_static("text/plain; version=0.0.4"),
);
Ok((headers, buffer))
}
enum MultipartOrStream {
Multipart(Multipart),
Stream(BodyStream),

View file

@ -1,23 +1,24 @@
use std::{borrow::Borrow, fmt::Display, str::FromStr};
use std::{borrow::Borrow, fmt::Display, str::FromStr, sync::Arc};
use chacha20::cipher::{generic_array::GenericArray, ArrayLength};
use prometheus_client::encoding::EncodeLabelValue;
use rand::{distributions, Rng};
use sha3::{Digest, Sha3_256};
use crate::{error::Error, PHRASE_LENGTH, SALT_LENGTH};
#[derive(Debug, PartialEq)]
pub(crate) struct Phrase(String);
#[derive(Debug, PartialEq, Eq, Clone)]
pub(crate) struct Id(String);
pub(crate) struct Phrase(Arc<str>);
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
pub(crate) struct Id(Arc<str>);
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct IdSalt(Vec<u8>);
pub(crate) struct IdSalt(Arc<[u8]>);
#[derive(Debug, PartialEq)]
pub(crate) struct Key(Vec<u8>);
pub(crate) struct Key(Arc<[u8]>);
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct KeySalt(Vec<u8>);
pub(crate) struct KeySalt(Arc<[u8]>);
#[derive(Debug, PartialEq)]
pub(crate) struct Nonce(Vec<u8>);
pub(crate) struct Nonce(Arc<[u8]>);
impl Phrase {
pub(crate) fn random() -> Self {
@ -26,7 +27,7 @@ impl Phrase {
.take(PHRASE_LENGTH)
.map(char::from)
.collect::<String>();
Self(phrase)
Self(phrase.into())
}
}
impl FromStr for Phrase {
@ -37,7 +38,10 @@ impl FromStr for Phrase {
Err(Error::PhraseInvalid)
} else {
Ok(Self(
s.chars().take(s.find('.').unwrap_or(s.len())).collect(),
s.chars()
.take(s.find('.').unwrap_or(s.len()))
.collect::<String>()
.into(),
))
}
}
@ -51,14 +55,14 @@ impl Display for Phrase {
impl Id {
pub(crate) fn from_phrase(phrase: &Phrase, salt: &IdSalt) -> Self {
let mut hasher = Sha3_256::new();
hasher.update(&phrase.0);
hasher.update(&*phrase.0);
hasher.update(&salt.0);
let id = hex::encode(hasher.finalize());
Self(id)
Self(id.into())
}
pub(crate) fn from_str(s: &str) -> Self {
Self(s.to_string())
Self(s.to_owned().into())
}
pub(crate) fn raw(&self) -> &str {
&self.0
@ -69,6 +73,14 @@ impl Display for Id {
self.0.fmt(f)
}
}
impl EncodeLabelValue for Id {
fn encode(
&self,
encoder: &mut prometheus_client::encoding::LabelValueEncoder,
) -> Result<(), std::fmt::Error> {
(&*self.0).encode(encoder)
}
}
impl IdSalt {
pub(crate) fn random() -> Self {
@ -86,16 +98,16 @@ impl FromStr for IdSalt {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(hex::decode(s)?))
Ok(Self(hex::decode(s)?.into()))
}
}
impl Key {
pub(crate) fn from_phrase(phrase: &Phrase, salt: &KeySalt) -> Self {
let mut hasher = Sha3_256::new();
hasher.update(&phrase.0);
hasher.update(&*phrase.0);
hasher.update(&salt.0);
Self(hasher.finalize().to_vec())
Self(hasher.finalize().to_vec().into())
}
}
@ -121,7 +133,7 @@ impl FromStr for KeySalt {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(hex::decode(s)?))
Ok(Self(hex::decode(s)?.into()))
}
}
@ -135,7 +147,7 @@ impl Nonce {
Self(nonce)
}
pub(crate) fn from_hex(hex_value: &str) -> Result<Self, Error> {
Ok(Self(hex::decode(hex_value)?))
Ok(Self(hex::decode(hex_value)?.into()))
}
pub(crate) fn to_hex(&self) -> String {
hex::encode(&self.0)
@ -215,8 +227,8 @@ mod test {
let key_salt = KeySalt::random();
assert_ne!(
hex::decode(Id::from_phrase(&phrase, &id_salt).0).unwrap(),
Key::from_phrase(&phrase, &key_salt).0
hex::decode(&*Id::from_phrase(&phrase, &id_salt).0).unwrap(),
&*Key::from_phrase(&phrase, &key_salt).0
);
}