split graph implementation into rebacs_core

This commit is contained in:
Paul Zinselmeyer 2023-09-05 17:14:14 +02:00
parent 7919ebf9a8
commit 666544c355
Signed by: pfzetto
GPG Key ID: 4EEF46A5B276E648
18 changed files with 226 additions and 1016 deletions

39
.drone.yml Normal file
View File

@ -0,0 +1,39 @@
kind: pipeline
ype: docker
name: rebacs
trigger:
event:
- push
steps:
- name: create_image
image: nixos/nix
commands:
- nix build --extra-experimental-features nix-command --extra-experimental-features flakes --cores 0 --max-jobs auto .#dockerImage
- cp result rebacs.tar.gz
- name: upload_image
image: docker:dind
environment:
REGISTRY_PASSWD:
from_secret: REGISTRY_PASSWD
volumes:
- name: dockersock
path: /var/run
commands:
- docker login --username droneci --password $REGISTRY_PASSWD git2.zettoit.eu
- docker load < rebacs.tar.gz
- docker tag rebacs:latest git2.zettoit.eu/zettoit/rebacs:latest
- docker push git2.zettoit.eu/zettoit/rebacs:latest
services:
- name: docker
image: docker:dind
privileged: true
volumes:
- name: dockersock
path: /var/run
volumes:
- name: dockersock
temp: {}

2
.gitignore vendored
View File

@ -1,5 +1,5 @@
/target
/result
.env
graph.dat
graph.dat.bak
api_keys.dat

View File

@ -1,36 +0,0 @@
variables:
DOCKER_TLS_CERTDIR: ""
DOCKER_HOST: tcp://docker:2375
services:
- name: docker:dind
entrypoint: ["dockerd-entrypoint.sh", "--tls=false"]
stages:
- build
- pack
cargo-build:
stage: build
image: rust:slim-bookworm
before_script:
- apt update && apt install protobuf-compiler -y
- rustup toolchain install nightly
- rustup default nightly
script:
- cargo build --release
artifacts:
paths:
- target/release/rebacs
docker:
stage: pack
image: docker:latest
only:
- master
script:
- docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY
- docker build --network host -t $CI_REGISTRY_IMAGE/master . -f Dockerfile
- docker push $CI_REGISTRY_IMAGE/master
dependencies:
- cargo-build

102
Cargo.lock generated
View File

