From d8f1733eb30726f28a589a738630e4862fd5f8d5 Mon Sep 17 00:00:00 2001 From: "Philip (a-0)" <@ph:a-0.me> Date: Fri, 5 Jan 2024 20:48:23 +0100 Subject: [PATCH] Refactoring, mainly formatting --- src/api/mod.rs | 51 +++++----- src/api/v0/app.rs | 55 ++++++++--- src/api/v0/element.rs | 38 ++++++-- src/api/v0/mod.rs | 21 +++-- src/comm/message_processor.rs | 18 ++-- src/comm/messages/mod.rs | 14 +-- src/comm/mod.rs | 129 ++++++++++++++----------- src/comm/types.rs | 11 ++- src/config.rs | 16 +++- src/lib.rs | 19 ++-- src/state/api_state.rs | 59 +++++++++--- src/state/comm_state.rs | 68 +++++++++++--- src/state/mod.rs | 56 +++++++---- src/state/queries/apps.rs | 46 ++++++--- src/state/queries/elements.rs | 146 +++++++++++++++++++++-------- src/state/queries/mod.rs | 66 ++++++------- src/state/queries/peers.rs | 58 ++++++------ src/state/schema.rs | 12 ++- src/state/types/element.rs | 22 +++-- src/state/types/element_content.rs | 11 +-- src/state/types/element_id.rs | 4 +- src/state/types/message_id.rs | 5 +- src/state/types/peer_id.rs | 7 +- src/state/types/tag.rs | 5 +- tests/api.rs | 78 +++++++++++---- 25 files changed, 667 insertions(+), 348 deletions(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index 901b7ca..40c999d 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,28 +1,9 @@ - use axum::Router; use tokio::{net::TcpListener, task::JoinHandle}; -use serde::{Serialize, Deserialize}; -use uuid::Uuid; -use crate::{state::ApiState, config::ApiConfig}; +use crate::{config::ApiConfig, state::ApiState}; -mod v0; - - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct AppId(Uuid); - -impl AppId { - pub fn new() -> Self { - AppId { 0: Uuid::new_v4() } - } -} - -#[derive(Serialize, Deserialize)] -pub struct AppDescription { - pub name: String, - pub desc_text: String, -} +pub mod v0; pub struct Api { server_thread: JoinHandle<()>, @@ -42,7 +23,11 @@ pub struct ApiBuilder { impl Default for ApiBuilder { fn default() -> Self { - ApiBuilder { bind_ip: None, port: None, version: None } + ApiBuilder { + bind_ip: None, + port: None, + version: None, + } } } @@ -51,7 +36,7 @@ impl From for ApiBuilder { ApiBuilder { bind_ip: value.ip, port: value.port, - version: value.version + version: value.version, } } } @@ -72,12 +57,18 @@ impl ApiBuilder { Some(p) => p, None => &9981, }; - let listener = TcpListener::bind(&format!("{}:{}", ip, port)).await.unwrap(); - - let task_handle = tokio::spawn({ async move { - axum::serve(listener, app).await.unwrap(); - }}); + let listener = TcpListener::bind(&format!("{}:{}", ip, port)) + .await + .unwrap(); - Api{ server_thread: task_handle } + let task_handle = tokio::spawn({ + async move { + axum::serve(listener, app).await.unwrap(); + } + }); + + Api { + server_thread: task_handle, + } } -} \ No newline at end of file +} diff --git a/src/api/v0/app.rs b/src/api/v0/app.rs index ad94b04..ddb12c0 100644 --- a/src/api/v0/app.rs +++ b/src/api/v0/app.rs @@ -1,11 +1,29 @@ use std::sync::Arc; -use serde::{Serialize, Deserialize}; -use axum::{Extension, extract::Request, body::Body, middleware::Next, response::{IntoResponse, Response}, http::{header::AUTHORIZATION, StatusCode}, Json}; +use axum::{ + body::Body, + extract::Request, + http::{header::AUTHORIZATION, StatusCode}, + middleware::Next, + response::{IntoResponse, Response}, + Extension, Json, +}; use jsonwebtoken::{decode, Header}; -use tracing::{debug, warn, error}; +use serde::{Deserialize, Serialize}; +use tracing::{debug, error, warn}; +use uuid::Uuid; -use crate::{state::ApiState, api::{AppId, AppDescription}}; +use crate::state::ApiState; + + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct AppId(Uuid); + +impl AppId { + pub fn new() -> Self { + AppId { 0: Uuid::new_v4() } + } +} #[derive(Serialize, Deserialize, Debug)] struct JWTClaims { @@ -13,7 +31,19 @@ struct JWTClaims { } -pub(super) async fn auth(s: Extension>, mut request: Request, next: Next) -> Response { + +#[derive(Serialize, Deserialize)] +pub struct AppDescription { + pub name: String, + pub desc_text: String, +} + + +pub(super) async fn auth( + s: Extension>, + mut request: Request, + next: Next, +) -> Response { if let Some(auth_header) = request.headers().get(AUTHORIZATION) { if let Ok(header_string) = auth_header.to_str() { if header_string.starts_with("Bearer") { @@ -35,9 +65,10 @@ pub(super) async fn auth(s: Extension>, mut request: Request StatusCode::UNAUTHORIZED.into_response() } - - -pub(super) async fn register(s: Extension>, Json(data): Json) -> Response { +pub(super) async fn register( + s: Extension>, + Json(data): Json, +) -> Response { // Maybe ask for consent by user // If user wants registration, proceed @@ -48,7 +79,7 @@ pub(super) async fn register(s: Extension>, Json(data): Json>, Json(data): Json { error!("Failed to register new application! {:?}", e); StatusCode::INTERNAL_SERVER_ERROR.into_response() - }, + } } -} \ No newline at end of file +} diff --git a/src/api/v0/element.rs b/src/api/v0/element.rs index b2b89c3..8f42322 100644 --- a/src/api/v0/element.rs +++ b/src/api/v0/element.rs @@ -1,15 +1,22 @@ use std::sync::Arc; -use axum::{extract::{Path, Json}, Extension, response::{IntoResponse, Response}, http::StatusCode}; -use tracing::{warn, debug}; - -use crate::state::{types::{ElementId, ElementContent}, ApiState}; +use axum::{ + extract::{Json, Path}, + http::StatusCode, + response::{IntoResponse, Response}, + Extension, +}; +use tracing::{debug, warn}; +use crate::state::{ + types::{ElementContent, ElementId}, + ApiState, +}; pub(super) async fn get(Path(id): Path, s: Extension>) -> Response { let element = s.get_element(&id); match element { - Ok(el) => (StatusCode::OK, Json{0: el}).into_response(), + Ok(el) => (StatusCode::OK, Json { 0: el }).into_response(), Err(e) => { warn!("Element not found:\n{:?}", e); StatusCode::NOT_FOUND.into_response() @@ -17,16 +24,29 @@ pub(super) async fn get(Path(id): Path, s: Extension>) } } -pub(super) async fn create(s: Extension>, Json(content): Json) -> Response { +pub(super) async fn create( + s: Extension>, + Json(content): Json, +) -> Response { let element_id = s.create_element(&content); debug!("{:?}", element_id); match element_id { - Ok(id) => (StatusCode::OK, Json{0: &Into::::into(&id)}).into_response(), + Ok(id) => ( + StatusCode::OK, + Json { + 0: &Into::::into(&id), + }, + ) + .into_response(), Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), } } -pub(super) async fn set(Path(id): Path, s: Extension>, Json(content): Json) -> Response { +pub(super) async fn set( + Path(id): Path, + s: Extension>, + Json(content): Json, +) -> Response { let res = s.write_element_content(&id, &content); match res { Ok(_) => StatusCode::OK.into_response(), @@ -40,4 +60,4 @@ pub(super) async fn remove(Path(id): Path, s: Extension Ok(_) => StatusCode::OK.into_response(), Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), } -} \ No newline at end of file +} diff --git a/src/api/v0/mod.rs b/src/api/v0/mod.rs index 5845b62..f75f787 100644 --- a/src/api/v0/mod.rs +++ b/src/api/v0/mod.rs @@ -1,20 +1,25 @@ +use axum::{ + middleware::from_fn, + routing::{get, put}, + Extension, Router, +}; use std::sync::Arc; -use axum::{Router, routing::{put, get}, Extension, middleware::from_fn}; use crate::state::ApiState; -mod app; -mod element; +pub mod app; +pub mod element; pub fn get_router(state: ApiState) -> Router { Router::new() - // authenticated routes + // authenticated routes .route("/element", put(element::create)) - .route("/element/:id", get(element::get).post(element::set).delete(element::remove)) + .route( + "/element/:id", + get(element::get).post(element::set).delete(element::remove), + ) .layer(from_fn(app::auth)) - // public / unauthenticated routes + // public / unauthenticated routes .route("/app/register", put(app::register)) .layer(Extension(Arc::new(state))) } - - diff --git a/src/comm/message_processor.rs b/src/comm/message_processor.rs index 94d1979..3d82883 100644 --- a/src/comm/message_processor.rs +++ b/src/comm/message_processor.rs @@ -4,22 +4,24 @@ use crate::state::{types::PeerId, CommState}; use super::messages::{Message, MessageContent}; - - pub fn handle(state: &CommState, peer: &PeerId, message: Message) { debug!("Handling message now: {:?}", message); match message.content() { MessageContent::Hello { peer_name } => { state.set_peer(peer, peer_name).expect("State failed"); - }, + } MessageContent::CreateElement { id, content } => { - state.add_received_element(id, content, message.id()).expect("State failed"); - }, + state + .add_received_element(id, content, message.id()) + .expect("State failed"); + } MessageContent::SetElement { id, content } => { - state.update_element_content(id, content, message.id()).expect("State failed"); - }, + state + .update_element_content(id, content, message.id()) + .expect("State failed"); + } MessageContent::RemoveElement { id } => { state.remove_element(id).expect("State failed"); } } -} \ No newline at end of file +} diff --git a/src/comm/messages/mod.rs b/src/comm/messages/mod.rs index c49416e..aa0625d 100644 --- a/src/comm/messages/mod.rs +++ b/src/comm/messages/mod.rs @@ -1,7 +1,6 @@ -use serde::{Serialize, Deserialize}; - -use crate::state::types::{MessageId, ElementId, ElementContent}; +use serde::{Deserialize, Serialize}; +use crate::state::types::{ElementContent, ElementId, MessageId}; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Message { @@ -10,7 +9,6 @@ pub struct Message { content: MessageContent, } - #[derive(Serialize, Deserialize, Debug, Clone)] pub enum MessageContent { Hello { @@ -26,19 +24,17 @@ pub enum MessageContent { }, RemoveElement { id: ElementId, - } + }, } #[derive(Serialize, Deserialize, Debug, Clone)] pub struct MessageSignature {} - - impl Message { pub fn new(content: MessageContent) -> Self { Message { id: MessageId::new(), - signature: MessageSignature { }, + signature: MessageSignature {}, content: content, } } @@ -56,4 +52,4 @@ impl MessageSignature { pub fn verify(&self, _content: &MessageContent) -> bool { true } -} \ No newline at end of file +} diff --git a/src/comm/mod.rs b/src/comm/mod.rs index c4c731a..9301dee 100644 --- a/src/comm/mod.rs +++ b/src/comm/mod.rs @@ -1,23 +1,23 @@ -pub mod messages; pub mod message_processor; +pub mod messages; mod types; -use tracing::{warn, error, debug}; +use tracing::{debug, error, warn}; pub use types::*; use std::collections::HashMap; +use std::io::{Read, Write}; use std::ops::Deref; use std::sync::Arc; -use std::io::{Read, Write}; use anyhow::bail; -use i2p::net::{I2pListenerBuilder, I2pListener, I2pSocketAddr, I2pStream, I2pAddr}; +use i2p::net::{I2pAddr, I2pListener, I2pListenerBuilder, I2pSocketAddr, I2pStream}; use i2p::sam_options::SAMOptions; use tokio::sync::RwLock; use tokio::task::JoinHandle; -use crate::Config; -use crate::state::CommState; use crate::state::types::PeerId; +use crate::state::CommState; +use crate::Config; use self::messages::Message; @@ -31,16 +31,14 @@ pub struct CommHandle { impl CommHandle { pub fn new(state: CommState, config: &Config) -> anyhow::Result { - let mut listener_builder = I2pListenerBuilder::default() - .with_options(SAMOptions::default()); + let mut listener_builder = + I2pListenerBuilder::default().with_options(SAMOptions::default()); if let Some(privkey) = &config.i2p_private_key { listener_builder = listener_builder.with_private_key(privkey.to_string()); } - let listener = listener_builder - .build() - .unwrap(); + let listener = listener_builder.build().unwrap(); Ok(CommHandle { state: Arc::new(state), @@ -71,7 +69,9 @@ impl CommHandle { // there is another stream - thus, the existing streams will not be read. // `spawn_blocking` moves the reading task to a special pool of tasks which are // executed _despite_ other tasks blocking for something. - tokio::task::spawn_blocking(move || Self::read_connection(wrapped_stream, state_arc)); + tokio::task::spawn_blocking(move || { + Self::read_connection(wrapped_stream, state_arc) + }); } } } @@ -90,11 +90,14 @@ impl CommHandle { for peer in self.state.get_peers().unwrap() { debug!("Sending to peer '{:?}' message '{:?}'", &peer, &msg); if let Err(e) = self.send_to_addr(&peer.addr(), msg_string.as_bytes()).await { - debug!("Failed to send message.\nError: {:?}\nMessage: {:?}", e, &msg); + debug!( + "Failed to send message.\nError: {:?}\nMessage: {:?}", + e, &msg + ); } } Ok(()) - }, + } Err(e) => bail!(e), } } @@ -104,7 +107,7 @@ impl CommHandle { Ok(msg_string) => { self.send_to_addr(dest, msg_string.as_bytes()).await?; Ok(()) - }, + } Err(e) => bail!(e), } } @@ -116,12 +119,14 @@ impl CommHandle { Ok(client) => { //client.inner.sam.conn.set_nodelay(true)?; //client.inner.sam.conn.set_nonblocking(false)?; - self.clients.write().await.insert(addr.clone(), Arc::new(RwLock::new(client))); - }, + self.clients + .write() + .await + .insert(addr.clone(), Arc::new(RwLock::new(client))); + } Err(e) => bail!(e), } } - // Fetch current client for this connection from clients map, and send the message if let Some(client) = self.clients.read().await.get(&addr) { @@ -129,18 +134,21 @@ impl CommHandle { match writeguard.write_all(msg) { Ok(_) => { writeguard.flush()?; - return Ok(()) - }, + return Ok(()); + } Err(e) => { warn!("Error writing to stream: {}", e) } } - } - else { - return Err(anyhow::Error::msg("No client found despite trying to add one beforehand.")) + } else { + return Err(anyhow::Error::msg( + "No client found despite trying to add one beforehand.", + )); } self.clients.write().await.remove(&addr); - Err(anyhow::Error::msg("Failed to send anything, most likely the stream was broken and has been removed")) + Err(anyhow::Error::msg( + "Failed to send anything, most likely the stream was broken and has been removed", + )) } pub fn i2p_address(&self) -> anyhow::Result { @@ -156,27 +164,29 @@ impl CommHandle { Ok(i2p_dest) } - fn read_connection(wrapped_stream: Arc>, state: Arc) -> JoinHandle<()> { + fn read_connection( + wrapped_stream: Arc>, + state: Arc, + ) -> JoinHandle<()> { tokio::spawn(async move { let mut stream = wrapped_stream.write().await; let peer: PeerId = stream.peer_addr().expect("Failed to get peer addr").into(); - + // All streams start with a \n byte which does not belong to the payload, take that from the stream. if let Err(e) = stream.read(&mut [0; 1]) { error!("Error while reading first byte of stream: {}", e); return; } - - let iterator = serde_json::Deserializer::from_reader(&mut *stream).into_iter::(); + + let iterator = serde_json::Deserializer::from_reader(&mut *stream) + .into_iter::(); for item in iterator { match item { - Ok(value) => { - match serde_json::from_value::(value) { - Ok(message) => { - message_processor::handle(state.deref(), &peer, message); - }, - Err(e) => warn!("Deserialization failed: {:?}", e), + Ok(value) => match serde_json::from_value::(value) { + Ok(message) => { + message_processor::handle(state.deref(), &peer, message); } + Err(e) => warn!("Deserialization failed: {:?}", e), }, Err(e) => { warn!("Deserialization failed: {:?}", e); @@ -188,39 +198,52 @@ impl CommHandle { } } - - - #[cfg(test)] mod tests { use std::time::Duration; - use i2p::Session; use i2p::net::I2pListener; use i2p::sam::StreamForward; use i2p::sam_options::SAMOptions; + use i2p::Session; - use crate::Config; - use crate::state::{State, CommState}; use crate::comm::{messages, Message}; use crate::state::types::ElementId; + use crate::state::{CommState, State}; + use crate::Config; use super::CommHandle; #[tokio::test(flavor = "multi_thread")] pub async fn msg() { - let ch = CommHandle::new(CommState::new(State::new().await.unwrap()), &Config::default() ).unwrap(); + let ch = CommHandle::new( + CommState::new(State::new().await.unwrap()), + &Config::default(), + ) + .unwrap(); ch.run().await; println!("My address: {:?}", ch.i2p_b32_address()); - + ch.send( &ch.i2p_address().unwrap(), - Message::new(messages::MessageContent::Hello { peer_name: "a".to_string() }) - ).await.expect("Could not send hello"); + Message::new(messages::MessageContent::Hello { + peer_name: "a".to_string(), + }), + ) + .await + .expect("Could not send hello"); for i in 0..10 { - let result = ch.send( - &ch.i2p_address().unwrap(), - Message::new(messages::MessageContent::CreateElement { id: ElementId::new(), content: crate::state::types::ElementContent::Text(format!("hello world no. {}", i)) }) - ).await; + let result = ch + .send( + &ch.i2p_address().unwrap(), + Message::new(messages::MessageContent::CreateElement { + id: ElementId::new(), + content: crate::state::types::ElementContent::Text(format!( + "hello world no. {}", + i + )), + }), + ) + .await; tokio::time::sleep(Duration::from_millis(300)).await; println!("Result of sending: {:?}", result); } @@ -232,11 +255,11 @@ mod tests { pub fn from_privkey() { let privkey = "DPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gQz1KRvzfH0pI7uNWn8ZiRKQ812KjOrOMqnq7owjAjSBDPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gQz1KRvzfH0pI7uNWn8ZiRKQ812KjOrOMqnq7owjAjSBDPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gVKUHq0huLwfh0u06PlujRXTgcUJw9pg4Vkh-e0CGQFL6yn2FxCUIvaryyFt3-8mwO1OTkQyB7u1rnO9FpLlKeT9FPSkwmaxZmwQ1kvsuTTIp5ntxQZ1XMCDm2qhRWdcEsYxTKLJIMYxN1Ujk9Y7SqNYORmxrwQWC4ENGnt~VyvbAAAAfAabqgU0GhMWN2syDQ5sYZ61WXDqC4esasxwyLvJ-ES7~k40Uq9htc8t16-RXEq0Q17C499WxW6~GQRcXbgBNd0bMdV-46RsFo1jNgfB6H4nkuTrQXMqXB6s2Fhx2gwcHRk3Lt5DE4N0mvHG8Po974tJWr1hIRiSxQUtSj5kcZOOT~EKWMoCA7qDgZORZAnJavaRr0S-PiPQwAw8HOekdw50CyOByxXEfLBAi-Kz1nhdNvMHIrtcBZ~RpsxOK63O633e0PeYwrOOG7AFVLh7SzdwVvI1-KUe7y2ADBcoHuJRMwk5MEV-BATEfhWA2SzWw1qFRzJyb-pGbgGCJQOoc1YcP8jikBJhtuRbD5K-wK5MXoHL"; let _ = I2pListener { - forward: StreamForward::with_session(&Session::from_destination( - i2p::sam::DEFAULT_API, - &privkey, - SAMOptions::default()).expect("Failed to create session for listener2") - ).expect("Failed to create StreamForward for listener2") + forward: StreamForward::with_session( + &Session::from_destination(i2p::sam::DEFAULT_API, &privkey, SAMOptions::default()) + .expect("Failed to create session for listener2"), + ) + .expect("Failed to create StreamForward for listener2"), }; } } diff --git a/src/comm/types.rs b/src/comm/types.rs index 374f686..204129f 100644 --- a/src/comm/types.rs +++ b/src/comm/types.rs @@ -1,11 +1,10 @@ use anyhow::bail; use i2p::net::I2pSocketAddr; -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; use crate::state::types::PeerId; - #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Peer { id: PeerId, @@ -15,7 +14,11 @@ pub struct Peer { impl Peer { pub fn new(id: PeerId, name: String) -> Self { - Peer {id: id, name: name, family: vec![]} + Peer { + id: id, + name: name, + family: vec![], + } } pub fn addr(&self) -> I2pSocketAddr { @@ -48,4 +51,4 @@ impl TryFrom for Peer { fn try_from(value: String) -> Result { Self::try_from(value.as_str()) } -} \ No newline at end of file +} diff --git a/src/config.rs b/src/config.rs index d8a52e9..776d73f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,4 @@ -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Config { @@ -9,7 +9,11 @@ pub struct Config { impl Default for Config { fn default() -> Self { - Config { i2p_private_key: None, api_config: Default::default(), jwt_secret: "insecuresecret".to_string() } + Config { + i2p_private_key: None, + api_config: Default::default(), + jwt_secret: "insecuresecret".to_string(), + } } } @@ -22,6 +26,10 @@ pub struct ApiConfig { impl Default for ApiConfig { fn default() -> Self { - ApiConfig { ip: None, port: None, version: None } + ApiConfig { + ip: None, + port: None, + version: None, + } } -} \ No newline at end of file +} diff --git a/src/lib.rs b/src/lib.rs index c5e3df2..3520438 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,8 +5,11 @@ use api::{Api, ApiBuilder}; use comm::{CommHandle, Peer}; use config::Config; use i2p::net::I2pSocketAddr; -use serde::{Serialize, Deserialize}; -use state::{State, types::{MessageId, PeerId}, CommState, ApiState}; +use serde::{Deserialize, Serialize}; +use state::{ + types::{MessageId, PeerId}, + ApiState, CommState, State, +}; pub mod api; pub mod comm; @@ -18,7 +21,6 @@ pub struct MessageRelations { pub parents: Vec, } - pub struct Ubisync { comm_handle: Arc, state_handle: Arc, @@ -31,13 +33,17 @@ impl Ubisync { let comm_handle = Arc::new(CommHandle::new(CommState::new(state.clone()), config)?); state.set_comm_handle(comm_handle.clone()); - let api = Arc::new(ApiBuilder::from(config.api_config.clone()).build(ApiState::new(state.clone(), &config.jwt_secret)).await); + let api = Arc::new( + ApiBuilder::from(config.api_config.clone()) + .build(ApiState::new(state.clone(), &config.jwt_secret)) + .await, + ); comm_handle.run().await; Ok(Ubisync { comm_handle: comm_handle, state_handle: state, - api: api + api: api, }) } @@ -45,7 +51,7 @@ impl Ubisync { self.api.clone() } - pub fn add_peer(&self, p: impl TryInto) -> anyhow::Result<()> { + pub fn add_peer(&self, p: impl TryInto) -> anyhow::Result<()> { match p.try_into() { Ok(peer) => self.state_handle.set_peer(&peer), Err(e) => bail!(e), @@ -66,7 +72,6 @@ impl Ubisync { } } - #[cfg(test)] mod tests { #[test] diff --git a/src/state/api_state.rs b/src/state/api_state.rs index d41e6c2..0ad06b1 100644 --- a/src/state/api_state.rs +++ b/src/state/api_state.rs @@ -2,14 +2,19 @@ use std::sync::Arc; use chrono::Utc; use cozo::DbInstance; -use jsonwebtoken::{EncodingKey, DecodingKey, Validation}; +use jsonwebtoken::{DecodingKey, EncodingKey, Validation}; use tracing::debug; -use crate::{state::{types::ElementId, queries}, comm::messages::MessageContent, api::{AppDescription, AppId}}; - -use super::{State, types::{ElementContent, Element}}; - +use crate::{ + api::v0::app::{AppDescription, AppId}, + comm::messages::MessageContent, + state::{queries, types::ElementId}, +}; +use super::{ + types::{Element, ElementContent}, + State, +}; pub struct ApiState { state: Arc, @@ -33,7 +38,13 @@ impl ApiState { pub fn add_app(&self, description: &AppDescription) -> anyhow::Result { let id = AppId::new(); let last_access = Utc::now(); - queries::apps::add(self.db(), &id, &last_access, &description.name, &description.desc_text)?; + queries::apps::add( + self.db(), + &id, + &last_access, + &description.name, + &description.desc_text, + )?; debug!("Successfully added app"); Ok(id) @@ -48,11 +59,18 @@ impl ApiState { queries::elements::add(self.db(), &id, &content, None, true)?; debug!("Added element {{{}}}", &id.to_string()); - self.state.send_to_peers(MessageContent::CreateElement { id: id.clone(), content: content.clone() }); + self.state.send_to_peers(MessageContent::CreateElement { + id: id.clone(), + content: content.clone(), + }); Ok(id) } - pub fn write_element_content(&self, id: &ElementId, content: &ElementContent) -> anyhow::Result<()> { + pub fn write_element_content( + &self, + id: &ElementId, + content: &ElementContent, + ) -> anyhow::Result<()> { queries::elements::set_content(self.db(), id, content)?; queries::elements::set_local_changes(self.db(), id, true)?; debug!("Wrote element content {{{}}}", &id.to_string()); @@ -71,7 +89,6 @@ impl ApiState { self.state.get_element(id) } - fn db(&self) -> &DbInstance { &self.state.db } @@ -93,13 +110,18 @@ impl ApiState { mod tests { use super::ApiState; use crate::state::{types::ElementContent, State}; - + #[tokio::test] #[serial_test::serial] async fn test_element_create() { tracing_subscriber::fmt().pretty().init(); - let state = ApiState::new(State::new().await.unwrap(), "abcdabcdabcdabcdabcdabcdabcdabcd"); - let id = state.create_element(&ElementContent::Text("Test-text".to_string())).unwrap(); + let state = ApiState::new( + State::new().await.unwrap(), + "abcdabcdabcdabcdabcdabcdabcdabcd", + ); + let id = state + .create_element(&ElementContent::Text("Test-text".to_string())) + .unwrap(); let el = state.get_element(&id).unwrap(); assert_eq!( ElementContent::Text("Test-text".to_string()), @@ -111,9 +133,16 @@ mod tests { #[serial_test::serial] async fn test_element_write() { tracing_subscriber::fmt().pretty().init(); - let state = ApiState::new(State::new().await.unwrap(), "abcdabcdabcdabcdabcdabcdabcdabcd"); - let id = state.create_element(&ElementContent::Text("Test-text".to_string())).unwrap(); - state.write_element_content(&id,&ElementContent::Text("Test-text 2".to_string())).unwrap(); + let state = ApiState::new( + State::new().await.unwrap(), + "abcdabcdabcdabcdabcdabcdabcdabcd", + ); + let id = state + .create_element(&ElementContent::Text("Test-text".to_string())) + .unwrap(); + state + .write_element_content(&id, &ElementContent::Text("Test-text 2".to_string())) + .unwrap(); let el = state.get_element(&id).unwrap(); assert_eq!( ElementContent::Text("Test-text 2".to_string()), diff --git a/src/state/comm_state.rs b/src/state/comm_state.rs index 4884c62..227fa24 100644 --- a/src/state/comm_state.rs +++ b/src/state/comm_state.rs @@ -3,29 +3,47 @@ use std::sync::Arc; use cozo::DbInstance; use tracing::debug; -use crate::{state::queries, comm::Peer}; - -use super::{State, types::{MessageId, ElementContent, ElementId, PeerId, Element}}; +use crate::{comm::Peer, state::queries}; +use super::{ + types::{Element, ElementContent, ElementId, MessageId, PeerId}, + State, +}; //TODO: Notify API about changes pub struct CommState { - state: Arc + state: Arc, } impl CommState { pub fn new(state: Arc) -> Self { CommState { state: state } } - - pub fn add_received_element(&self, id: &ElementId, content: &ElementContent, latest_message: &MessageId) -> anyhow::Result<()> { - queries::elements::add(self.db(), &id, &content, Some(latest_message.to_owned()), false)?; + + pub fn add_received_element( + &self, + id: &ElementId, + content: &ElementContent, + latest_message: &MessageId, + ) -> anyhow::Result<()> { + queries::elements::add( + self.db(), + &id, + &content, + Some(latest_message.to_owned()), + false, + )?; debug!("Added element {{{}}}", &id.to_string()); Ok(()) } - pub fn update_element_content(&self, id: &ElementId, content: &ElementContent, latest_message: &MessageId) -> anyhow::Result<()> { + pub fn update_element_content( + &self, + id: &ElementId, + content: &ElementContent, + latest_message: &MessageId, + ) -> anyhow::Result<()> { //TODO: resolve potential conflicts with local changes queries::elements::set_content(self.db(), id, content)?; queries::elements::set_latest_message(self.db(), id, Some(latest_message.to_owned()))?; @@ -48,7 +66,7 @@ impl CommState { pub fn set_peer(&self, id: &PeerId, name: &str) -> anyhow::Result<()> { queries::peers::put(self.db(), id, name)?; debug!("Set peer {{{}}}", id.to_string()); - + Ok(()) } @@ -56,7 +74,6 @@ impl CommState { self.state.get_peers() } - fn db(&self) -> &DbInstance { &self.state.db } @@ -65,15 +82,24 @@ impl CommState { #[cfg(test)] mod tests { use super::CommState; - use crate::state::{types::{ElementContent, ElementId, MessageId}, State}; - + use crate::state::{ + types::{ElementContent, ElementId, MessageId}, + State, + }; + #[tokio::test] #[serial_test::serial] async fn test_element_add() { tracing_subscriber::fmt().pretty().init(); let state = CommState::new(State::new().await.unwrap()); let id = ElementId::new(); - state.add_received_element(&id, &ElementContent::Text("Test-text".to_string()), &MessageId::new()).unwrap(); + state + .add_received_element( + &id, + &ElementContent::Text("Test-text".to_string()), + &MessageId::new(), + ) + .unwrap(); let el = state.get_element(&id).unwrap(); assert_eq!( ElementContent::Text("Test-text".to_string()), @@ -87,8 +113,20 @@ mod tests { tracing_subscriber::fmt().pretty().init(); let state = CommState::new(State::new().await.unwrap()); let id = ElementId::new(); - state.add_received_element(&id, &ElementContent::Text("Test-text".to_string()), &MessageId::new()).unwrap(); - state.update_element_content(&id,&ElementContent::Text("Test-text 2".to_string()), &MessageId::new()).unwrap(); + state + .add_received_element( + &id, + &ElementContent::Text("Test-text".to_string()), + &MessageId::new(), + ) + .unwrap(); + state + .update_element_content( + &id, + &ElementContent::Text("Test-text 2".to_string()), + &MessageId::new(), + ) + .unwrap(); let el = state.get_element(&id).unwrap(); assert_eq!( ElementContent::Text("Test-text 2".to_string()), diff --git a/src/state/mod.rs b/src/state/mod.rs index 8f00bc7..bd3231f 100644 --- a/src/state/mod.rs +++ b/src/state/mod.rs @@ -2,11 +2,14 @@ use std::sync::{Arc, RwLock}; use anyhow::Error; use cozo::DbInstance; -use tracing::{error, debug}; +use tracing::{debug, error}; -use crate::comm::{Peer, CommHandle, messages::{Message, MessageContent}}; +use crate::comm::{ + messages::{Message, MessageContent}, + CommHandle, Peer, +}; -use self::types::{ElementContent, ElementId, Element, Tag}; +use self::types::{Element, ElementContent, ElementId, Tag}; pub mod types; @@ -29,28 +32,48 @@ impl State { match db { Ok(d) => { schema::add_schema(&d)?; - Ok(Arc::new(State {db: d, comm_handle: RwLock::new(None)})) - }, + Ok(Arc::new(State { + db: d, + comm_handle: RwLock::new(None), + })) + } Err(e) => Err(Error::msg(format!("{:?}", e))), } } pub fn set_comm_handle(&self, handle: Arc) { - *self.comm_handle.write().as_deref_mut().expect("Could not set state's CommHandle") = Some(handle); + *self + .comm_handle + .write() + .as_deref_mut() + .expect("Could not set state's CommHandle") = Some(handle); } - pub fn set_element_content(&self, element_id: &ElementId, content: &ElementContent) -> anyhow::Result<()> { + pub fn set_element_content( + &self, + element_id: &ElementId, + content: &ElementContent, + ) -> anyhow::Result<()> { let res = queries::elements::set_content(&self.db, element_id, content); - debug!("Set content of element with id {:?}: {:?}", element_id, self.get_element(element_id)); - - self.send_to_peers(MessageContent::SetElement { id: element_id.clone(), content: content.clone() }); + debug!( + "Set content of element with id {:?}: {:?}", + element_id, + self.get_element(element_id) + ); + + self.send_to_peers(MessageContent::SetElement { + id: element_id.clone(), + content: content.clone(), + }); res } pub fn remove_element(&self, element_id: &ElementId) -> anyhow::Result<()> { let res = queries::elements::remove(&self.db, element_id); - self.send_to_peers(MessageContent::RemoveElement { id: element_id.clone() }); + self.send_to_peers(MessageContent::RemoveElement { + id: element_id.clone(), + }); res } @@ -60,8 +83,11 @@ impl State { pub fn get_elements_by_tag(&self, tag: &Tag) -> Vec { queries::elements::get_by_tag(&self.db, tag) - .map_err(|e| {error!("{}", e); e}) - .unwrap_or(vec![]) + .map_err(|e| { + error!("{}", e); + e + }) + .unwrap_or(vec![]) } pub fn set_peer(&self, peer: &Peer) -> anyhow::Result<()> { @@ -72,8 +98,6 @@ impl State { queries::peers::get(&self.db) } - - fn send_to_peers(&self, ct: MessageContent) { match self.comm_handle.read() { Ok(opt) => { @@ -83,7 +107,7 @@ impl State { let _ = arc.broadcast(Message::new(ct)).await; }); } - }, + } Err(e) => debug!("{}", e), } } diff --git a/src/state/queries/apps.rs b/src/state/queries/apps.rs index 1c6b1b7..e5d09a0 100644 --- a/src/state/queries/apps.rs +++ b/src/state/queries/apps.rs @@ -2,35 +2,57 @@ use std::collections::BTreeMap; use anyhow::{bail, Error}; use chrono::{DateTime, Utc}; -use cozo::{DbInstance, DataValue, Num}; +use cozo::{DataValue, DbInstance, Num}; -use crate::{run_query, api::AppId}; +use crate::{api::v0::app::AppId, run_query}; - - -pub fn add(db: &DbInstance, id: &AppId, last_access: &DateTime, name: &str, description: &str) -> anyhow::Result<()> { +pub fn add( + db: &DbInstance, + id: &AppId, + last_access: &DateTime, + name: &str, + description: &str, +) -> anyhow::Result<()> { let params = vec![ - ("id", DataValue::Str(serde_json::to_string(&id).unwrap().into())), - ("last_access", DataValue::Num(Num::Int(last_access.timestamp()))), + ( + "id", + DataValue::Str(serde_json::to_string(&id).unwrap().into()), + ), + ( + "last_access", + DataValue::Num(Num::Int(last_access.timestamp())), + ), ("name", DataValue::Str(name.into())), ("description", DataValue::Str(description.into())), ]; - match run_query!(&db, ":insert apps {id => last_access, name, description}", params, cozo::ScriptMutability::Mutable) { + match run_query!( + &db, + ":insert apps {id => last_access, name, description}", + params, + cozo::ScriptMutability::Mutable + ) { Ok(_) => Ok(()), - Err(report) => bail!(report) + Err(report) => bail!(report), } } pub fn exists(db: &DbInstance, id: &AppId) -> anyhow::Result { let mut params = BTreeMap::new(); - params.insert("id".to_string(), DataValue::Str(serde_json::to_string(&id)?.into())); + params.insert( + "id".to_string(), + DataValue::Str(serde_json::to_string(&id)?.into()), + ); - let result = db.run_script("?[name] := *apps[$id, last_access, name, description]", params, cozo::ScriptMutability::Immutable); + let result = db.run_script( + "?[name] := *apps[$id, last_access, name, description]", + params, + cozo::ScriptMutability::Immutable, + ); if let Ok(rows) = result { return Ok(rows.rows.len() == 1); } Err(Error::msg("Could not check whether app is registered")) -} \ No newline at end of file +} diff --git a/src/state/queries/elements.rs b/src/state/queries/elements.rs index b82770c..7323ee7 100644 --- a/src/state/queries/elements.rs +++ b/src/state/queries/elements.rs @@ -1,46 +1,96 @@ use std::collections::BTreeMap; -use anyhow::{Error, bail}; -use cozo::{DbInstance, DataValue, JsonData, ScriptMutability}; +use anyhow::{bail, Error}; +use cozo::{DataValue, DbInstance, JsonData, ScriptMutability}; use serde_json::Value; -use tracing::{error, debug}; +use tracing::{debug, error}; -use crate::{state::{ElementContent, ElementId, Element, types::{Tag, MessageId}}, run_query}; +use crate::{ + run_query, + state::{ + types::{MessageId, Tag}, + Element, ElementContent, ElementId, + }, +}; -pub fn add(db: &DbInstance, id: &ElementId, content: &ElementContent, latest_message: Option, local_changes: bool) -> anyhow::Result<()> { +pub fn add( + db: &DbInstance, + id: &ElementId, + content: &ElementContent, + latest_message: Option, + local_changes: bool, +) -> anyhow::Result<()> { let params = vec![ ("id", DataValue::Str(serde_json::to_string(&id)?.into())), - ("content", DataValue::Json(JsonData(serde_json::to_value(content)?))), - ("latest_message", match latest_message { - Some(m) => DataValue::Str(serde_json::to_string(&m)?.into()), - None => DataValue::Null, - }), + ( + "content", + DataValue::Json(JsonData(serde_json::to_value(content)?)), + ), + ( + "latest_message", + match latest_message { + Some(m) => DataValue::Str(serde_json::to_string(&m)?.into()), + None => DataValue::Null, + }, + ), ("local_changes", DataValue::Bool(local_changes)), ]; - match run_query!(&db, ":insert elements {id => content, latest_message, local_changes}", params, ScriptMutability::Mutable) { + match run_query!( + &db, + ":insert elements {id => content, latest_message, local_changes}", + params, + ScriptMutability::Mutable + ) { Ok(_) => Ok(()), Err(report) => bail!(report), } } -pub fn set_content(db: &DbInstance, id: &ElementId, content: &ElementContent) -> anyhow::Result<()> { - set_property(db, id, "content", DataValue::Json(JsonData(serde_json::to_value(content)?))) +pub fn set_content( + db: &DbInstance, + id: &ElementId, + content: &ElementContent, +) -> anyhow::Result<()> { + set_property( + db, + id, + "content", + DataValue::Json(JsonData(serde_json::to_value(content)?)), + ) } -pub fn set_latest_message(db: &DbInstance, id: &ElementId, latest_message: Option) -> anyhow::Result<()> { - set_property(db, id, "latest_message", match latest_message { - Some(m) => DataValue::Str(serde_json::to_string(&m)?.into()), - None => DataValue::Null, - }) +pub fn set_latest_message( + db: &DbInstance, + id: &ElementId, + latest_message: Option, +) -> anyhow::Result<()> { + set_property( + db, + id, + "latest_message", + match latest_message { + Some(m) => DataValue::Str(serde_json::to_string(&m)?.into()), + None => DataValue::Null, + }, + ) } -pub fn set_local_changes(db: &DbInstance, id: &ElementId, local_changes: bool) -> anyhow::Result<()> { +pub fn set_local_changes( + db: &DbInstance, + id: &ElementId, + local_changes: bool, +) -> anyhow::Result<()> { set_property(db, id, "local_changes", DataValue::Bool(local_changes)) } pub fn remove(db: &DbInstance, id: &ElementId) -> anyhow::Result<()> { - match run_query!(&db, ":delete elements {id}", vec![("id", DataValue::Str(serde_json::to_string(&id)?.into()))], cozo::ScriptMutability::Mutable) { + match run_query!( + &db, + ":delete elements {id}", + vec![("id", DataValue::Str(serde_json::to_string(&id)?.into()))], + cozo::ScriptMutability::Mutable + ) { Ok(_) => Ok(()), Err(report) => bail!(report), } @@ -48,7 +98,10 @@ pub fn remove(db: &DbInstance, id: &ElementId) -> anyhow::Result<()> { pub fn get(db: &DbInstance, id: &ElementId) -> anyhow::Result { let mut params = BTreeMap::new(); - params.insert("id".to_string(), DataValue::Str(serde_json::to_string(&id)?.into())); + params.insert( + "id".to_string(), + DataValue::Str(serde_json::to_string(&id)?.into()), + ); let result = db.run_script(" ?[content, latest_message, local_changes] := *elements[$id, content, latest_message, local_changes] @@ -57,7 +110,9 @@ pub fn get(db: &DbInstance, id: &ElementId) -> anyhow::Result { Ok(val) => { if let Some(firstrow) = val.rows.first() { debug!("db result: {:?}", &firstrow.as_slice()); - if let [ DataValue::Json(JsonData(content)), latest_message, DataValue::Bool(local_changes) ] = firstrow.as_slice() { + if let [DataValue::Json(JsonData(content)), latest_message, DataValue::Bool(local_changes)] = + firstrow.as_slice() + { return Ok(Element::from(( id.to_owned(), serde_json::from_value(content.to_owned())?, @@ -65,56 +120,71 @@ pub fn get(db: &DbInstance, id: &ElementId) -> anyhow::Result { DataValue::Str(s) => Some(serde_json::from_str(s)?), _ => None, }, - local_changes.to_owned() + local_changes.to_owned(), ))); } return Err(Error::msg("Could not parse db result as Element")); + } else { + return Err(Error::msg("No rows returned for element query")); } - else { - return Err(Error::msg("No rows returned for element query")) - } - }, + } Err(report) => bail!(report), } } pub fn get_by_tag(db: &DbInstance, tag: &Tag) -> anyhow::Result> { let mut params = BTreeMap::new(); - params.insert("tag".to_string(), DataValue::Str(serde_json::to_string(tag)?.into())); + params.insert( + "tag".to_string(), + DataValue::Str(serde_json::to_string(tag)?.into()), + ); - let result = db.run_script(" + let result = db.run_script( + " ?[element] := *tags[$tag, element] - ", params, cozo::ScriptMutability::Immutable); + ", + params, + cozo::ScriptMutability::Immutable, + ); match result { Ok(named_rows) => { let mut element_ids = vec![]; for row in named_rows.rows { - if let [ DataValue::Json(JsonData(Value::String(element_id)))] = row.as_slice() { + if let [DataValue::Json(JsonData(Value::String(element_id)))] = row.as_slice() { match serde_json::from_str(&element_id) { Ok(id) => element_ids.push(id), Err(e) => { error!("Error parsing element id {}: {}", element_id, e); - continue + continue; } } } } Ok(element_ids) - }, + } Err(report) => bail!(report), } } - -fn set_property(db: &DbInstance, id: &ElementId, key: &str, value: DataValue) -> anyhow::Result<()> { +fn set_property( + db: &DbInstance, + id: &ElementId, + key: &str, + value: DataValue, +) -> anyhow::Result<()> { let params = vec![ ("id", DataValue::Str(serde_json::to_string(id)?.into())), - (key, value) + (key, value), ]; - match run_query!(&db, format!(":update elements {{id => {key}}}"), params, ScriptMutability::Mutable) { + match run_query!( + &db, + format!(":update elements {{id => {key}}}"), + params, + ScriptMutability::Mutable + ) { Ok(_) => Ok(()), Err(report) => bail!(report), } -} \ No newline at end of file +} diff --git a/src/state/queries/mod.rs b/src/state/queries/mod.rs index 9799dcb..82a6329 100644 --- a/src/state/queries/mod.rs +++ b/src/state/queries/mod.rs @@ -4,47 +4,47 @@ pub mod peers; #[macro_export] macro_rules! build_query { - ($payload:expr, $params:expr) => { - { - use cozo::DataValue; - use std::collections::BTreeMap; - // Build parameters map - let mut params_map: BTreeMap = Default::default(); - let mut parameters_init = String::new(); + ($payload:expr, $params:expr) => {{ + use cozo::DataValue; + use std::collections::BTreeMap; + // Build parameters map + let mut params_map: BTreeMap = Default::default(); + let mut parameters_init = String::new(); - if $params.len() > 0 { - for (name, value) in $params { - let _: &str = name; // only for type annotation - params_map.insert(name.to_string(), value); - } - - // First line: Initialize parameters, make them available in CozoScript - use itertools::Itertools; - parameters_init += "?["; - parameters_init += ¶ms_map.iter().map(|(name, _)| name).format(", ").to_string(); - parameters_init += "] <- [["; - parameters_init += ¶ms_map.iter().map(|(name, _)| format!("${}", name)).format(", ").to_string(); - parameters_init += "]]"; + if $params.len() > 0 { + for (name, value) in $params { + let _: &str = name; // only for type annotation + params_map.insert(name.to_string(), value); } - // Return query string and parameters map - ( - format!("{}\n\n{}", parameters_init, $payload), - params_map - ) + // First line: Initialize parameters, make them available in CozoScript + use itertools::Itertools; + parameters_init += "?["; + parameters_init += ¶ms_map + .iter() + .map(|(name, _)| name) + .format(", ") + .to_string(); + parameters_init += "] <- [["; + parameters_init += ¶ms_map + .iter() + .map(|(name, _)| format!("${}", name)) + .format(", ") + .to_string(); + parameters_init += "]]"; } - }; -} + // Return query string and parameters map + (format!("{}\n\n{}", parameters_init, $payload), params_map) + }}; +} use build_query; #[macro_export] macro_rules! run_query { - ($db:expr, $payload:expr, $params:expr, $mutability:expr) => { - { - let (query, parameters) = crate::state::queries::build_query!($payload, $params); - $db.run_script(query.as_str(), parameters, $mutability) - } - }; + ($db:expr, $payload:expr, $params:expr, $mutability:expr) => {{ + let (query, parameters) = crate::state::queries::build_query!($payload, $params); + $db.run_script(query.as_str(), parameters, $mutability) + }}; } diff --git a/src/state/queries/peers.rs b/src/state/queries/peers.rs index eebfe89..2499a44 100644 --- a/src/state/queries/peers.rs +++ b/src/state/queries/peers.rs @@ -1,47 +1,49 @@ use anyhow::Error; -use cozo::{DbInstance, DataValue, ScriptMutability}; - -use crate::{state::types::PeerId, comm::Peer, run_query}; - +use cozo::{DataValue, DbInstance, ScriptMutability}; +use crate::{comm::Peer, run_query, state::types::PeerId}; pub fn put(db: &DbInstance, id: &PeerId, name: &str) -> anyhow::Result<()> { let params = vec![ ("id", DataValue::Str(serde_json::to_string(id)?.into())), - ("name", DataValue::Str(serde_json::to_string(name)?.into())) + ("name", DataValue::Str(serde_json::to_string(name)?.into())), ]; - - match run_query!(&db, ":put peers {id => name}", params, ScriptMutability::Mutable) { + + match run_query!( + &db, + ":put peers {id => name}", + params, + ScriptMutability::Mutable + ) { Ok(_) => Ok(()), Err(report) => Err(Error::msg(format!("Query failed: {}", report))), } } pub fn get(db: &DbInstance) -> anyhow::Result> { - let result = db.run_script(" + let result = db.run_script( + " ?[id, name] := *peers{id, name} - ", Default::default(), cozo::ScriptMutability::Immutable); + ", + Default::default(), + cozo::ScriptMutability::Immutable, + ); match result { - Ok(rows) => { - Ok( - rows.rows.into_iter().map( - |row| { - match row.as_slice() { - [DataValue::Str(id_string), DataValue::Str(name_string)] => { - if let Ok(id) = serde_json::from_str(&id_string) { - Some(Peer::new(id, name_string.as_str().to_string())) - } - else { - None - }}, - _ => None - } + Ok(rows) => Ok(rows + .rows + .into_iter() + .map(|row| match row.as_slice() { + [DataValue::Str(id_string), DataValue::Str(name_string)] => { + if let Ok(id) = serde_json::from_str(&id_string) { + Some(Peer::new(id, name_string.as_str().to_string())) + } else { + None } - ) - .flatten() - .collect() - ) - }, + } + _ => None, + }) + .flatten() + .collect()), Err(report) => Err(Error::msg(format!("Query failed: {}", report))), } } diff --git a/src/state/schema.rs b/src/state/schema.rs index 72a9572..2d1169c 100644 --- a/src/state/schema.rs +++ b/src/state/schema.rs @@ -3,11 +3,10 @@ use std::collections::BTreeMap; use anyhow::Error; use cozo::DbInstance; - - pub fn add_schema(db: &DbInstance) -> anyhow::Result<()> { let params = BTreeMap::new(); - match db.run_script(" + match db.run_script( + " {:create apps { id: String, => @@ -31,8 +30,11 @@ pub fn add_schema(db: &DbInstance) -> anyhow::Result<()> { tag: String, element: String, }} - ", params, cozo::ScriptMutability::Mutable) { + ", + params, + cozo::ScriptMutability::Mutable, + ) { Ok(_) => Ok(()), Err(e) => Err(Error::msg(format!("Failed to set up schema: {}", e))), } -} \ No newline at end of file +} diff --git a/src/state/types/element.rs b/src/state/types/element.rs index b8861db..269d951 100644 --- a/src/state/types/element.rs +++ b/src/state/types/element.rs @@ -1,8 +1,6 @@ -use serde::{Serialize, Deserialize}; - -use super::{ElementId, ElementContent, MessageId}; - +use serde::{Deserialize, Serialize}; +use super::{ElementContent, ElementId, MessageId}; #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Element { @@ -15,14 +13,24 @@ pub struct Element { impl From<(ElementId, ElementContent, Option, bool)> for Element { fn from(value: (ElementId, ElementContent, Option, bool)) -> Self { - Element { id: value.0, content: value.1, latest_message: value.2, local_changes: value.3 } + Element { + id: value.0, + content: value.1, + latest_message: value.2, + local_changes: value.3, + } } } impl Element { pub fn new(id: ElementId, content: ElementContent) -> Self { // A new element with no latest message must have local changes - Element { id: id, content: content, latest_message: None, local_changes: true } + Element { + id: id, + content: content, + latest_message: None, + local_changes: true, + } } pub fn id(&self) -> &ElementId { @@ -31,4 +39,4 @@ impl Element { pub fn content(&self) -> &ElementContent { &self.content } -} \ No newline at end of file +} diff --git a/src/state/types/element_content.rs b/src/state/types/element_content.rs index 8655273..42a4d55 100644 --- a/src/state/types/element_content.rs +++ b/src/state/types/element_content.rs @@ -1,7 +1,4 @@ - -use serde::{Serialize, Deserialize}; - - +use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub enum ElementContent { @@ -13,7 +10,7 @@ impl TryFrom for String { fn try_from(value: ElementContent) -> Result { match serde_json::to_string(&value) { Ok(s) => Ok(s), - Err(e) => Err(e) + Err(e) => Err(e), } } } @@ -23,7 +20,7 @@ impl TryFrom<&str> for ElementContent { fn try_from(value: &str) -> Result { match serde_json::from_str(value) { Ok(ec) => Ok(ec), - Err(e) => Err(e) + Err(e) => Err(e), } } -} \ No newline at end of file +} diff --git a/src/state/types/element_id.rs b/src/state/types/element_id.rs index 2f55f8a..e6c6a46 100644 --- a/src/state/types/element_id.rs +++ b/src/state/types/element_id.rs @@ -1,4 +1,4 @@ -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; use uuid::Uuid; #[derive(Serialize, Deserialize, Clone, Debug)] @@ -26,4 +26,4 @@ impl TryFrom<&str> for ElementId { fn try_from(value: &str) -> Result { serde_json::from_str(value) } -} \ No newline at end of file +} diff --git a/src/state/types/message_id.rs b/src/state/types/message_id.rs index 9a2520c..8d253d7 100644 --- a/src/state/types/message_id.rs +++ b/src/state/types/message_id.rs @@ -1,10 +1,9 @@ -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; use uuid::Uuid; #[derive(Serialize, Deserialize, Clone, Debug)] pub struct MessageId(Uuid); - impl MessageId { pub fn new() -> Self { MessageId(Uuid::new_v4()) @@ -28,4 +27,4 @@ impl TryFrom<&str> for MessageId { fn try_from(value: &str) -> Result { serde_json::from_str(value) } -} \ No newline at end of file +} diff --git a/src/state/types/peer_id.rs b/src/state/types/peer_id.rs index ed619e9..14f6d70 100644 --- a/src/state/types/peer_id.rs +++ b/src/state/types/peer_id.rs @@ -1,7 +1,6 @@ use anyhow::bail; use i2p::net::{I2pSocketAddr, ToI2pSocketAddrs}; -use serde::{Serialize, Deserialize}; - +use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct PeerId { @@ -30,7 +29,7 @@ impl TryFrom<&str> for PeerId { return Ok(PeerId { i2p_addr: addr }); } return Err(anyhow::Error::msg("No valid I2P address found")); - }, + } Err(e) => bail!(e), } } @@ -54,4 +53,4 @@ impl From for I2pSocketAddr { fn from(value: PeerId) -> Self { value.i2p_addr } -} \ No newline at end of file +} diff --git a/src/state/types/tag.rs b/src/state/types/tag.rs index 2c73e9d..c85c566 100644 --- a/src/state/types/tag.rs +++ b/src/state/types/tag.rs @@ -1,7 +1,6 @@ -use serde::{Serialize, Deserialize}; - +use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Tag { tag: String, -} \ No newline at end of file +} diff --git a/tests/api.rs b/tests/api.rs index 74c970b..42d773c 100644 --- a/tests/api.rs +++ b/tests/api.rs @@ -1,40 +1,86 @@ use std::time::Duration; -use tracing::{Level, debug}; -use ubisync::{Ubisync, config::Config, state::types::{ElementContent, Element, ElementId}, api::AppDescription}; - +use tracing::{debug, Level}; +use ubisync::{ + api::v0::app::AppDescription, + config::Config, + state::types::{Element, ElementContent, ElementId}, + Ubisync, +}; #[tokio::test(flavor = "multi_thread")] async fn two_nodes_element_creation() { - tracing_subscriber::fmt().pretty().with_max_level(Level::DEBUG).init(); + tracing_subscriber::fmt() + .pretty() + .with_max_level(Level::DEBUG) + .init(); // Two nodes need to bind to different ports let mut c2 = Config::default(); c2.api_config.port = Some(9982); let ubi1 = Ubisync::new(&Config::default()).await.unwrap(); let ubi2 = Ubisync::new(&c2).await.unwrap(); - ubi1.add_peer_from_id(ubi2.get_destination().unwrap().into()).unwrap(); - + ubi1.add_peer_from_id(ubi2.get_destination().unwrap().into()) + .unwrap(); + let http_client = reqwest::Client::new(); - let register_response = http_client.put("http://localhost:9981/v0/app/register").json(&AppDescription{name: "Test".to_string(), desc_text: "desc".to_string()}).send().await.unwrap(); - let jwt1 = register_response.text().await.expect("Couldn't fetch token from response"); - let register_response = http_client.put("http://localhost:9982/v0/app/register").json(&AppDescription{name: "Test".to_string(), desc_text: "desc".to_string()}).send().await.unwrap(); - let jwt2 = register_response.text().await.expect("Couldn't fetch token from response"); + let register_response = http_client + .put("http://localhost:9981/v0/app/register") + .json(&AppDescription { + name: "Test".to_string(), + desc_text: "desc".to_string(), + }) + .send() + .await + .unwrap(); + let jwt1 = register_response + .text() + .await + .expect("Couldn't fetch token from response"); + let register_response = http_client + .put("http://localhost:9982/v0/app/register") + .json(&AppDescription { + name: "Test".to_string(), + desc_text: "desc".to_string(), + }) + .send() + .await + .unwrap(); + let jwt2 = register_response + .text() + .await + .expect("Couldn't fetch token from response"); let test_element_content = ElementContent::Text("Text".to_string()); - let put_resp = http_client.put(&format!("http://localhost:9981/v0/element")).json(&test_element_content).header("Authorization", &format!("Bearer {}", &jwt1)).send().await.unwrap(); + let put_resp = http_client + .put(&format!("http://localhost:9981/v0/element")) + .json(&test_element_content) + .header("Authorization", &format!("Bearer {}", &jwt1)) + .send() + .await + .unwrap(); debug!("{:?}", &put_resp); let put_resp_text = put_resp.text().await.expect("No put response body"); debug!("{}", put_resp_text); - let id = serde_json::from_str::(&put_resp_text).expect("Could not deserialize ElementId"); + let id = + serde_json::from_str::(&put_resp_text).expect("Could not deserialize ElementId"); tokio::time::sleep(Duration::from_millis(3000)).await; - - let get_resp = http_client.get(&format!("http://localhost:9982/v0/element/{}", Into::::into(&id))).header("Authorization", &format!("Bearer {}", &jwt2)).send().await.expect("Get request failed"); + + let get_resp = http_client + .get(&format!( + "http://localhost:9982/v0/element/{}", + Into::::into(&id) + )) + .header("Authorization", &format!("Bearer {}", &jwt2)) + .send() + .await + .expect("Get request failed"); let get_resp_text = get_resp.text().await.expect("No get request body"); debug!("{}", get_resp_text); - let received_element = serde_json::from_str::(&get_resp_text).expect("Could not deserialize Element"); + let received_element = + serde_json::from_str::(&get_resp_text).expect("Could not deserialize Element"); debug!("Other node received this element: {:?}", received_element); - + assert_eq!(&test_element_content, received_element.content()); std::process::exit(0); }