Introduced "pot" concept for isolating different apps' elements from each other. Removed tracing_setup macro since it disabled most logging output

This commit is contained in:
Philip (a-0) 2024-01-14 14:38:05 +01:00
parent 4f8d6ec3d0
commit a768ce0f4e
17 changed files with 424 additions and 41 deletions

View file

@ -8,12 +8,12 @@ edition = "2021"
[dependencies]
anyhow = "1.0.71"
axum = { version = "0.7.2", features = [ "macros" ] }
chrono = "0.4.31"
itertools = "0.12.0"
cozo = { version = "0.7.5", features = [ "storage-rocksdb", "requests", "graph-algo" ] }
jsonwebtoken = "9.2.0"
serde = { version = "1.0.166", features = [ "derive" ] }
serde_json = "1.0.99"
serde_with = { version = "3.4.0", features = [ "chrono" ] }
serial_test = "2.0.0"
tokio = { version = "1.32.0", features = ["full"] }
tracing = "0.1.37"

View file

@ -8,10 +8,12 @@ use axum::{
response::{IntoResponse, Response},
Extension, Json,
};
use jsonwebtoken::{decode, Header};
use serde::{Deserialize, Serialize};
use serde_with::chrono::{DateTime, Utc};
use tracing::{debug, error, warn};
use ubisync_lib::api::app::{AppRegisterRequest, AppRegisterResponse};
use ubisync_lib::{api::app::{AppRegisterRequest, AppRegisterResponse, AppSetDefaultPotRequest, AppSetDefaultPotResponse, AppCreatePotRequest, AppCreatePotResponse}, types::PotId};
use uuid::Uuid;
use crate::state::ApiState;
@ -25,6 +27,16 @@ impl AppId {
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct App {
pub id: AppId,
pub app_type: String,
pub name: String,
pub description: String,
pub default_pot: Option<PotId>,
pub last_access: DateTime<Utc>,
}
#[derive(Serialize, Deserialize, Debug)]
struct JWTClaims {
sub: AppId,
@ -63,7 +75,7 @@ pub(super) async fn register(
// Maybe ask for consent by user
// If user wants registration, proceed
let result = s.add_app(&body.name, &body.description);
let result = s.add_app(&body.name, &body.description, &body.app_type);
match result {
Ok(id) => {
@ -93,3 +105,57 @@ pub(super) async fn register(
}
}
}
pub(super) async fn set_default_pot(
s: Extension<Arc<ApiState>>,
app_id: Extension<AppId>,
Json(body): Json<AppSetDefaultPotRequest>,
) -> Response {
match s.set_app_default_pot(&app_id.0, &body.pot_id) {
Ok(_) => (StatusCode::OK, Json { 0: AppSetDefaultPotResponse }).into_response(),
Err(e) => {
debug!("Could not set default pot: {}", e);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}
pub(super) async fn create_pot(
s: Extension<Arc<ApiState>>,
app_id: Extension<AppId>,
Json(body): Json<AppCreatePotRequest>,
) -> Response {
let app = match s.get_app(&app_id.0) {
Ok(a) => a,
Err(e) => {
debug!("Failed to fetch app: {}", e);
return StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
};
let inferred_app_type = match body.app_type {
Some(t) => t,
None => app.app_type
};
match s.create_pot(&app_id.0, &inferred_app_type) {
Ok(id) => {
// If this is the first pot for this app, set it as default
if app.default_pot.is_none() {
match s.set_app_default_pot(&app_id.0, &id) {
Ok(_) => (StatusCode::OK, Json { 0: AppCreatePotResponse {pot_id: id} }).into_response(),
Err(e) => {
debug!("Failed to set default pot: {}", e);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}
// Otherwise, just return
else {
(StatusCode::OK, Json { 0: AppCreatePotResponse {pot_id: id} }).into_response()
}
},
Err(e) => {
debug!("Could not create pot: {}", e);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}

View file

@ -1,6 +1,6 @@
use axum::{
middleware::from_fn,
routing::{get, put},
routing::{get, put, post},
Extension, Router,
};
use std::sync::Arc;
@ -13,6 +13,8 @@ pub mod element;
pub fn get_router(state: ApiState) -> Router {
Router::new()
// authenticated routes
.route("/app/pot", put(app::create_pot))
.route("/app/pot/default", post(app::set_default_pot))
.route("/element", put(element::create))
.route(
"/element/:id",

View file

@ -1,15 +1,18 @@
use std::sync::Arc;
use chrono::Utc;
use cozo::DbInstance;
use jsonwebtoken::{DecodingKey, EncodingKey, Validation};
use serde_with::chrono::Utc;
use tracing::debug;
use ubisync_lib::{
messages::MessageContent,
types::{Element, ElementContent, ElementId},
types::{Element, ElementContent, ElementId, PotId},
};
use crate::{api::v0::app::AppId, state::queries};
use crate::{
api::v0::app::{App, AppId},
state::queries,
};
use super::State;
@ -32,10 +35,10 @@ impl ApiState {
}
}
pub fn add_app(&self, name: &str, description: &str) -> anyhow::Result<AppId> {
pub fn add_app(&self, name: &str, description: &str, app_type: &str) -> anyhow::Result<AppId> {
let id = AppId::new();
let last_access = Utc::now();
queries::apps::add(self.db(), &id, &last_access, name, description)?;
queries::apps::add(self.db(), &id, &last_access, name, description, app_type)?;
debug!("Successfully added app");
Ok(id)
@ -45,6 +48,10 @@ impl ApiState {
queries::apps::exists(self.db(), id)
}
pub fn get_app(&self, id: &AppId) -> anyhow::Result<App> {
queries::apps::get(self.db(), id)
}
pub fn create_element(&self, content: &ElementContent) -> anyhow::Result<ElementId> {
let id = ElementId::new();
queries::elements::add(self.db(), &id, &content, None, true)?;
@ -76,6 +83,16 @@ impl ApiState {
res
}
pub fn set_app_default_pot(&self, app_id: &AppId, pot_id: &PotId) -> anyhow::Result<()> {
queries::apps::set_default_pot(self.db(), app_id, pot_id)
}
pub fn create_pot(&self, app_id: &AppId, app_type: &str) -> anyhow::Result<PotId> {
let pot_id = PotId::new();
queries::apps::create_pot(self.db(), &pot_id, app_id, app_type)?;
Ok(pot_id)
}
pub fn get_element(&self, id: &ElementId) -> anyhow::Result<Element> {
self.state.get_element(id)
}
@ -100,7 +117,7 @@ impl ApiState {
#[cfg(test)]
mod tests {
use tracing::Level;
use ubisync_lib::{tracing_setup, types::ElementContent};
use ubisync_lib::types::ElementContent;
use crate::state::State;
@ -109,7 +126,10 @@ mod tests {
#[tokio::test]
#[serial_test::serial]
async fn test_element_create() {
tracing_setup!(Level::DEBUG);
tracing_subscriber::fmt()
.pretty()
.with_max_level(Level::DEBUG)
.init();
let state = ApiState::new(
State::new("mem").await.unwrap(),
@ -128,7 +148,10 @@ mod tests {
#[tokio::test]
#[serial_test::serial]
async fn test_element_write() {
tracing_setup!(Level::DEBUG);
tracing_subscriber::fmt()
.pretty()
.with_max_level(Level::DEBUG)
.init();
let state = ApiState::new(
State::new("mem").await.unwrap(),

View file

@ -88,15 +88,15 @@ mod tests {
use super::CommState;
use tracing::Level;
use ubisync_lib::{
tracing_setup,
types::{ElementContent, ElementId, MessageId},
};
use ubisync_lib::types::{ElementContent, ElementId, MessageId};
#[tokio::test]
#[serial_test::serial]
async fn test_element_add() {
tracing_setup!(Level::DEBUG);
tracing_subscriber::fmt()
.pretty()
.with_max_level(Level::DEBUG)
.init();
let state = CommState::new(State::new("mem").await.unwrap());
let id = ElementId::new();
@ -117,7 +117,10 @@ mod tests {
#[tokio::test]
#[serial_test::serial]
async fn test_element_update() {
tracing_setup!(Level::DEBUG);
tracing_subscriber::fmt()
.pretty()
.with_max_level(Level::DEBUG)
.init();
let state = CommState::new(State::new("mem").await.unwrap());
let id = ElementId::new();

View file

@ -1,10 +1,11 @@
use std::collections::BTreeMap;
use anyhow::{bail, Error};
use chrono::{DateTime, Utc};
use cozo::{DataValue, DbInstance, Num};
use cozo::{DataValue, DbInstance, Num, ScriptMutability};
use serde_with::chrono::{DateTime, Utc};
use ubisync_lib::types::PotId;
use crate::{api::v0::app::AppId, run_query};
use crate::{api::v0::app::{AppId, App}, run_query};
pub fn add(
db: &DbInstance,
@ -12,6 +13,7 @@ pub fn add(
last_access: &DateTime<Utc>,
name: &str,
description: &str,
app_type: &str,
) -> anyhow::Result<()> {
let params = vec![
(
@ -24,11 +26,13 @@ pub fn add(
),
("name", DataValue::Str(name.into())),
("description", DataValue::Str(description.into())),
("app_type", DataValue::Str(app_type.into())),
("default_pot", DataValue::Null),
];
match run_query!(
&db,
":insert apps {id => last_access, name, description}",
":insert apps {id => last_access, app_type, name, description, default_pot}",
params,
cozo::ScriptMutability::Mutable
) {
@ -45,7 +49,7 @@ pub fn exists(db: &DbInstance, id: &AppId) -> anyhow::Result<bool> {
);
let result = db.run_script(
"?[name] := *apps[$id, last_access, name, description]",
"?[name] := *apps[$id, last_access, app_type, name, description, default_pot]",
params,
cozo::ScriptMutability::Immutable,
);
@ -56,3 +60,68 @@ pub fn exists(db: &DbInstance, id: &AppId) -> anyhow::Result<bool> {
Err(Error::msg("Could not check whether app is registered"))
}
pub fn get(db: &DbInstance, id: &AppId) -> anyhow::Result<App> {
let mut params = BTreeMap::new();
params.insert(
"id".to_string(),
DataValue::Str(serde_json::to_string(&id)?.into()),
);
let result = db.run_script(
"?[last_access, app_type, name, description, default_pot] := *apps[$id, last_access, app_type, name, description, default_pot]",
params,
cozo::ScriptMutability::Immutable,
);
if let Ok(rows) = result {
if let Some(firstrow) = rows.rows.first() {
if let [DataValue::Num(Num::Int(ts)), DataValue::Str(app_type), DataValue::Str(name), DataValue::Str(desc), default_pot] = firstrow.as_slice() {
let last_access = DateTime::from_timestamp(ts.to_owned(), 0).ok_or(Error::msg("Failed to deserialize timestamp"))?;
let pot_id = match default_pot {
DataValue::Str(dpid) => Some(serde_json::from_str(dpid)?),
_ => None,
};
return Ok(App {id: id.clone(), app_type: app_type.to_string(), last_access, name: name.to_string(), description: desc.to_string(), default_pot: pot_id});
}
}
}
Err(Error::msg("Could not find app"))
}
pub fn set_default_pot(db: &DbInstance, id: &AppId, pot: &PotId) -> anyhow::Result<()> {
let params = vec![
("id", DataValue::Str(serde_json::to_string(id)?.into())),
("default_pot", DataValue::Str(serde_json::to_string(pot)?.into()))
];
match run_query!(
&db,
":update apps {id => default_pot}",
params,
ScriptMutability::Mutable
) {
Ok(_) => Ok(()),
Err(report) => bail!(report),
}
}
pub fn create_pot(db: &DbInstance, pot_id: &PotId, app_id: &AppId, app_type: &str) -> anyhow::Result<()> {
let params = vec![
("pot_id", DataValue::Str(serde_json::to_string(pot_id)?.into())),
("app_id", DataValue::Str(serde_json::to_string(app_id)?.into())),
("app_type", DataValue::Str(app_type.into())),
];
match run_query!(
&db,
":insert pots {pot_id => app_type}
:insert pot_memberships {pot_id => app_id}",
params,
ScriptMutability::Mutable
) {
Ok(_) => Ok(()),
Err(e) => bail!(e),
}
}

View file

@ -1,6 +1,7 @@
pub mod apps;
pub mod elements;
pub mod peers;
pub mod pots;
#[macro_export]
macro_rules! build_query {

View file

@ -0,0 +1,96 @@
use std::collections::BTreeMap;
use anyhow::Error;
use cozo::{DataValue, DbInstance, ScriptMutability};
use ubisync_lib::types::{Pot, PotId};
use crate::{
api::v0::app::AppId,
run_query,
};
pub fn add(db: &DbInstance, id: &PotId, app_type: &str) -> anyhow::Result<()> {
let params = vec![
("id", DataValue::Str(serde_json::to_string(id)?.into())),
(
"app_type",
DataValue::Str(serde_json::to_string(app_type)?.into()),
),
];
match run_query!(
&db,
":put pots {id => app_type}",
params,
ScriptMutability::Mutable
) {
Ok(_) => Ok(()),
Err(report) => Err(Error::msg(format!("Query failed: {}", report))),
}
}
pub fn get(db: &DbInstance, id: &PotId) -> anyhow::Result<Pot> {
let mut params = BTreeMap::new();
params.insert(
"id".to_string(),
DataValue::Str(serde_json::to_string(&id)?.into()),
);
let result = db.run_script(
"
?[id, app_type] := *pots{id, app_type}
",
params,
cozo::ScriptMutability::Immutable,
);
match result {
Ok(rows) => {
if let Some(firstrow) = rows.rows.first() {
if let [DataValue::Str(pot_id), DataValue::Str(app_type)] = firstrow.as_slice() {
Ok(Pot::new(
serde_json::from_str(pot_id)?,
app_type.to_string(),
))
} else {
Err(Error::msg("Could not parse result from query"))
}
} else {
Err(Error::msg("Pot not found"))
}
}
Err(report) => Err(Error::msg(format!("Query failed: {}", report))),
}
}
pub fn get_pot_members(db: &DbInstance, id: &PotId) -> anyhow::Result<Vec<AppId>> {
let mut params = BTreeMap::new();
params.insert(
"id".to_string(),
DataValue::Str(serde_json::to_string(&id)?.into()),
);
let result = db.run_script(
"
?[app_id] := *pot_memberships{id, app_id}
",
params,
cozo::ScriptMutability::Immutable,
);
match result {
Ok(rows) => Ok(rows
.rows
.iter()
.map(|row| {
if let [DataValue::Str(app_id_string)] = row.as_slice() {
if let Ok(app_id) = serde_json::from_str::<AppId>(&app_id_string) {
Some(app_id)
} else {
None
}
} else {
None
}
})
.flatten()
.collect()),
Err(report) => Err(Error::msg(format!("Query failed: {}", report))),
}
}

View file

@ -11,14 +11,26 @@ pub fn add_schema(db: &DbInstance) -> anyhow::Result<()> {
id: String,
=>
last_access: Int,
app_type: String,
name: String,
description: String,
default_pot: String?,
}}
{:create peers {
id: String,
=>
name: String,
}}
{:create pots {
id: String,
=>
app_type: String
}}
{:create pot_memberships {
pot_id: String,
=>
app_id: String,
}}
{:create elements {
id: String,
=>

View file

@ -4,14 +4,16 @@ use tracing::{debug, warn, Level};
use ubisync::{config::Config, Ubisync};
use ubisync_lib::{
api::element::{ElementCreateRequest, ElementGetRequest},
tracing_setup,
types::{Element, ElementContent},
};
use ubisync_sdk::UbisyncClient;
#[tokio::test(flavor = "multi_thread")]
async fn two_nodes_element_creation() {
tracing_setup!(Level::DEBUG);
tracing_subscriber::fmt()
.pretty()
.with_max_level(Level::DEBUG)
.init();
// Two nodes need to bind to different ports
let mut c2 = Config::default();
@ -21,10 +23,16 @@ async fn two_nodes_element_creation() {
ubi1.add_peer_from_id(ubi2.get_destination().unwrap().into())
.unwrap();
let api_client1 = UbisyncClient::init("localhost", 9981, None, "App", "Long desc")
let api_client1 = UbisyncClient::init("localhost", 9981, None, "App", "Long desc", "test-app-type")
.await
.unwrap()
.create_default_pot()
.await
.unwrap();
let api_client2 = UbisyncClient::init("localhost", 9982, None, "App", "Long desc")
let api_client2 = UbisyncClient::init("localhost", 9982, None, "App", "Long desc", "test-app-type")
.await
.unwrap()
.create_default_pot()
.await
.unwrap();