@ -19,9 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aho-corasick"
version = "1.0.4"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6748e8def348ed4d14996fa801f4122cd763fff530258cdc03f64b25f89d3a5a"
checksum = "0c378d78423fdad8089616f827526ee33c19f2fddbd5de1629152c9593ba4783"
dependencies = [
"memchr",
]
@ -51,7 +51,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.31",
]
[[package]]
@ -62,7 +62,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.31",
]
[[package]]
@ -290,9 +290,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "errno"
version = "0.3.2"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b30f669a7961ef1631673d2766cc92f52d64f7ef354d4fe0ddfd30ed52f0f4f"
checksum = "136526188508e25c6fef639d7927dfb3e0e3084488bf202267829cf7fc23dbdd"
dependencies = [
"errno-dragonfly",
"libc",
@ -359,7 +359,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.31",
]
[[package]]
@ -464,6 +464,15 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "home"
version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5444c27eef6923071f7ebcc33e3444508466a76f7a2b93da00ed6e19f30c1ddb"
dependencies = [
"windows-sys",
]
[[package]]
name = "http"
version = "0.2.9"
@ -681,9 +690,9 @@ checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef"
[[package]]
name = "memchr"
version = "2.5.0"
version = "2.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c"
[[package]]
name = "mime"
@ -759,9 +768,9 @@ dependencies = [
[[package]]
name = "object"
version = "0.32.0"
version = "0.32.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ac5bbd07aea88c60a577a1ce218075ffd59208b2d7ca97adf9bfc5aeb21ebe"
checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0"
dependencies = [
"memchr",
]
@ -837,7 +846,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.31",
]
[[package]]
@ -971,7 +980,14 @@ dependencies = [
]
[[package]]
name = "rebacs"
name = "rebacs_core"
version = "0.1.0"
dependencies = [
"tokio",
]
[[package]]
name = "rebacs_server"
version = "0.1.0"
dependencies = [
"compact_str",
@ -981,6 +997,7 @@ dependencies = [
"jsonwebtoken",
"log",
"prost",
"rebacs_core",
"reqwest",
"serde",
"sha2",
@ -1001,9 +1018,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.9.4"
version = "1.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29"
checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47"
dependencies = [
"aho-corasick",
"memchr",
@ -1013,9 +1030,9 @@ dependencies = [
[[package]]
name = "regex-automata"
version = "0.3.7"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629"
checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795"
dependencies = [
"aho-corasick",
"memchr",
@ -1090,9 +1107,9 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustix"
version = "0.38.9"
version = "0.38.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bfe0f2582b4931a45d1fa608f8a8722e8b3c7ac54dd6d5f3b3212791fedef49"
checksum = "c0c3dde1fc030af041adc40e79c0e7fbcf431dd24870053d187d7c66e4b87453"
dependencies = [
"bitflags 2.4.0",
"errno",
@ -1103,9 +1120,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.21.6"
version = "0.21.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb"
checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8"
dependencies = [
"log",
"ring",
@ -1177,7 +1194,7 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.31",
]
[[package]]
@ -1295,9 +1312,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.29"
version = "2.0.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a"
checksum = "718fa2415bcb8d8bd775917a1bf12a7931b6dfa890753378538118181e0cb398"
dependencies = [
"proc-macro2",
"quote",
@ -1334,29 +1351,29 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.47"
version = "1.0.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f"
checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.47"
version = "1.0.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b"
checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.31",
]
[[package]]
name = "time"
version = "0.3.27"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bb39ee79a6d8de55f48f2293a830e040392f1c5f16e336bdd1788cd0aadce07"
checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48"
dependencies = [
"deranged",
"itoa",
@ -1373,9 +1390,9 @@ checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb"
[[package]]
name = "time-macros"
version = "0.2.13"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "733d258752e9303d392b94b75230d07b0b9c489350c69b851fc6c065fde3e8f9"
checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572"
dependencies = [
"time-core",
]
@ -1432,7 +1449,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.31",
]
[[package]]
@ -1566,7 +1583,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.31",
]
[[package]]
@ -1619,9 +1636,9 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "url"
version = "2.4.0"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb"
checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5"
dependencies = [
"form_urlencoded",
"idna",
@ -1670,7 +1687,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.31",
"wasm-bindgen-shared",
]
@ -1704,7 +1721,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.31",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -1733,13 +1750,14 @@ checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc"
[[package]]
name = "which"
version = "4.4.0"
version = "4.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2441c784c52b289a054b7201fc93253e288f094e2f4be9058343127c4226a269"
checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
dependencies = [
"either",
"libc",
"home",
"once_cell",
"rustix",
]
[[package]]

View File

@ -1,29 +1,6 @@
[package]
name = "rebacs"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
dotenvy = "0.15.7"
log = "0.4.17"
env_logger = "0.10.0"
serde = { version="1.0", features=["derive"] }
tokio = { version = "1.27.0", features = ["full"] }
tonic = { version="0.9.2", features=["tls"] }
prost = "0.11.9"
sha2 = "0.10.6"
hex = "0.4.3"
compact_str = "0.7.0"
thiserror = "1.0.47"
jsonwebtoken = "8.3.0"
reqwest = { version="0.11.20", features=["json", "rustls-tls"], default-features=false}
[build-dependencies]
tonic-build = "0.9.2"
[workspace]
resolver = "2"
members = [
"rebacs_server",
"rebacs_core",
]

View File

@ -1,4 +0,0 @@
FROM debian:bookworm-slim AS final
WORKDIR /app
COPY ./target/release/rebacs ./server
CMD [ "/app/server" ]

View File

@ -33,8 +33,7 @@
rustToolchain = pkgs.rust-bin.nightly.latest.default;
protoFilter = path: _type: builtins.match ".*proto$" path != null;
tailwindFilter = path: _type: builtins.match "^tailwind.config.js$" path != null;
protoOrCargo = path: type: (protoFilter path type) || (tailwindFilter path type) || (craneLib.filterCargoSources path type);
protoOrCargo = path: type: (protoFilter path type) || (craneLib.filterCargoSources path type);
craneLib = (crane.mkLib pkgs).overrideToolchain rustToolchain;
src = pkgs.lib.cleanSourceWith {
@ -58,7 +57,7 @@
name = "rebacs";
tag = "latest";
config = {
Cmd = [ "${bin}/bin/rebacs" ];
Cmd = [ "${bin}/bin/rebacs_server" ];
};
};

9
rebacs_core/Cargo.toml Normal file
View File

@ -0,0 +1,9 @@
[package]
name = "rebacs_core"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1.32.0", features = [] }

View File

@ -9,11 +9,13 @@ use std::{
};
use tokio::{
fs::File,
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
io::{AsyncBufReadExt, AsyncWriteExt},
sync::RwLock,
};
#[cfg(test)]
mod tests;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct NodeId {
pub namespace: String,
@ -34,11 +36,11 @@ struct Distanced<T> {
}
#[derive(Default)]
pub struct RelationSet {
pub struct RelationGraph {
nodes: RwLock<BTreeSet<Arc<Node>>>,
}
impl RelationSet {
impl RelationGraph {
pub async fn insert(&self, src: impl Into<NodeId>, dst: impl Into<NodeId>) {
let src = src.into();
let dst = dst.into();
@ -157,13 +159,14 @@ impl RelationSet {
false
}
pub async fn to_file(&self, file: &mut File) {
pub async fn write_savefile(&self, writeable: &mut (impl AsyncWriteExt + Unpin)) {
let mut current: (String, String) = (String::new(), String::new());
for node in self.nodes.read().await.iter() {
if current != (node.id.namespace.clone(), node.id.id.clone()) {
current = (node.id.namespace.clone(), node.id.id.clone());
file.write_all("\n".as_bytes()).await.unwrap();
file.write_all(format!("[{}:{}]\n", &current.0, &current.1).as_bytes())
writeable.write_all("\n".as_bytes()).await.unwrap();
writeable
.write_all(format!("[{}:{}]\n", &current.0, &current.1).as_bytes())
.await
.unwrap();
}
@ -186,15 +189,15 @@ impl RelationSet {
.unwrap_or_default();
if let Some(rel) = &node.id.relation {
file.write_all(format!("{} = [ {} ]\n", &rel, &srcs).as_bytes())
writeable
.write_all(format!("{} = [ {} ]\n", &rel, &srcs).as_bytes())
.await
.unwrap();
}
}
}
pub async fn from_file(file: &mut File) -> Self {
let reader = BufReader::new(file);
let mut lines = reader.lines();
pub async fn read_savefile(readable: &mut (impl AsyncBufReadExt + Unpin)) -> Self {
let mut lines = readable.lines();
let graph = Self::default();
let mut node: Option<(String, String)> = None;
while let Ok(Some(line)) = lines.next_line().await {
@ -274,7 +277,7 @@ impl Eq for Node {}
impl PartialOrd for Node {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.id.partial_cmp(&other.id)
Some(self.cmp(other))
}
}
impl Ord for Node {

52
rebacs_core/src/tests.rs Normal file
View File

@ -0,0 +1,52 @@
//hello world
use crate::{Distanced, NodeId, RelationGraph};
#[test]
fn distanced_ordering() {
let a = Distanced::new((), 0);
let b = Distanced::one(());
let c = Distanced::new((), 1);
let d = Distanced::new((), 2);
assert!(a < b);
assert!(b == c);
assert!(c < d);
assert!(a < d);
}
#[tokio::test]
async fn simple_graph() {
let graph = RelationGraph::default();
let alice = ("user", "alice");
let bob = ("user", "bob");
let charlie = ("user", "charlie");
let foo_read = ("application", "foo", "read");
let bar_read = ("application", "bar", "read");
graph.insert(alice, foo_read).await;
graph.insert(bob, bar_read).await;
assert!(graph.has_recursive(alice, foo_read, None).await);
assert!(!graph.has_recursive(alice, bar_read, None).await);
assert!(!graph.has_recursive(bob, foo_read, None).await);
assert!(graph.has_recursive(bob, bar_read, None).await);
assert!(!graph.has_recursive(charlie, foo_read, None).await);
assert!(!graph.has_recursive(charlie, bar_read, None).await);
graph.remove(alice, foo_read).await;
graph.remove(alice, bar_read).await;
assert!(!graph.has_recursive(alice, foo_read, None).await);
assert!(!graph.has_recursive(alice, bar_read, None).await);
graph.insert(charlie, foo_read).await;
graph.insert(charlie, bar_read).await;
assert!(graph.has_recursive(charlie, foo_read, None).await);
assert!(graph.has_recursive(charlie, bar_read, None).await);
}

32
rebacs_server/Cargo.toml Normal file
View File

@ -0,0 +1,32 @@
[package]
name = "rebacs_server"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
dotenvy = "0.15.7"
log = "0.4.17"
env_logger = "0.10.0"
serde = { version="1.0", features=["derive"] }
tokio = { version = "1.27.0", features = ["full"] }
tonic = { version="0.9.2", features=["tls"] }
prost = "0.11.9"
sha2 = "0.10.6"
hex = "0.4.3"
compact_str = "0.7.0"
thiserror = "1.0.47"
jsonwebtoken = "8.3.0"
reqwest = { version="0.11.20", features=["json", "rustls-tls"], default-features=false}
rebacs_core = { path="../rebacs_core" }
[build-dependencies]
tonic-build = "0.9.2"

View File

@ -2,6 +2,7 @@ use std::sync::Arc;
use jsonwebtoken::{decode, DecodingKey, TokenData, Validation};
use log::info;
use rebacs_core::{NodeId, RelationGraph};
use serde::Deserialize;
use tokio::sync::mpsc::Sender;
use tonic::metadata::MetadataMap;
@ -12,11 +13,10 @@ use crate::rebacs_proto::{
rebac_service_server, ExistsReq, ExistsRes, GrantReq, GrantRes, IsPermittedReq, IsPermittedRes,
RevokeReq, RevokeRes,
};
use crate::relation_set::{NodeId, RelationSet};
#[derive(Clone)]
pub struct RebacService {
pub graph: Arc<RelationSet>,
pub graph: Arc<RelationGraph>,
pub oidc_pubkey: DecodingKey,
pub oidc_validation: Validation,
pub save_trigger: Sender<()>,
@ -158,7 +158,7 @@ async fn is_permitted(
token: &TokenData<Claims>,
dst: &NodeId,
relation: &str,
graph: &RelationSet,
graph: &RelationGraph,
) -> bool {
let s1 = graph
.has_recursive(

View File

@ -5,10 +5,11 @@ use std::{env, sync::Arc, time::Duration};
use grpc_service::RebacService;
use jsonwebtoken::{Algorithm, DecodingKey, Validation};
use log::info;
use relation_set::RelationSet;
use rebacs_core::RelationGraph;
use serde::Deserialize;
use tokio::{
fs::{self, File},
io::BufReader,
select,
sync::mpsc::channel,
};
@ -16,7 +17,6 @@ use tonic::transport::Server;
pub mod grpc_service;
pub mod rebacs_proto;
pub mod relation_set;
use crate::rebacs_proto::rebac_service_server;
@ -31,10 +31,11 @@ async fn main() {
env_logger::init();
info!("loading graph from graph.dat");
let graph = if let Ok(mut file) = File::open("graph.dat").await {
RelationSet::from_file(&mut file).await
let graph = if let Ok(file) = File::open("graph.dat").await {
let mut reader = BufReader::new(file);
RelationGraph::read_savefile(&mut reader).await
} else {
RelationSet::default()
RelationGraph::default()
};
let graph = Arc::new(graph);
@ -50,7 +51,7 @@ async fn main() {
info!("saving graph");
let _ = fs::copy("graph.dat", "graph.dat.bak").await;
let mut file = File::create("graph.dat").await.unwrap();
save_thread_graph.to_file(&mut file).await;
save_thread_graph.write_savefile(&mut file).await;
}
});

View File

@ -1,735 +0,0 @@
use std::{
cmp::Ordering,
collections::{
hash_map::{Iter, IterMut},
BinaryHeap, HashMap, HashSet,
},
hash::Hash,
ops::Deref,
sync::Arc,
};
use log::info;
use serde::{Deserialize, Serialize};
use tokio::{
fs::File,
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
};
#[derive(Default)]
pub struct Graph {
nodes: BidMap<Object, ObjectRef>,
edges: BidThreeMap<ObjectOrSet, Relation, ObjectRef>,
counter: u32,
}
#[derive(Hash, PartialEq, Eq, Clone, Serialize, Deserialize, Debug)]
pub struct Object {
pub namespace: String,
pub id: String,
}
#[derive(Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, Debug)]
pub struct ObjectRef(pub u32);
#[derive(PartialEq, Eq, Hash, Clone, Debug, Deserialize, Serialize)]
pub enum ObjectOrSet {
Object(Object),
Set((Object, Relation)),
}
#[derive(Hash, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Relation(pub String);
#[derive(PartialEq, Eq, Clone, Hash, Serialize, Deserialize, Debug)]
pub struct ObjectRelation(pub ObjectRef, pub Relation);
impl Object {
pub fn new(namespace: &str, id: &str) -> Self {
Self {
namespace: namespace.to_string(),
id: id.to_string(),
}
}
}
impl ObjectOrSet {
pub fn object(&self) -> &Object {
match self {
ObjectOrSet::Object(obj) => obj,
ObjectOrSet::Set((obj, _)) => obj,
}
}
pub fn relation(&self) -> Option<&Relation> {
match self {
ObjectOrSet::Object(_) => None,
ObjectOrSet::Set((_, rel)) => Some(rel),
}
}
}
impl From<Object> for ObjectOrSet {
fn from(value: Object) -> Self {
Self::Object(value)
}
}
impl From<(Object, &str)> for ObjectOrSet {
fn from(value: (Object, &str)) -> Self {
Self::Set((value.0, Relation::new(value.1)))
}
}
impl Relation {
pub fn new(relation: &str) -> Self {
Self(relation.to_string())
}
}
impl Deref for Relation {
type Target = String;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl From<(ObjectRef, Relation)> for ObjectRelation {
fn from(value: (ObjectRef, Relation)) -> Self {
Self(value.0, value.1)
}
}
impl From<(ObjectRef, &str)> for ObjectRelation {
fn from(value: (ObjectRef, &str)) -> Self {
Self(value.0, Relation::new(value.1))
}
}
impl From<(&str, &str)> for Object {
fn from((namespace, id): (&str, &str)) -> Self {
Self {
namespace: namespace.to_string(),
id: id.to_string(),
}
}
}
impl From<(&String, &String)> for Object {
fn from((namespace, id): (&String, &String)) -> Self {
Self {
namespace: namespace.to_string(),
id: id.to_string(),
}
}
}
impl From<(String, String)> for Object {
fn from((namespace, id): (String, String)) -> Self {
Self { namespace, id }
}
}
impl Graph {
pub fn get_node(&self, namespace: &str, id: &str) -> Option<ObjectRef> {
self.nodes.get_by_a(&Object::new(namespace, id)).cloned()
}
pub fn object_from_ref(&self, obj: &ObjectRef) -> Object {
self.nodes.get_by_b(obj).unwrap().clone()
}
pub fn get_or_add_node(&mut self, namespace: &str, id: &str) -> ObjectRef {
if let Some(node) = self.get_node(namespace, id) {
node
} else {
self.add_node((namespace, id))
}
}
pub fn add_node(&mut self, node: impl Into<Object>) -> ObjectRef {
let obj_ref = ObjectRef(self.counter);
self.nodes.insert(node.into(), obj_ref);
self.counter += 1;
obj_ref
}
pub fn remove_node(&mut self, node: impl Into<Object>) {
let node = node.into();
let index = self.nodes.remove_by_a(&node);
if let Some(index) = index {
self.edges.remove_by_c(&index);
//self.edges.get_by_a(&ObjectOrSet::Object(*index));
//TODO: remove edges with ObjectOrSet::Set
}
}
pub fn remove_node_by_ref(&mut self, node: impl Into<ObjectRef>) {
let node = node.into();
let index = self.nodes.remove_by_b(&node);
if index.is_some() {
self.edges.remove_by_c(&node);
//let edges = self
// .edges
// .left_to_right
// .keys()
// .filter(|x| *x.object_ref() == node)
// .map(|x| (**x).clone())
// .collect::<Vec<ObjectOrSet>>();
//for edge in edges {
// self.edges.remove_by_a(&edge);
//}
}
}
pub fn has_relation(&self, src: ObjectOrSet, dst: ObjectRelation) -> bool {
self.edges.has(&src, &dst.1, &dst.0)
}
pub fn add_relation(&mut self, src: impl Into<ObjectOrSet>, dst: impl Into<ObjectRelation>) {
let dst = dst.into();
self.edges.insert(src.into(), dst.1, dst.0);
}
pub fn remove_relation(&mut self, src: impl Into<ObjectOrSet>, dst: impl Into<ObjectRelation>) {
let dst = dst.into();
self.edges.remove(&src.into(), &dst.1, &dst.0);
}
pub fn remove_relation_and_residual_node(
&mut self,
src: impl Into<ObjectOrSet>,
dst: impl Into<ObjectRelation>,
) {
let src = src.into();
let dst = dst.into();
self.edges.remove(&src, &dst.1, &dst.0);
//if self.edges.get_by_c(src.object_ref()).is_empty()
// && !self
// .edges
// .left_to_right
// .keys()
// .any(|x| x.object_ref() == src.object_ref())
//{
// self.remove_node_by_ref(*src.object_ref());
//}
//if self.edges.get_by_c(&dst.0).is_empty()
// && !self
// .edges
// .left_to_right
// .keys()
// .any(|x| *x.object_ref() == dst.0)
//{
// self.remove_node_by_ref(dst.0);
//}
}
pub fn is_related_to(
&self,
src: impl Into<ObjectOrSet>,
dst: impl Into<ObjectRelation>,
) -> bool {
let src = src.into();
let dst = dst.into();
let mut dist: HashMap<ObjectRelation, u32> = HashMap::new();
let mut q: BinaryHeap<ObjectRelationDist> = BinaryHeap::new();
for neighbor in self
.edges
.get_by_a(&src)
.iter()
.flat_map(|(r, m)| m.iter().map(|x| ObjectRelation(**x, (**r).clone())))
{
if neighbor == dst {
return true;
}
dist.insert(neighbor.clone(), 1);
q.push(ObjectRelationDist(1, neighbor.clone()));
}
//while let Some(ObjectRelationDist(node_dist, node)) = q.pop() {
// let node_dist = node_dist + 1;
// let node = ObjectOrSet::Set((node.0, node.1));
// for neighbor in self
// .edges
// .get_by_a(&node)
// .iter()
// .flat_map(|(r, m)| m.iter().map(|x| ObjectRelation(**x, (**r).clone())))
// {
// if neighbor == dst {
// return true;
// }
// if let Some(existing_node_dist) = dist.get(&neighbor) {
// if *existing_node_dist < node_dist {
// continue;
// }
// }
// dist.insert(neighbor.clone(), node_dist);
// q.push(ObjectRelationDist(node_dist, neighbor.clone()));
// }
//}
false
}
pub fn related_to(&self, dst: ObjectRef, relation: Relation) -> HashSet<ObjectRef> {
//let mut relation_sets = vec![];
//let mut relations: HashSet<ObjectRef> = HashSet::new();
//for obj in self.edges.get_by_cb(&dst, &relation) {
// match obj {
// ObjectOrSet::Object(obj) => {
// relations.insert(*obj);
// }
// ObjectOrSet::Set(set) => relation_sets.push(set),
// }
//}
//while let Some(set) = relation_sets.pop() {
// for obj in self.edges.get_by_cb(&set.0, &set.1) {
// match obj {
// ObjectOrSet::Object(obj) => {
// relations.insert(*obj);
// }
// ObjectOrSet::Set(set) => relation_sets.push(set),
// }
// }
//}
//relations
todo!()
}
pub fn relations(&self, src: impl Into<ObjectRelation>) -> HashSet<ObjectRef> {
//let src: ObjectRelation = src.into();
//let mut visited = HashSet::new();
//let mut relation_sets = vec![];
//let mut relations = HashSet::new();
//for (rel, neighbors) in self.edges.get_by_a(&ObjectOrSet::Object(src.0)) {
// for neighbor in neighbors {
// if *rel == src.1 {
// relations.insert(*neighbor);
// }
// relation_sets.push((rel, neighbor));
// }
//}
//while let Some((rel, obj_ref)) = relation_sets.pop() {
// if !visited.contains(&(rel, obj_ref)) {
// for (rel, neighbors) in self
// .edges
// .get_by_a(&ObjectOrSet::Set((*obj_ref, (*rel).clone())))
// {
// for neighbor in neighbors {
// if *rel == src.1 {
// relations.insert(*neighbor);
// }
// relation_sets.push((rel, neighbor));
// }
// }
// visited.insert((rel, obj_ref));
// }
//}
//relations
todo!()
}
pub async fn to_file(&self, file: &mut File) {
info!("writing graph to file");
for (obj, obj_ref) in self.nodes.iter() {
file.write_all(format!("[{}:{}]\n", &obj.namespace, &obj.id).as_bytes())
.await
.unwrap();
//for (rel, arr) in self.edges.get_by_c(obj_ref.as_ref()) {
// let arr = arr
// .iter()
// .filter_map(|x| {
// let rel_obj_ref = x.object_ref();
// self.nodes.get_by_b(rel_obj_ref).map(|rel_obj| {
// let (namespace, id) = (&rel_obj.namespace, &rel_obj.id);
// if *namespace == obj.namespace && *id == obj.id {
// match x.relation() {
// None => "self".to_string(),
// Some(rel) => format!("self#{}", &rel.0),
// }
// } else {
// match x.relation() {
// None => format!("{}:{}", &namespace, &id),
// Some(rel) => format!("{}:{}#{}", &namespace, &id, &rel.0),
// }
// }
// })
// })
// .reduce(|acc, e| acc + ", " + &e)
// .unwrap_or_default();
// file.write_all(format!("{} = [{}]\n", &rel.0, &arr).as_bytes())
// .await
// .unwrap();
//}
file.write_all("\n".as_bytes()).await.unwrap();
}
}
pub async fn from_file(file: &mut File) -> Self {
info!("reading graph from file");
let reader = BufReader::new(file);
let mut lines = reader.lines();
let mut graph = Graph::default();
let mut node: Option<(ObjectRef, String, String)> = None;
let mut relations = vec![];
while let Ok(Some(line)) = lines.next_line().await {
if line.starts_with('[') && line.ends_with(']') {
let line = &mut line[1..line.len() - 1].split(':');
let namespace = line.next().unwrap();
let id = line.next().unwrap();
let obj_ref = graph.add_node(Object::new(namespace, id));
node = Some((obj_ref, namespace.to_string(), id.to_string()));
} else if line.contains('=') && line.contains('[') && line.contains(']') {
if let Some(dst) = &node {
let equals_pos = line.find('=').unwrap();
let arr_start = line.find('[').unwrap();
let arr_stop = line.find(']').unwrap();
let rel = line[..equals_pos].trim();
let arr = line[arr_start + 1..arr_stop].split(", ");
for obj in arr {
let (src_namespace, src_id, src_rel) = if obj.contains('#') {
let sep_1 = obj.find(':');
let sep_2 = obj.find('#').unwrap();
let (namespace, id) = if let Some(sep_1) = sep_1 {
(&obj[..sep_1], &obj[sep_1 + 1..sep_2])
} else {
(dst.1.as_str(), dst.2.as_str())
};
let rel = &obj[sep_2 + 1..];
(namespace, id, Some(rel))
} else {
let sep_1 = obj.find(':');
let (namespace, id) = if let Some(sep_1) = sep_1 {
(&obj[..sep_1], &obj[sep_1 + 1..])
} else {
(dst.1.as_str(), dst.2.as_str())
};
(namespace, id, None)
};
relations.push((
src_namespace.to_string(),
src_id.to_string(),
src_rel.map(String::from),
dst.0,
rel.to_string(),
));
}
}
}
}
//for relation in relations {
// let src = match relation.2 {
// Some(rel) => {
// let obj = graph.get_node(&relation.0, &relation.1).unwrap();
// ObjectOrSet::Set((obj, Relation::new(&rel)))
// }
// None => {
// let obj = graph.get_node(&relation.0, &relation.1).unwrap();
// ObjectOrSet::Object(obj)
// }
// };
// graph.add_relation(src, ObjectRelation(relation.3, Relation(relation.4)));
//}
graph
}
}
/// Helper Struct used for Dijkstra
#[derive(PartialEq, Eq)]
struct ObjectRelationDist(u32, ObjectRelation);
impl Ord for ObjectRelationDist {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.0.cmp(&self.0)
}
}
impl PartialOrd for ObjectRelationDist {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(other.0.cmp(&self.0))
}
}
pub struct BidMap<A, B> {
left_to_right: HashMap<Arc<A>, Arc<B>>,
right_to_left: HashMap<Arc<B>, Arc<A>>,
}
impl<A, B> Default for BidMap<A, B> {
fn default() -> Self {
Self {
left_to_right: Default::default(),
right_to_left: Default::default(),
}
}
}
impl<A, B> BidMap<A, B>
where
A: Eq + Hash,
B: Eq + Hash,
{
pub fn new() -> Self {
Self {
left_to_right: HashMap::new(),
right_to_left: HashMap::new(),
}
}
pub fn insert(&mut self, a: A, b: B) {
let a = Arc::new(a);
let b = Arc::new(b);
self.left_to_right.insert(a.clone(), b.clone());
self.right_to_left.insert(b, a);
}
pub fn remove_by_a(&mut self, a: &A) -> Option<Arc<B>> {
if let Some(b) = self.left_to_right.remove(a) {
self.right_to_left.remove(&b);
Some(b)
} else {
None
}
}
pub fn remove_by_b(&mut self, b: &B) -> Option<Arc<A>> {
if let Some(a) = self.right_to_left.remove(b) {
self.left_to_right.remove(&a);
Some(a)
} else {
None
}
}
pub fn get_by_a(&self, a: &A) -> Option<&B> {
self.left_to_right.get(a).map(Deref::deref)
}
pub fn get_by_b(&self, b: &B) -> Option<&A> {
self.right_to_left.get(b).map(Deref::deref)
}
pub fn iter(&self) -> Iter<Arc<A>, Arc<B>> {
self.left_to_right.iter()
}
pub fn iter_mut(&mut self) -> IterMut<Arc<A>, Arc<B>> {
self.left_to_right.iter_mut()
}
}
pub struct BidThreeMap<A, B, C> {
left_to_right: HashMap<Arc<A>, HashMap<Arc<B>, HashSet<Arc<C>>>>,
right_to_left: HashMap<Arc<C>, HashMap<Arc<B>, HashSet<Arc<A>>>>,
}
impl<A, B, C> BidThreeMap<A, B, C>
where
A: Eq + Hash,
B: Eq + Hash,
C: Eq + Hash,
{
pub fn new() -> Self {
Self {
left_to_right: HashMap::new(),
right_to_left: HashMap::new(),
}
}
pub fn insert(&mut self, a: A, b: B, c: C) {
let a = Arc::new(a);
let b = Arc::new(b);
let c = Arc::new(c);
if let Some(middle) = self.left_to_right.get_mut(&a) {
if let Some(right) = middle.get_mut(&b) {
right.insert(c.clone());
} else {
let mut right = HashSet::new();
right.insert(c.clone());
middle.insert(b.clone(), right);
}
} else {
let mut middle = HashMap::new();
let mut right = HashSet::new();
right.insert(c.clone());
middle.insert(b.clone(), right);
self.left_to_right.insert(a.clone(), middle);
}
if let Some(middle) = self.right_to_left.get_mut(&c) {
if let Some(left) = middle.get_mut(&b) {
left.insert(a);
} else {
let mut left = HashSet::new();
left.insert(a);
middle.insert(b, left);
}
} else {
let mut middle = HashMap::new();
let mut left = HashSet::new();
left.insert(a);
middle.insert(b, left);
self.right_to_left.insert(c, middle);
}
}
pub fn remove(&mut self, a: &A, b: &B, c: &C) {
if let Some(right) = self.left_to_right.get_mut(a).and_then(|ltr| ltr.get_mut(b)) {
right.remove(c);
}
if let Some(left) = self.right_to_left.get_mut(c).and_then(|rtl| rtl.get_mut(b)) {
left.remove(a);
}
}
pub fn remove_by_a(&mut self, a: &A) {
if let Some(map) = self.left_to_right.remove(a) {
for (b, set) in map {
for c in set {
if let Some(set) = self
.right_to_left
.get_mut(&c)
.and_then(|ltr| ltr.get_mut(&b))
{
set.remove(a);
}
}
}
}
}
pub fn remove_by_c(&mut self, c: &C) {
if let Some(map) = self.right_to_left.remove(c) {
for (b, set) in map {
for a in set {
if let Some(set) = self
.left_to_right
.get_mut(&a)
.and_then(|ltr| ltr.get_mut(&b))
{
set.remove(c);
}
}
}
}
}
pub fn has(&self, a: &A, b: &B, c: &C) -> bool {
self.left_to_right
.get(a)
.and_then(|ltr| ltr.get(b))
.and_then(|ltr| ltr.get(c))
.is_some()
}
pub fn get_by_ab(&self, a: &A, b: &B) -> HashSet<&C> {
self.left_to_right
.get(a)
.and_then(|ltr| ltr.get(b))
.map(|ltr| ltr.iter().map(|x| x.as_ref()).collect::<HashSet<_>>())
.unwrap_or_default()
}
pub fn get_by_cb(&self, c: &C, b: &B) -> HashSet<&A> {
self.right_to_left
.get(c)
.and_then(|rtl| rtl.get(b))
.map(|rtl| rtl.iter().map(|x| x.as_ref()).collect::<HashSet<_>>())
.unwrap_or_default()
}
pub fn get_by_a(&self, a: &A) -> HashMap<&B, HashSet<&C>> {
self.left_to_right
.get(a)
.iter()
.flat_map(|x| x.iter())
.map(|(b, c)| {
(
b.as_ref(),
c.iter().map(|x| x.as_ref()).collect::<HashSet<&C>>(),
)
})
.collect::<_>()
}
pub fn get_by_c(&self, c: &C) -> HashMap<&B, HashSet<&A>> {
self.right_to_left
.get(c)
.iter()
.flat_map(|x| x.iter())
.map(|(b, a)| {
(
b.as_ref(),
a.iter().map(|x| x.as_ref()).collect::<HashSet<&A>>(),
)
})
.collect::<_>()
}
}
impl<A, B, C> Default for BidThreeMap<A, B, C> {
fn default() -> Self {
Self {
left_to_right: Default::default(),
right_to_left: Default::default(),
}
}
}
//impl Ord for ObjectOrSet {
// fn cmp(&self, other: &Self) -> Ordering {}
//}
//
//impl Ord for ObjectRef {}cmp
impl PartialOrd for Relation {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.0.partial_cmp(&other.0)
}
}
impl Ord for Relation {
fn cmp(&self, other: &Self) -> Ordering {
self.0.cmp(&other.0)
}
}
impl PartialOrd for ObjectOrSet {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match (
self.object().partial_cmp(other.object()),
self.relation(),
other.relation(),
) {
(Some(Ordering::Equal), Some(self_rel), Some(other_rel)) => {
self_rel.partial_cmp(other_rel)
}
(ord, _, _) => ord,
}
}
}
impl Ord for ObjectOrSet {
fn cmp(&self, other: &Self) -> Ordering {
self.object()
.cmp(other.object())
.then(self.relation().cmp(&other.relation()))
}
}
impl PartialOrd for Object {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match self.namespace.partial_cmp(&other.namespace) {
Some(core::cmp::Ordering::Equal) => self.id.partial_cmp(&other.id),
ord => ord,
}
}
}
impl Ord for Object {
fn cmp(&self, other: &Self) -> Ordering {
self.namespace
.cmp(&other.namespace)
.then(self.id.cmp(&other.id))
}
}

View File

@ -1,145 +0,0 @@
use std::{sync::Arc, time::Duration};
use kafka::{
consumer::{Consumer, FetchOffset},
producer::{Producer, Record, RequiredAcks},
};
use log::debug;
use serde::{Deserialize, Serialize};
use tokio::{
runtime::Runtime,
sync::{mpsc, RwLock},
task::JoinHandle,
};
use crate::{
graph::{Graph, ObjectRelation},
object::{Object, ObjectOrSet, ObjectRef},
};
#[derive(Serialize, Deserialize, Debug)]
pub enum Event {
AddObject(Object),
RemoveObject(Object),
AddRelation((ObjectOrSet, ObjectRelation)),
RemoveRelation((ObjectOrSet, ObjectRelation)),
}
pub struct GraphProxy {
graph: Arc<RwLock<Graph>>,
producer_thread: JoinHandle<()>,
producer_tx: mpsc::Sender<Event>,
consumer_thread: JoinHandle<()>,
}
impl GraphProxy {
pub async fn run() -> Self {
let graph = Arc::new(RwLock::new(Graph::default()));
let (producer_tx, mut producer_rx) = mpsc::channel(1024);
let mut producer = Producer::from_hosts(vec!["localhost:9092".to_owned()])
.with_ack_timeout(Duration::from_secs(1))
.with_required_acks(RequiredAcks::One)
.create()
.unwrap();
let producer_thread = tokio::spawn(async move {
loop {
if let Some(event) = producer_rx.recv().await {
let ser_event = serde_cbor::to_vec(&event).unwrap();
producer
.send(&Record::from_value("gpm", ser_event))
.unwrap();
debug!("emitted Event: {:?}", event);
}
}
});
let mut consumer = Consumer::from_hosts(vec!["localhost:9092".to_string()])
.with_client_id("gpm_dev".to_string())
.with_topic("gpm".to_string())
.with_fallback_offset(FetchOffset::Earliest)
.create()
.unwrap();
let consumer_graph = graph.clone();
let consumer_thread = tokio::task::spawn_blocking(move || {
let runtime = Runtime::new().unwrap();
loop {
for msg_sets in consumer.poll().unwrap().iter() {
for msg in msg_sets.messages() {
let event: Event = serde_cbor::from_slice(msg.value).unwrap();
debug!("received Event: {:?}", event);
let mut graph = runtime.block_on(consumer_graph.write());
match event {
Event::AddObject(obj) => {
graph.add_node(obj);
}
Event::RemoveObject(obj) => {
graph.remove_node(obj);
}
Event::AddRelation((src, dst)) => {
graph.add_relation(src, dst);
}
Event::RemoveRelation((src, dst)) => {
graph.remove_relation(src, dst);
}
};
}
consumer.consume_messageset(msg_sets).unwrap();
}
consumer.commit_consumed().unwrap();
}
});
Self {
graph,
producer_thread,
producer_tx,
consumer_thread,
}
}
pub fn stop(&mut self) {
self.producer_thread.abort();
self.consumer_thread.abort();
}
pub async fn add_node(&mut self, node: Object) {
self.producer_tx.send(Event::AddObject(node)).await.unwrap();
}
pub async fn remove_node(&mut self, node: Object) {
self.producer_tx
.send(Event::RemoveObject(node))
.await
.unwrap();
}
pub async fn add_relation(&mut self, src: ObjectOrSet, dst: ObjectRelation) {
self.producer_tx
.send(Event::AddRelation((src, dst)))
.await
.unwrap();
}
pub async fn remove_relation(&mut self, src: ObjectOrSet, dst: ObjectRelation) {
self.producer_tx
.send(Event::RemoveRelation((src, dst)))
.await
.unwrap();
}
pub async fn get_node(&self, namespace: &str, id: &str) -> Option<ObjectRef> {
let graph = self.graph.read().await;
graph.get_node(namespace, id)
}
pub async fn is_related_to(
&self,
src: impl Into<ObjectOrSet>,
dst: impl Into<ObjectRelation>,
) -> bool {
let graph = self.graph.read().await;
graph.is_related_to(src, dst)
}
pub async fn related_by(&self, src: impl Into<ObjectRelation>) -> Vec<ObjectOrSet> {
let graph = self.graph.read().await;
graph.related_by(src)
}
}