diff --git a/src/api/mod.rs b/src/api/mod.rs index a853aef..869f1ea 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use axum::Router; use tokio::{net::TcpListener, task::JoinHandle}; -use crate::{state::State, config::ApiConfig}; +use crate::{state::{State, ApiState}, config::ApiConfig}; mod v0; @@ -40,11 +40,11 @@ impl From for ApiBuilder { } impl ApiBuilder { - pub async fn build(&self, state: Arc) -> Api { + pub async fn build(&self, state: ApiState) -> Api { let mut app: Router = Router::new(); match &self.version { - Some(v) if v == "v0" => app = app.nest(&format!("/{}", v), v0::get_router(state.clone())), - _ => app = app.nest("/v0", v0::get_router(state.clone())), + Some(v) if v == "v0" => app = app.nest(&format!("/{}", v), v0::get_router(state)), + _ => app = app.nest("/v0", v0::get_router(state)), } let ip = match &self.bind_ip { diff --git a/src/api/v0.rs b/src/api/v0.rs index 480e66b..3741745 100644 --- a/src/api/v0.rs +++ b/src/api/v0.rs @@ -1,44 +1,49 @@ use std::sync::Arc; use axum::{Router, routing::{put, get}, extract::{Path, Json}, Extension, response::{IntoResponse, Response}, http::StatusCode}; +use tracing::{warn, debug}; -use crate::state::{State, types::{ElementId, ElementContent}}; +use crate::state::{types::{ElementId, ElementContent}, ApiState}; -pub fn get_router(state: Arc) -> Router { +pub fn get_router(state: ApiState) -> Router { Router::new() .route("/element", put(create_element)) .route("/element/:id", get(get_element).post(set_element).delete(remove_element)) - .layer(Extension(state)) + .layer(Extension(Arc::new(state))) } -async fn get_element(Path(id): Path, s: Extension>) -> Response { +async fn get_element(Path(id): Path, s: Extension>) -> Response { let element = s.get_element(&id); match element { - Some(el) => (StatusCode::OK, Json{0: el}).into_response(), - None => StatusCode::NOT_FOUND.into_response(), + Ok(el) => (StatusCode::OK, Json{0: el}).into_response(), + Err(e) => { + warn!("Element not found:\n{:?}", e); + StatusCode::NOT_FOUND.into_response() + } } } -async fn create_element(s: Extension>, Json(content): Json) -> Response { +async fn create_element(s: Extension>, Json(content): Json) -> Response { let element_id = s.create_element(&content); + debug!("{:?}", element_id); match element_id { Ok(id) => (StatusCode::OK, Json{0: &Into::::into(&id)}).into_response(), Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), } } -async fn set_element(Path(id): Path, s: Extension>, Json(content): Json) -> Response { - let res = s.set_element_content(&id, &content); +async fn set_element(Path(id): Path, s: Extension>, Json(content): Json) -> Response { + let res = s.write_element_content(&id, &content); match res { Ok(_) => StatusCode::OK.into_response(), Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), } } -async fn remove_element(Path(id): Path, s: Extension>) -> Response { +async fn remove_element(Path(id): Path, s: Extension>) -> Response { let res = s.remove_element(&id); match res { Ok(_) => StatusCode::OK.into_response(), diff --git a/src/comm/message_processor.rs b/src/comm/message_processor.rs index a16ef78..94d1979 100644 --- a/src/comm/message_processor.rs +++ b/src/comm/message_processor.rs @@ -1,24 +1,22 @@ -use std::sync::Arc; - use tracing::debug; -use crate::state::{State, types::PeerId}; +use crate::state::{types::PeerId, CommState}; -use super::{messages::{Message, MessageContent}, Peer}; +use super::messages::{Message, MessageContent}; -pub fn handle(state: Arc, peer: &PeerId, message: Message) { +pub fn handle(state: &CommState, peer: &PeerId, message: Message) { debug!("Handling message now: {:?}", message); match message.content() { MessageContent::Hello { peer_name } => { - state.set_peer(&Peer::new(peer.clone(), peer_name.clone())).expect("Couldn't set peer"); + state.set_peer(peer, peer_name).expect("State failed"); }, MessageContent::CreateElement { id, content } => { - state.set_element(id, content).expect("State failed"); + state.add_received_element(id, content, message.id()).expect("State failed"); }, MessageContent::SetElement { id, content } => { - state.set_element(id, content).expect("State failed"); + state.update_element_content(id, content, message.id()).expect("State failed"); }, MessageContent::RemoveElement { id } => { state.remove_element(id).expect("State failed"); diff --git a/src/comm/mod.rs b/src/comm/mod.rs index 4b7dc2d..c4c731a 100644 --- a/src/comm/mod.rs +++ b/src/comm/mod.rs @@ -5,6 +5,7 @@ use tracing::{warn, error, debug}; pub use types::*; use std::collections::HashMap; +use std::ops::Deref; use std::sync::Arc; use std::io::{Read, Write}; @@ -15,13 +16,13 @@ use tokio::sync::RwLock; use tokio::task::JoinHandle; use crate::Config; -use crate::state::State; +use crate::state::CommState; use crate::state::types::PeerId; use self::messages::Message; pub struct CommHandle { - state: Arc, + state: Arc, i2p_server: Arc, // Maps peer addresses to existing connections to them clients: Arc>>>>, @@ -29,7 +30,7 @@ pub struct CommHandle { } impl CommHandle { - pub fn new(state: Arc, config: &Config) -> anyhow::Result { + pub fn new(state: CommState, config: &Config) -> anyhow::Result { let mut listener_builder = I2pListenerBuilder::default() .with_options(SAMOptions::default()); @@ -42,7 +43,7 @@ impl CommHandle { .unwrap(); Ok(CommHandle { - state: state, + state: Arc::new(state), i2p_server: Arc::new(listener), clients: Default::default(), thread: RwLock::new(None), @@ -155,7 +156,7 @@ impl CommHandle { Ok(i2p_dest) } - fn read_connection(wrapped_stream: Arc>, state: Arc) -> JoinHandle<()> { + fn read_connection(wrapped_stream: Arc>, state: Arc) -> JoinHandle<()> { tokio::spawn(async move { let mut stream = wrapped_stream.write().await; let peer: PeerId = stream.peer_addr().expect("Failed to get peer addr").into(); @@ -172,7 +173,7 @@ impl CommHandle { Ok(value) => { match serde_json::from_value::(value) { Ok(message) => { - message_processor::handle(state.clone(), &peer, message); + message_processor::handle(state.deref(), &peer, message); }, Err(e) => warn!("Deserialization failed: {:?}", e), } @@ -200,14 +201,14 @@ mod tests { use i2p::sam_options::SAMOptions; use crate::Config; - use crate::state::State; + use crate::state::{State, CommState}; use crate::comm::{messages, Message}; use crate::state::types::ElementId; use super::CommHandle; #[tokio::test(flavor = "multi_thread")] pub async fn msg() { - let ch = CommHandle::new(State::new().await.unwrap(), &Config::default() ).unwrap(); + let ch = CommHandle::new(CommState::new(State::new().await.unwrap()), &Config::default() ).unwrap(); ch.run().await; println!("My address: {:?}", ch.i2p_b32_address()); diff --git a/src/lib.rs b/src/lib.rs index 5c910c4..71d2dc9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,7 @@ use comm::{CommHandle, Peer}; use config::Config; use i2p::net::I2pSocketAddr; use serde::{Serialize, Deserialize}; -use state::{State, types::{ElementContent, ElementId, MessageId, PeerId}}; +use state::{State, types::{ElementContent, ElementId, MessageId, PeerId}, CommState, ApiState}; mod api; pub mod comm; @@ -28,10 +28,10 @@ pub struct Ubisync { impl Ubisync { pub async fn new(config: &Config) -> anyhow::Result { let state = State::new().await?; - let comm_handle = Arc::new(CommHandle::new(state.clone(), config)?); + let comm_handle = Arc::new(CommHandle::new(CommState::new(state.clone()), config)?); state.set_comm_handle(comm_handle.clone()); - let api = Arc::new(ApiBuilder::from(config.api_config.clone()).build(state.clone()).await); + let api = Arc::new(ApiBuilder::from(config.api_config.clone()).build(ApiState::new(state.clone())).await); comm_handle.run().await; Ok(Ubisync { @@ -64,10 +64,6 @@ impl Ubisync { pub fn get_destination(&self) -> anyhow::Result { self.comm_handle.i2p_address() } - - pub fn create_element(&self, content: &ElementContent) -> anyhow::Result { - self.state_handle.create_element(content) - } } diff --git a/src/state/api_state.rs b/src/state/api_state.rs new file mode 100644 index 0000000..d80447f --- /dev/null +++ b/src/state/api_state.rs @@ -0,0 +1,86 @@ +use std::sync::Arc; + +use cozo::DbInstance; +use tracing::debug; + +use crate::{state::{types::ElementId, queries}, comm::messages::MessageContent}; + +use super::{State, types::{ElementContent, Element}}; + + + +pub struct ApiState { + state: Arc, +} + +impl ApiState { + pub fn new(state: Arc) -> Self { + ApiState { state: state } + } + + pub fn create_element(&self, content: &ElementContent) -> anyhow::Result { + let id = ElementId::new(); + queries::elements::add(self.db(), &id, &content, None, true)?; + debug!("Added element {{{}}}", &id.to_string()); + + self.state.send_to_peers(MessageContent::CreateElement { id: id.clone(), content: content.clone() }); + Ok(id) + } + + pub fn write_element_content(&self, id: &ElementId, content: &ElementContent) -> anyhow::Result<()> { + queries::elements::set_content(self.db(), id, content)?; + queries::elements::set_local_changes(self.db(), id, true)?; + debug!("Wrote element content {{{}}}", &id.to_string()); + + Ok(()) + } + + pub fn remove_element(&self, id: &ElementId) -> anyhow::Result<()> { + let res = self.state.remove_element(id); + debug!("Removed element {{{}}}", &id.to_string()); + + res + } + + pub fn get_element(&self, id: &ElementId) -> anyhow::Result { + self.state.get_element(id) + } + + + fn db(&self) -> &DbInstance { + &self.state.db + } +} + +#[cfg(test)] +mod tests { + use super::ApiState; + use crate::state::{types::ElementContent, State}; + + #[tokio::test] + #[serial_test::serial] + async fn test_element_create() { + tracing_subscriber::fmt().pretty().init(); + let state = ApiState::new(State::new().await.unwrap()); + let id = state.create_element(&ElementContent::Text("Test-text".to_string())).unwrap(); + let el = state.get_element(&id).unwrap(); + assert_eq!( + ElementContent::Text("Test-text".to_string()), + el.content().to_owned() + ) + } + + #[tokio::test] + #[serial_test::serial] + async fn test_element_write() { + tracing_subscriber::fmt().pretty().init(); + let state = ApiState::new(State::new().await.unwrap()); + let id = state.create_element(&ElementContent::Text("Test-text".to_string())).unwrap(); + state.write_element_content(&id,&ElementContent::Text("Test-text 2".to_string())).unwrap(); + let el = state.get_element(&id).unwrap(); + assert_eq!( + ElementContent::Text("Test-text 2".to_string()), + el.content().to_owned() + ) + } +} diff --git a/src/state/comm_state.rs b/src/state/comm_state.rs new file mode 100644 index 0000000..4884c62 --- /dev/null +++ b/src/state/comm_state.rs @@ -0,0 +1,98 @@ +use std::sync::Arc; + +use cozo::DbInstance; +use tracing::debug; + +use crate::{state::queries, comm::Peer}; + +use super::{State, types::{MessageId, ElementContent, ElementId, PeerId, Element}}; + + +//TODO: Notify API about changes +pub struct CommState { + state: Arc +} + +impl CommState { + pub fn new(state: Arc) -> Self { + CommState { state: state } + } + + pub fn add_received_element(&self, id: &ElementId, content: &ElementContent, latest_message: &MessageId) -> anyhow::Result<()> { + queries::elements::add(self.db(), &id, &content, Some(latest_message.to_owned()), false)?; + debug!("Added element {{{}}}", &id.to_string()); + + Ok(()) + } + + pub fn update_element_content(&self, id: &ElementId, content: &ElementContent, latest_message: &MessageId) -> anyhow::Result<()> { + //TODO: resolve potential conflicts with local changes + queries::elements::set_content(self.db(), id, content)?; + queries::elements::set_latest_message(self.db(), id, Some(latest_message.to_owned()))?; + debug!("Updated element {{{}}}", &id.to_string()); + + Ok(()) + } + + pub fn remove_element(&self, id: &ElementId) -> anyhow::Result<()> { + let res = self.state.remove_element(id); + debug!("Removed element {{{}}}", &id.to_string()); + + res + } + + pub fn get_element(&self, id: &ElementId) -> anyhow::Result { + self.state.get_element(id) + } + + pub fn set_peer(&self, id: &PeerId, name: &str) -> anyhow::Result<()> { + queries::peers::put(self.db(), id, name)?; + debug!("Set peer {{{}}}", id.to_string()); + + Ok(()) + } + + pub fn get_peers(&self) -> anyhow::Result> { + self.state.get_peers() + } + + + fn db(&self) -> &DbInstance { + &self.state.db + } +} + +#[cfg(test)] +mod tests { + use super::CommState; + use crate::state::{types::{ElementContent, ElementId, MessageId}, State}; + + #[tokio::test] + #[serial_test::serial] + async fn test_element_add() { + tracing_subscriber::fmt().pretty().init(); + let state = CommState::new(State::new().await.unwrap()); + let id = ElementId::new(); + state.add_received_element(&id, &ElementContent::Text("Test-text".to_string()), &MessageId::new()).unwrap(); + let el = state.get_element(&id).unwrap(); + assert_eq!( + ElementContent::Text("Test-text".to_string()), + el.content().to_owned() + ) + } + + #[tokio::test] + #[serial_test::serial] + async fn test_element_update() { + tracing_subscriber::fmt().pretty().init(); + let state = CommState::new(State::new().await.unwrap()); + let id = ElementId::new(); + state.add_received_element(&id, &ElementContent::Text("Test-text".to_string()), &MessageId::new()).unwrap(); + state.update_element_content(&id,&ElementContent::Text("Test-text 2".to_string()), &MessageId::new()).unwrap(); + let el = state.get_element(&id).unwrap(); + assert_eq!( + ElementContent::Text("Test-text 2".to_string()), + el.content().to_owned() + ) + } +} diff --git a/src/state/mod.rs b/src/state/mod.rs index 84387ca..8f00bc7 100644 --- a/src/state/mod.rs +++ b/src/state/mod.rs @@ -10,9 +10,14 @@ use self::types::{ElementContent, ElementId, Element, Tag}; pub mod types; +mod api_state; +mod comm_state; mod queries; mod schema; +pub use api_state::ApiState; +pub use comm_state::CommState; + pub struct State { db: DbInstance, comm_handle: RwLock>>, @@ -34,58 +39,37 @@ impl State { *self.comm_handle.write().as_deref_mut().expect("Could not set state's CommHandle") = Some(handle); } - - // Create an element and add it to the database - pub fn create_element(&self, content: &ElementContent) -> anyhow::Result { - let id = ElementId::new(); - queries::set_element(&self.db, &id, &content)?; - debug!("Created element with id {:?}: {:?}", &id, self.get_element(&id)); - - self.send_to_peers(MessageContent::CreateElement { id: id.clone(), content: content.clone() }); - Ok(id) - } - - // Anyone updated an element, update it in the database - pub fn set_element(&self, element_id: &ElementId, content: &ElementContent) -> anyhow::Result<()> { - let res = queries::set_element(&self.db, element_id, content); - debug!("Set element with id {:?}: {:?}", element_id, self.get_element(element_id)); - - self.send_to_peers(MessageContent::SetElement { id: element_id.clone(), content: content.clone() }); - res - } - pub fn set_element_content(&self, element_id: &ElementId, content: &ElementContent) -> anyhow::Result<()> { - let res = queries::set_element_content(&self.db, element_id, content); - debug!("Set element content with id {:?}: {:?}", element_id, self.get_element(element_id)); - + let res = queries::elements::set_content(&self.db, element_id, content); + debug!("Set content of element with id {:?}: {:?}", element_id, self.get_element(element_id)); self.send_to_peers(MessageContent::SetElement { id: element_id.clone(), content: content.clone() }); res } pub fn remove_element(&self, element_id: &ElementId) -> anyhow::Result<()> { - let res = queries::remove_element(&self.db, element_id); + let res = queries::elements::remove(&self.db, element_id); self.send_to_peers(MessageContent::RemoveElement { id: element_id.clone() }); res } - pub fn get_element(&self, id: &ElementId) -> Option { - queries::get_element(&self.db, id).ok() + pub fn get_element(&self, id: &ElementId) -> anyhow::Result { + queries::elements::get(&self.db, id) } pub fn get_elements_by_tag(&self, tag: &Tag) -> Vec { - queries::get_elements_by_tag(&self.db, tag) + queries::elements::get_by_tag(&self.db, tag) .map_err(|e| {error!("{}", e); e}) .unwrap_or(vec![]) } pub fn set_peer(&self, peer: &Peer) -> anyhow::Result<()> { - queries::add_peer(&self.db, &peer.id(), &peer.name()) + queries::peers::put(&self.db, &peer.id(), &peer.name()) } pub fn get_peers(&self) -> anyhow::Result> { - queries::get_peers(&self.db) + queries::peers::get(&self.db) } @@ -104,37 +88,3 @@ impl State { } } } - - -#[cfg(test)] -mod tests { - use crate::state::State; - use crate::state::types::ElementContent; - - #[tokio::test] - #[serial_test::serial] - async fn test_create() { - tracing_subscriber::fmt().pretty().init(); - let state = State::new().await.unwrap(); - let id = state.create_element(&ElementContent::Text("Test-text".to_string())).unwrap(); - let el = state.get_element(&id).unwrap(); - assert_eq!( - ElementContent::Text("Test-text".to_string()), - el.content().to_owned() - ) - } - - #[tokio::test] - #[serial_test::serial] - async fn test_update() { - tracing_subscriber::fmt().pretty().init(); - let state = State::new().await.unwrap(); - let id = state.create_element(&ElementContent::Text("Test-text".to_string())).unwrap(); - state.set_element(&id,&ElementContent::Text("Test-text 2".to_string())).unwrap(); - let el = state.get_element(&id).unwrap(); - assert_eq!( - ElementContent::Text("Test-text 2".to_string()), - el.content().to_owned() - ) - } -} diff --git a/src/state/queries/elements.rs b/src/state/queries/elements.rs index 91bac06..b82770c 100644 --- a/src/state/queries/elements.rs +++ b/src/state/queries/elements.rs @@ -3,65 +3,82 @@ use std::collections::BTreeMap; use anyhow::{Error, bail}; use cozo::{DbInstance, DataValue, JsonData, ScriptMutability}; use serde_json::Value; -use tracing::error; +use tracing::{error, debug}; -use crate::{state::{ElementContent, ElementId, Element, types::Tag}, run_query}; +use crate::{state::{ElementContent, ElementId, Element, types::{Tag, MessageId}}, run_query}; -pub fn set_element(db: &DbInstance, id: &ElementId, content: &ElementContent) -> anyhow::Result<()> { +pub fn add(db: &DbInstance, id: &ElementId, content: &ElementContent, latest_message: Option, local_changes: bool) -> anyhow::Result<()> { let params = vec![ ("id", DataValue::Str(serde_json::to_string(&id)?.into())), - ("content", DataValue::Str(serde_json::to_string(content)?.into())) + ("content", DataValue::Json(JsonData(serde_json::to_value(content)?))), + ("latest_message", match latest_message { + Some(m) => DataValue::Str(serde_json::to_string(&m)?.into()), + None => DataValue::Null, + }), + ("local_changes", DataValue::Bool(local_changes)), ]; - match run_query!(&db, ":put elements {id => content}", params, ScriptMutability::Mutable) { + match run_query!(&db, ":insert elements {id => content, latest_message, local_changes}", params, ScriptMutability::Mutable) { Ok(_) => Ok(()), Err(report) => bail!(report), } } -pub fn set_element_content(db: &DbInstance, id: &ElementId, content: &ElementContent) -> anyhow::Result<()> { - let params = vec![ - ("id", DataValue::Str(serde_json::to_string(&id)?.into())), - ("content", DataValue::Str(serde_json::to_string(content)?.into())) - ]; - - match run_query!(&db, ":put elements {id => content}", params, ScriptMutability::Mutable) { - Ok(_) => Ok(()), - Err(report) => bail!(report), - } +pub fn set_content(db: &DbInstance, id: &ElementId, content: &ElementContent) -> anyhow::Result<()> { + set_property(db, id, "content", DataValue::Json(JsonData(serde_json::to_value(content)?))) } -pub fn remove_element(db: &DbInstance, id: &ElementId) -> anyhow::Result<()> { +pub fn set_latest_message(db: &DbInstance, id: &ElementId, latest_message: Option) -> anyhow::Result<()> { + set_property(db, id, "latest_message", match latest_message { + Some(m) => DataValue::Str(serde_json::to_string(&m)?.into()), + None => DataValue::Null, + }) +} + +pub fn set_local_changes(db: &DbInstance, id: &ElementId, local_changes: bool) -> anyhow::Result<()> { + set_property(db, id, "local_changes", DataValue::Bool(local_changes)) +} + +pub fn remove(db: &DbInstance, id: &ElementId) -> anyhow::Result<()> { match run_query!(&db, ":delete elements {id}", vec![("id", DataValue::Str(serde_json::to_string(&id)?.into()))], cozo::ScriptMutability::Mutable) { Ok(_) => Ok(()), Err(report) => bail!(report), } } -pub fn get_element(db: &DbInstance, id: &ElementId) -> anyhow::Result { +pub fn get(db: &DbInstance, id: &ElementId) -> anyhow::Result { let mut params = BTreeMap::new(); params.insert("id".to_string(), DataValue::Str(serde_json::to_string(&id)?.into())); let result = db.run_script(" - ?[content] := *elements[$id, content] + ?[content, latest_message, local_changes] := *elements[$id, content, latest_message, local_changes] ", params, cozo::ScriptMutability::Immutable); match result { Ok(val) => { if let Some(firstrow) = val.rows.first() { - if let [ DataValue::Json(JsonData(Value::String(content))) ] = firstrow.as_slice() { - return Ok(Element::new( + debug!("db result: {:?}", &firstrow.as_slice()); + if let [ DataValue::Json(JsonData(content)), latest_message, DataValue::Bool(local_changes) ] = firstrow.as_slice() { + return Ok(Element::from(( id.to_owned(), - content.as_str().try_into()?, - )); + serde_json::from_value(content.to_owned())?, + match latest_message { + DataValue::Str(s) => Some(serde_json::from_str(s)?), + _ => None, + }, + local_changes.to_owned() + ))); } + return Err(Error::msg("Could not parse db result as Element")); + } + else { + return Err(Error::msg("No rows returned for element query")) } - return Err(Error::msg("Could not parse db result as Element")); }, Err(report) => bail!(report), } } -pub fn get_elements_by_tag(db: &DbInstance, tag: &Tag) -> anyhow::Result> { +pub fn get_by_tag(db: &DbInstance, tag: &Tag) -> anyhow::Result> { let mut params = BTreeMap::new(); params.insert("tag".to_string(), DataValue::Str(serde_json::to_string(tag)?.into())); @@ -87,4 +104,17 @@ pub fn get_elements_by_tag(db: &DbInstance, tag: &Tag) -> anyhow::Result bail!(report), } +} + + +fn set_property(db: &DbInstance, id: &ElementId, key: &str, value: DataValue) -> anyhow::Result<()> { + let params = vec![ + ("id", DataValue::Str(serde_json::to_string(id)?.into())), + (key, value) + ]; + + match run_query!(&db, format!(":update elements {{id => {key}}}"), params, ScriptMutability::Mutable) { + Ok(_) => Ok(()), + Err(report) => bail!(report), + } } \ No newline at end of file diff --git a/src/state/queries/mod.rs b/src/state/queries/mod.rs index 6b98d9a..fe8c0a7 100644 --- a/src/state/queries/mod.rs +++ b/src/state/queries/mod.rs @@ -1,9 +1,6 @@ -mod elements; +pub mod elements; -pub use elements::*; - -mod peers; -pub use peers::*; +pub mod peers; #[macro_export] @@ -49,4 +46,4 @@ macro_rules! run_query { $db.run_script(query.as_str(), parameters, $mutability) } }; -} \ No newline at end of file +} diff --git a/src/state/queries/peers.rs b/src/state/queries/peers.rs index 70883fc..3eae8be 100644 --- a/src/state/queries/peers.rs +++ b/src/state/queries/peers.rs @@ -7,7 +7,7 @@ use crate::{state::types::PeerId, comm::Peer, run_query}; -pub fn add_peer(db: &DbInstance, id: &PeerId, name: &str) -> anyhow::Result<()> { +pub fn put(db: &DbInstance, id: &PeerId, name: &str) -> anyhow::Result<()> { let params = vec![ ("id", DataValue::Str(serde_json::to_string(id)?.into())), ("name", DataValue::Str(serde_json::to_string(name)?.into())) @@ -19,7 +19,7 @@ pub fn add_peer(db: &DbInstance, id: &PeerId, name: &str) -> anyhow::Result<()> } } -pub fn get_peers(db: &DbInstance) -> anyhow::Result> { +pub fn get(db: &DbInstance) -> anyhow::Result> { let result = db.run_script(" ?[id, name] := *peers{id, name} ", Default::default(), cozo::ScriptMutability::Immutable); diff --git a/src/state/schema.rs b/src/state/schema.rs index e6d2f0c..e4bd905 100644 --- a/src/state/schema.rs +++ b/src/state/schema.rs @@ -17,10 +17,12 @@ pub fn add_schema(db: &DbInstance) -> anyhow::Result<()> { id: String, => content: Json, + latest_message: String?, + local_changes: Bool, }} {:create tags { tag: String, - element: String + element: String, }} ", params, cozo::ScriptMutability::Mutable) { Ok(_) => Ok(()), diff --git a/src/state/types/element.rs b/src/state/types/element.rs index 0ba4e87..b8861db 100644 --- a/src/state/types/element.rs +++ b/src/state/types/element.rs @@ -1,6 +1,6 @@ use serde::{Serialize, Deserialize}; -use super::{ElementId, ElementContent}; +use super::{ElementId, ElementContent, MessageId}; @@ -8,12 +8,21 @@ use super::{ElementId, ElementContent}; pub struct Element { // Uuid identifying the element itself id: ElementId, - content: ElementContent + content: ElementContent, + latest_message: Option, + local_changes: bool, +} + +impl From<(ElementId, ElementContent, Option, bool)> for Element { + fn from(value: (ElementId, ElementContent, Option, bool)) -> Self { + Element { id: value.0, content: value.1, latest_message: value.2, local_changes: value.3 } + } } impl Element { pub fn new(id: ElementId, content: ElementContent) -> Self { - Element { id: id, content: content } + // A new element with no latest message must have local changes + Element { id: id, content: content, latest_message: None, local_changes: true } } pub fn id(&self) -> &ElementId { diff --git a/src/state/types/element_id.rs b/src/state/types/element_id.rs index 5d73f7e..2f55f8a 100644 --- a/src/state/types/element_id.rs +++ b/src/state/types/element_id.rs @@ -9,6 +9,12 @@ impl ElementId { } } +impl ToString for ElementId { + fn to_string(&self) -> String { + self.0.to_string() + } +} + impl From<&ElementId> for String { fn from(value: &ElementId) -> Self { value.0.to_string() diff --git a/src/state/types/peer_id.rs b/src/state/types/peer_id.rs index ff795d9..ed619e9 100644 --- a/src/state/types/peer_id.rs +++ b/src/state/types/peer_id.rs @@ -14,6 +14,12 @@ impl PeerId { } } +impl ToString for PeerId { + fn to_string(&self) -> String { + self.i2p_addr.to_string() + } +} + impl TryFrom<&str> for PeerId { type Error = anyhow::Error; diff --git a/tests/api.rs b/tests/api.rs index aaa2ac7..71af727 100644 --- a/tests/api.rs +++ b/tests/api.rs @@ -19,12 +19,16 @@ async fn two_nodes_element_creation() { let test_element_content = ElementContent::Text("Text".to_string()); let put_resp = http_client.put(&format!("http://localhost:9981/v0/element")).json(&test_element_content).send().await.unwrap(); debug!("{:?}", &put_resp); - let id = serde_json::from_str::(&put_resp.text().await.expect("No put response body")).expect("Could not deserialize ElementId"); + let put_resp_text = put_resp.text().await.expect("No put response body"); + debug!("{}", put_resp_text); + let id = serde_json::from_str::(&put_resp_text).expect("Could not deserialize ElementId"); tokio::time::sleep(Duration::from_millis(3000)).await; let get_resp = http_client.get(&format!("http://localhost:9982/v0/element/{}", Into::::into(&id))).send().await.expect("Get request failed"); - let received_element = serde_json::from_str::(&get_resp.text().await.expect("No get request body")).expect("Could not deserialize Element"); + let get_resp_text = get_resp.text().await.expect("No get request body"); + debug!("{}", get_resp_text); + let received_element = serde_json::from_str::(&get_resp_text).expect("Could not deserialize Element"); debug!("Other node received this element: {:?}", received_element); assert_eq!(&test_element_content, received_element.content());