From ec0a55b2866ede740185243b590bde136a67117c Mon Sep 17 00:00:00 2001 From: "Philip (a-0)" <@ph:a-0.me> Date: Sun, 25 Feb 2024 12:35:35 +0100 Subject: [PATCH 1/2] Store own `PeerId` in `CommHandle` --- ubisync/src/comm/mod.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/ubisync/src/comm/mod.rs b/ubisync/src/comm/mod.rs index 75412f7..9dcc4b4 100644 --- a/ubisync/src/comm/mod.rs +++ b/ubisync/src/comm/mod.rs @@ -22,7 +22,7 @@ use crate::Config; pub struct CommHandle { state: Arc, i2p_server: Arc, - peer_id: RwLock, + peer_id: PeerId, // Maps peer addresses to existing connections to them clients: Arc>>>>, thread: RwLock>>, @@ -38,12 +38,13 @@ impl CommHandle { } let listener = listener_builder.build().unwrap(); - let own_peer_id: PeerId = (&listener).local_addr().map_err(|e| anyhow!(e))?.into(); + let mut own_peer_id: PeerId = (&listener).local_addr().map_err(|e| anyhow!(e))?.into(); + own_peer_id.b32_addr(); Ok(CommHandle { state: Arc::new(state), i2p_server: Arc::new(listener), - peer_id: RwLock::new(own_peer_id), + peer_id: own_peer_id, clients: Default::default(), thread: RwLock::new(None), }) @@ -104,6 +105,7 @@ impl CommHandle { } pub async fn send(&self, dest: &I2pSocketAddr, msg: Message) -> anyhow::Result<()> { + debug!("To '{dest:?}': '{msg:?}"); match serde_json::to_string(&msg) { Ok(msg_string) => { self.send_to_addr(dest, msg_string.as_bytes()).await?; @@ -116,6 +118,7 @@ impl CommHandle { pub async fn send_to_addr(&self, addr: &I2pSocketAddr, msg: &[u8]) -> anyhow::Result<()> { // Create client for this connection if necessary if !self.clients.read().await.contains_key(addr) { + debug!("No client exists for requested connection, creating one"); match I2pStream::connect(addr) { Ok(client) => { //client.inner.sam.conn.set_nodelay(true)?; @@ -153,15 +156,15 @@ impl CommHandle { } pub fn i2p_address(&self) -> I2pSocketAddr { - self.peer_id.blocking_read().addr() + self.peer_id.addr() } pub fn i2p_b32_address(&self) -> anyhow::Result { - self.peer_id.blocking_write().b32_addr() + self.peer_id.b32_addr_nocache() } pub fn own_peer_id(&self) -> anyhow::Result { - Ok(self.peer_id.blocking_read().to_owned()) + Ok(self.peer_id.to_owned()) } fn read_connection( From 636aff64b949d15983d94563879b1a3372434a2c Mon Sep 17 00:00:00 2001 From: "Philip (a-0)" <@ph:a-0.me> Date: Sat, 23 Mar 2024 17:46:25 +0100 Subject: [PATCH 2/2] Basic family join/leave implementation, tests not fully working --- ubisync-lib/src/messages/mod.rs | 9 +- ubisync-lib/src/types/peer_id.rs | 12 +- ubisync/src/comm/message_processor.rs | 34 +++++- ubisync/src/comm/mod.rs | 42 +++---- ubisync/src/lib.rs | 80 +++++++++++++- ubisync/src/node_events/mod.rs | 12 +- ubisync/src/state/api_state.rs | 19 +++- ubisync/src/state/comm_state.rs | 31 +++++- .../database/collections/peer_families.rs | 104 ++++++++++++------ .../src/state/database/collections/peers.rs | 27 +++-- .../database/collections/pot_memberships.rs | 24 ++-- .../src/state/database/collections/pots.rs | 40 ++++--- ubisync/src/state/database/mod.rs | 52 ++++++++- ubisync/src/state/mod.rs | 94 ++++++++++------ 14 files changed, 429 insertions(+), 151 deletions(-) diff --git a/ubisync-lib/src/messages/mod.rs b/ubisync-lib/src/messages/mod.rs index 679f897..64e4a34 100644 --- a/ubisync-lib/src/messages/mod.rs +++ b/ubisync-lib/src/messages/mod.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use crate::types::{ElementContent, ElementId, MessageId, PotId}; +use crate::types::{ElementContent, ElementId, Family, FamilyId, MessageId, PotId}; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Message { @@ -19,6 +19,11 @@ pub enum MessageContent { Hello { peer_name: String, }, + JoinFamily, + AddedToFamily { + family: Family, + }, + LeaveFamily, CreateElement { id: ElementId, content: ElementContent, @@ -34,7 +39,7 @@ pub enum MessageContent { AddPot { id: PotId, app_type: String, - } + }, } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/ubisync-lib/src/types/peer_id.rs b/ubisync-lib/src/types/peer_id.rs index 45a275b..5cf7e29 100644 --- a/ubisync-lib/src/types/peer_id.rs +++ b/ubisync-lib/src/types/peer_id.rs @@ -1,8 +1,10 @@ +use std::hash::Hash; + use anyhow::{anyhow, bail}; use i2p::net::{I2pAddr, I2pSocketAddr, ToI2pSocketAddrs}; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct PeerId { i2p_dest: I2pSocketAddr, i2p_b32: Option, @@ -27,6 +29,14 @@ impl PeerId { } } +// The identity of the PeerId only depends on the i2p_dest (which is unique), +// and not on whether the b32 address has been computed before +impl Hash for PeerId { + fn hash(&self, state: &mut H) { + self.i2p_dest.hash(state); + } +} + impl ToString for PeerId { fn to_string(&self) -> String { self.i2p_dest.to_string() diff --git a/ubisync/src/comm/message_processor.rs b/ubisync/src/comm/message_processor.rs index 41030ac..f677104 100644 --- a/ubisync/src/comm/message_processor.rs +++ b/ubisync/src/comm/message_processor.rs @@ -1,7 +1,7 @@ use tracing::debug; use ubisync_lib::peer::Peer; -use ubisync_lib::types::{ContentUpdateStrategy, PeerId}; +use ubisync_lib::types::{ContentUpdateStrategy, Family, PeerId}; use ubisync_lib::messages::{Message, MessageContent}; @@ -9,13 +9,34 @@ use crate::comm::conflict_resolution::merge_element_contents; use crate::state::CommState; pub fn handle(state: &CommState, peer: &PeerId, message: Message) { - debug!("Handling message now: {:?}", message); + debug!( + "Received message.\nFrom: {:?} (dest: {:?})\nTo: {:?} (dest: {:?})\nMessage: {message:?}", + peer.b32_addr_nocache(), + peer, + state.own_peer_id().unwrap().b32_addr_nocache(), + state.own_peer_id().unwrap() + ); match message.content() { MessageContent::Hello { peer_name } => { state .set_peer(Peer::new(peer.to_owned(), Some(peer_name.to_string()))) .expect("State failed"); } + MessageContent::JoinFamily => state.request_family_join(peer.to_owned()), + MessageContent::AddedToFamily { family } => { + if state.has_family_join_request(peer.to_owned()) { + debug!("Own join request was accepted, setting family"); + state + .set_own_family(family.to_owned()) + .expect("State failed"); + + debug!("New own family: {:?}", state.get_family_of_peer(state.own_peer_id().unwrap())) + } + else { + debug!("Got AddedToFamily message, but no family join request was found") + } + } + MessageContent::LeaveFamily => state.remove_peer_from_family(peer.to_owned()), MessageContent::CreateElement { id, content, pot } => { state .add_received_element( @@ -28,9 +49,12 @@ pub fn handle(state: &CommState, peer: &PeerId, message: Message) { .expect("State failed"); } MessageContent::UpdateElement { id, content } => { - if let Some(new_content) = - merge_element_contents(state, id.clone(), content.to_owned(), message.id().to_owned()) - { + if let Some(new_content) = merge_element_contents( + state, + id.clone(), + content.to_owned(), + message.id().to_owned(), + ) { state .update_element_content( id.to_owned(), diff --git a/ubisync/src/comm/mod.rs b/ubisync/src/comm/mod.rs index 9dcc4b4..c23ac29 100644 --- a/ubisync/src/comm/mod.rs +++ b/ubisync/src/comm/mod.rs @@ -1,6 +1,7 @@ -pub mod message_processor; mod conflict_resolution; +pub mod message_processor; +use i2p::sam::StreamForward; use tracing::{debug, error, warn}; use ubisync_lib::messages::Message; use ubisync_lib::types::PeerId; @@ -54,27 +55,17 @@ impl CommHandle { debug!("CommHandle is running now"); let state = self.state.clone(); let i2p_server = self.i2p_server.clone(); - let clients = self.clients.clone(); let mut thread_writeguard = self.thread.write().await; *thread_writeguard = Some(tokio::spawn(async move { for incoming in i2p_server.incoming() { if let Ok(stream) = incoming { - if let Ok(addr) = stream.peer_addr() { - // First, save a reference to the new stream in `clients` for later reuse - let wrapped_stream = Arc::new(RwLock::new(stream)); - clients.write().await.insert(addr, wrapped_stream.clone()); - // Reference to state to be passed to `read_connection()` - let state_arc = state.clone(); - - // Spawn a blocking task, which (in read_connection) will spawn a non-blocking task - // The "outer" blocking task exists, because the for loop's iterator will block until - // there is another stream - thus, the existing streams will not be read. - // `spawn_blocking` moves the reading task to a special pool of tasks which are - // executed _despite_ other tasks blocking for something. - tokio::task::spawn_blocking(move || { - Self::read_connection(wrapped_stream, state_arc) - }); - } + let state_arc = state.clone(); + // Spawn a blocking task, which (in read_connection) will spawn a non-blocking task + // The "outer" blocking task exists, because the for loop's iterator will block until + // there is another stream - thus, the existing streams will not be read. + // `spawn_blocking` moves the reading task to a special pool of tasks which are + // executed _despite_ other tasks blocking for something. + tokio::task::spawn_blocking(move || Self::read_connection(stream, state_arc)); } } })); @@ -105,7 +96,7 @@ impl CommHandle { } pub async fn send(&self, dest: &I2pSocketAddr, msg: Message) -> anyhow::Result<()> { - debug!("To '{dest:?}': '{msg:?}"); + debug!("Sending message...\nFrom '{:?}' (dest: {:?})\nTo '{dest:?}'\n Message: '{msg:?}", self.own_peer_id().unwrap().b32_addr_nocache(), self.own_peer_id().unwrap()); match serde_json::to_string(&msg) { Ok(msg_string) => { self.send_to_addr(dest, msg_string.as_bytes()).await?; @@ -119,7 +110,7 @@ impl CommHandle { // Create client for this connection if necessary if !self.clients.read().await.contains_key(addr) { debug!("No client exists for requested connection, creating one"); - match I2pStream::connect(addr) { + match I2pStream::connect_with_session(self.i2p_server.session(), addr) { Ok(client) => { //client.inner.sam.conn.set_nodelay(true)?; //client.inner.sam.conn.set_nonblocking(false)?; @@ -167,12 +158,9 @@ impl CommHandle { Ok(self.peer_id.to_owned()) } - fn read_connection( - wrapped_stream: Arc>, - state: Arc, - ) -> JoinHandle<()> { + fn read_connection(stream: I2pStream, state: Arc) -> JoinHandle<()> { + let mut stream = stream; tokio::spawn(async move { - let mut stream = wrapped_stream.write().await; let peer: PeerId = stream.peer_addr().expect("Failed to get peer addr").into(); // All streams start with a \n byte which does not belong to the payload, take that from the stream. @@ -181,8 +169,8 @@ impl CommHandle { return; } - let iterator = serde_json::Deserializer::from_reader(&mut *stream) - .into_iter::(); + let iterator = + serde_json::Deserializer::from_reader(stream).into_iter::(); for item in iterator { match item { Ok(value) => match serde_json::from_value::(value) { diff --git a/ubisync/src/lib.rs b/ubisync/src/lib.rs index 7f8bb35..71859b7 100644 --- a/ubisync/src/lib.rs +++ b/ubisync/src/lib.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{future::Future, sync::Arc}; use anyhow::bail; use api::{v0::app::App, Api, ApiBuilder}; @@ -9,8 +9,9 @@ use node_events::UbisyncNodeEvent; use state::{ApiState, CommState, State}; use ubisync_lib::{ + messages::{Message, MessageContent}, peer::Peer, - types::{AppId, PeerId, PotId}, + types::{AppId, Family, PeerId, PotId}, }; pub mod api; @@ -76,6 +77,39 @@ impl Ubisync { self.state_handle.add_family_member(id) } + pub async fn request_family_join(&self, peer: PeerId) -> anyhow::Result<()> { + self.state_handle.add_family_join_request(peer.clone()); + self.comm_handle + .send(peer.addr_ref(), Message::new(MessageContent::JoinFamily)) + .await + } + + pub fn accept_family_join(&self, peer: PeerId) -> anyhow::Result<()> { + self.add_family_member_from_id(peer.clone())?; + let family = self.get_family()?; + tokio::spawn({ + let ch = self.comm_handle.clone(); + async move { + ch.send( + peer.addr_ref(), + Message::new(MessageContent::AddedToFamily { family }), + ) + .await + .expect("Could not send family join confirmation to peer"); + } + }); + Ok(()) + } + + pub fn get_family(&self) -> anyhow::Result { + self.state_handle + .get_family_of(self.comm_handle.own_peer_id()?) + } + + pub fn get_family_of(&self, peer: PeerId) -> anyhow::Result { + self.state_handle.get_family_of(peer) + } + pub fn get_apps(&self) -> Vec { self.state_handle.get_apps().unwrap_or(vec![]) } @@ -92,3 +126,45 @@ impl Ubisync { self.state_handle.add_pot_member(pot, app) } } + +#[cfg(test)] +mod tests { + use std::{sync::Arc, time::Duration}; + + use tracing::{debug, Level}; + + use crate::{config::Config, node_events::UbisyncNodeEvent, Ubisync}; + + #[tokio::test(flavor = "multi_thread")] + async fn join_and_leave_family() { + tracing_subscriber::fmt() + .pretty() + .with_max_level(Level::DEBUG) + .init(); + + // Two nodes need to bind to different ports + let mut c2 = Config::default(); + c2.api_config.port = Some(9982); + let ubi1 = Arc::new(Ubisync::new(&Config::default()).await.unwrap()); + let ubi2 = Arc::new(Ubisync::new(&c2).await.unwrap()); + + ubi1.set_node_event_callback( + move |ev, node| { + if let UbisyncNodeEvent::FamilyJoinRequest { joiner } = ev { + debug!("Received join request, make member join"); + node.accept_family_join(joiner).unwrap(); + } + }, + ubi1.clone(), + ); + + ubi2.request_family_join(ubi1.get_destination().into()) + .await + .unwrap(); + + tokio::time::sleep(Duration::from_millis(5000)).await; + + assert_eq!(ubi1.get_family().unwrap(), ubi2.get_family().unwrap()); + std::process::exit(0); + } +} diff --git a/ubisync/src/node_events/mod.rs b/ubisync/src/node_events/mod.rs index 756ac24..cf9403c 100644 --- a/ubisync/src/node_events/mod.rs +++ b/ubisync/src/node_events/mod.rs @@ -1,10 +1,6 @@ -use ubisync_lib::types::PotId; - - +use ubisync_lib::types::{PeerId, PotId}; pub enum UbisyncNodeEvent { - NewPot { - id: PotId, - app_type: String, - } -} \ No newline at end of file + NewPot { id: PotId, app_type: String }, + FamilyJoinRequest { joiner: PeerId }, +} diff --git a/ubisync/src/state/api_state.rs b/ubisync/src/state/api_state.rs index fa350ca..7c73fe6 100644 --- a/ubisync/src/state/api_state.rs +++ b/ubisync/src/state/api_state.rs @@ -6,7 +6,7 @@ use tracing::debug; use ubisync_lib::{ api::events::AppEvent, messages::MessageContent, - types::{AppId, Element, ElementContent, ElementId, ContentUpdateStrategy, Pot, PotId}, + types::{AppId, ContentUpdateStrategy, Element, ElementContent, ElementId, Pot, PotId}, }; use crate::api::v0::app::App; @@ -57,10 +57,21 @@ impl ApiState { .map(|app_opt| app_opt.ok_or(Error::msg("Failed to find app")))? } - pub fn create_element(&self, content: ElementContent, pot: PotId, update_strategy: ContentUpdateStrategy) -> anyhow::Result { + pub fn create_element( + &self, + content: ElementContent, + pot: PotId, + update_strategy: ContentUpdateStrategy, + ) -> anyhow::Result { let id = ElementId::new(); - self.db() - .add_element(id.clone(), content.clone(), update_strategy, None, false, pot.clone())?; + self.db().add_element( + id.clone(), + content.clone(), + update_strategy, + None, + false, + pot.clone(), + )?; debug!("Added element {{{}}}", id.to_string()); self.state.send_to_peers( diff --git a/ubisync/src/state/comm_state.rs b/ubisync/src/state/comm_state.rs index dd85674..eb29544 100644 --- a/ubisync/src/state/comm_state.rs +++ b/ubisync/src/state/comm_state.rs @@ -5,7 +5,9 @@ use tracing::debug; use ubisync_lib::{ api::events::AppEvent, peer::Peer, - types::{Element, ElementContent, ElementId, ContentUpdateStrategy, MessageId, PotId}, + types::{ + ContentUpdateStrategy, Element, ElementContent, ElementId, Family, FamilyId, MessageId, PeerId, PotId + }, }; use crate::node_events::UbisyncNodeEvent; @@ -98,6 +100,31 @@ impl CommState { Ok(()) } + pub fn request_family_join(&self, peer: PeerId) { + self.state + .emit_node_event(UbisyncNodeEvent::FamilyJoinRequest { joiner: peer }); + } + + pub fn remove_peer_from_family(&self, peer: PeerId) { + self.db().remove_peer_from_family(peer); + } + + pub fn has_family_join_request(&self, peer: PeerId) -> bool { + self.db().has_family_join_request(peer).unwrap_or(false) + } + + pub fn set_own_family(&self, family: Family) -> anyhow::Result<()> { + self.db().add_peer_family(family) + } + + pub fn get_family_of_peer(&self, peer: PeerId) -> anyhow::Result> { + self.db().get_family_of_peer(peer) + } + + pub fn own_peer_id(&self) -> Option { + self.state.own_peer_id() + } + fn db(&self) -> &StateDB { &self.state.db } @@ -110,7 +137,7 @@ mod tests { use super::CommState; use tracing::Level; - use ubisync_lib::types::{ElementContent, ElementId, ContentUpdateStrategy, MessageId, PotId}; + use ubisync_lib::types::{ContentUpdateStrategy, ElementContent, ElementId, MessageId, PotId}; #[tokio::test] #[serial_test::serial] diff --git a/ubisync/src/state/database/collections/peer_families.rs b/ubisync/src/state/database/collections/peer_families.rs index ac15b7d..f204819 100644 --- a/ubisync/src/state/database/collections/peer_families.rs +++ b/ubisync/src/state/database/collections/peer_families.rs @@ -86,22 +86,21 @@ impl From for Family { } impl StateDB { - pub fn add_peer_family( - &self, - id: FamilyId, - name: Option, - initial_members: Vec, - ) -> anyhow::Result<()> { - DbPeerFamily::push( - DbPeerFamily { - id: AsKey::new(id), - name, - members: HashSet::from_iter(initial_members), - }, - &self.db, - ) - .map(|_| ()) - .map_err(|e| anyhow!(e)) + pub fn add_peer_family(&self, family: Family) -> anyhow::Result<()> { + if self.get_peer_family(family.id.clone())?.is_some() { + Err(Error::msg("Peer family already exists")) + } else { + DbPeerFamily::push( + DbPeerFamily { + id: AsKey::new(family.id), + name: family.name, + members: HashSet::from_iter(family.members), + }, + &self.db, + ) + .map(|_| ()) + .map_err(|e| anyhow!(e)) + } } pub fn add_peer_to_family(&self, peer: PeerId, family: FamilyId) -> anyhow::Result<()> { @@ -166,6 +165,24 @@ impl StateDB { })? .map_err(|e| anyhow!(e)) } + + pub fn remove_peer_from_family(&self, peer: PeerId) -> anyhow::Result<()> { + self.db + .view::() + .with_key(&AsKey::new(peer.clone())) + .query_with_collection_docs() + .map(|results| { + if let Some(family) = results.into_iter().next() { + family.document.to_owned().modify(&self.db, |doc| { + doc.contents.members.remove(&peer.clone()); + }) + } else { + Err(bonsaidb::core::Error::other("", "Could not find family")) + } + }) + .map(|_| ()) + .map_err(|e| anyhow!(e)) + } } #[cfg(test)] @@ -180,11 +197,11 @@ mod tests { let family_id = FamilyId::new(); let peer_id = PeerId::default(); - db.add_peer_family( - family_id.clone(), - Some("My family name".to_string()), - vec![peer_id.clone()], - ) + db.add_peer_family(Family { + id: family_id.clone(), + name: Some("My family name".to_string()), + members: vec![peer_id.clone()], + }) .unwrap(); let retrieved_family = db.get_peer_family(family_id.clone()).unwrap(); @@ -198,17 +215,42 @@ mod tests { ) } + #[test] + fn add_remove() { + let db = StateDB::init(None); + let family_id = FamilyId::new(); + let peer_id = PeerId::default(); + + db.add_peer_family(Family { + id: family_id.clone(), + name: Some("My family name".to_string()), + members: vec![peer_id.clone()], + }) + .unwrap(); + db.remove_peer_from_family(peer_id.clone()).unwrap(); + let retrieved_family = db.get_peer_family(family_id.clone()).unwrap(); + + assert_eq!( + retrieved_family, + Some(Family::new( + family_id, + Some("My family name".to_string()), + vec![] + )) + ) + } + #[test] fn set_name() { let db = StateDB::init(None); let family_id = FamilyId::new(); let peer_id = PeerId::default(); - db.add_peer_family( - family_id.clone(), - Some("My family name".to_string()), - vec![peer_id.clone()], - ) + db.add_peer_family(Family { + id: family_id.clone(), + name: Some("My family name".to_string()), + members: vec![peer_id.clone()], + }) .unwrap(); assert_eq!( @@ -239,11 +281,11 @@ mod tests { let family_id = FamilyId::new(); let peer_id = PeerId::default(); - db.add_peer_family( - family_id.clone(), - Some("My family name".to_string()), - vec![peer_id.clone()], - ) + db.add_peer_family(Family { + id: family_id.clone(), + name: Some("My family name".to_string()), + members: vec![peer_id.clone()], + }) .unwrap(); assert_eq!(db.get_family_of_peer(peer_id).unwrap(), Some(family_id)) diff --git a/ubisync/src/state/database/collections/peers.rs b/ubisync/src/state/database/collections/peers.rs index 387d26d..60f61f6 100644 --- a/ubisync/src/state/database/collections/peers.rs +++ b/ubisync/src/state/database/collections/peers.rs @@ -2,7 +2,10 @@ use anyhow::{anyhow, Error}; use bonsaidb::core::schema::{Collection, SerializedCollection}; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use ubisync_lib::{peer::Peer, types::{ElementId, PeerId}}; +use ubisync_lib::{ + peer::Peer, + types::{ElementId, PeerId}, +}; use crate::state::database::{as_key::AsKey, StateDB}; @@ -24,15 +27,19 @@ impl From for Peer { impl StateDB { pub fn add_peer(&self, peer: Peer) -> anyhow::Result<()> { - DbPeer::push( - DbPeer { - id: AsKey::new(peer.id()), - name: peer.name(), - }, - &self.db, - ) - .map(|_| ()) - .map_err(|e| anyhow!(e)) + if self.get_peer(peer.id())?.is_some() { + Err(Error::msg("Peer already exists")) + } else { + DbPeer::push( + DbPeer { + id: AsKey::new(peer.id()), + name: peer.name(), + }, + &self.db, + ) + .map(|_| ()) + .map_err(|e| anyhow!(e)) + } } pub fn get_peer(&self, id: PeerId) -> anyhow::Result> { diff --git a/ubisync/src/state/database/collections/pot_memberships.rs b/ubisync/src/state/database/collections/pot_memberships.rs index bfa6d23..9c554f3 100644 --- a/ubisync/src/state/database/collections/pot_memberships.rs +++ b/ubisync/src/state/database/collections/pot_memberships.rs @@ -104,19 +104,23 @@ impl StateDB { "A member app which does not exist was meant to be added to a pot", )) } else { - DbPotMembership::push( - DbPotMembership { - pot_id: AsKey::new(pot), - app_id: AsKey::new(app), - }, - &self.db, - ) - .map(|_| ()) - .map_err(|e| anyhow!(e)) + if self.get_pot_membership(pot.clone(), app.clone())?.is_some() { + Err(Error::msg("Pot membership already exists")) + } else { + DbPotMembership::push( + DbPotMembership { + pot_id: AsKey::new(pot), + app_id: AsKey::new(app), + }, + &self.db, + ) + .map(|_| ()) + .map_err(|e| anyhow!(e)) + } } } - pub fn get(&self, pot: PotId, app: AppId) -> anyhow::Result> { + pub fn get_pot_membership(&self, pot: PotId, app: AppId) -> anyhow::Result> { self.db .view::() .with_key(&(AsKey::new(pot), AsKey::new(app))) diff --git a/ubisync/src/state/database/collections/pots.rs b/ubisync/src/state/database/collections/pots.rs index 7dfcfcd..8548f99 100644 --- a/ubisync/src/state/database/collections/pots.rs +++ b/ubisync/src/state/database/collections/pots.rs @@ -1,4 +1,4 @@ -use anyhow::anyhow; +use anyhow::{Error, anyhow}; use bonsaidb::core::schema::{Collection, SerializedCollection}; use serde::{Deserialize, Serialize}; use ubisync_lib::types::{Pot, PotId}; @@ -15,21 +15,28 @@ pub(super) struct DbPot { impl From for Pot { fn from(value: DbPot) -> Self { - Pot {id: (*value.id).clone(), app_type: value.app_type} + Pot { + id: (*value.id).clone(), + app_type: value.app_type, + } } } impl StateDB { pub fn add_pot(&self, id: PotId, app_type: String) -> anyhow::Result<()> { - DbPot::push( - DbPot { - id: AsKey::new(id), - app_type, - }, - &self.db, - ) - .map(|_| ()) - .map_err(|e| anyhow!(e)) + if self.get_pot(id.clone())?.is_some() { + Err(Error::msg("Pot already exists")) + } else { + DbPot::push( + DbPot { + id: AsKey::new(id), + app_type, + }, + &self.db, + ) + .map(|_| ()) + .map_err(|e| anyhow!(e)) + } } pub fn get_pot(&self, id: PotId) -> anyhow::Result> { @@ -45,7 +52,6 @@ mod tests { use crate::state::database::StateDB; - #[test] fn add_get() { let db = StateDB::init(None); @@ -53,6 +59,12 @@ mod tests { db.add_pot(pot_id.clone(), "app_type".to_string()).unwrap(); let retrieved_pot = db.get_pot(pot_id.clone()).unwrap(); - assert_eq!(retrieved_pot, Some(Pot {id: pot_id, app_type: "app_type".to_string()})) + assert_eq!( + retrieved_pot, + Some(Pot { + id: pot_id, + app_type: "app_type".to_string() + }) + ) } -} \ No newline at end of file +} diff --git a/ubisync/src/state/database/mod.rs b/ubisync/src/state/database/mod.rs index bd78728..a8eb415 100644 --- a/ubisync/src/state/database/mod.rs +++ b/ubisync/src/state/database/mod.rs @@ -1,7 +1,13 @@ -use bonsaidb::local::{ - config::{Builder, StorageConfiguration}, - Database as BonsaiDb, +use anyhow::anyhow; +use bonsaidb::{ + core::keyvalue::KeyValue, + local::{ + config::{Builder, StorageConfiguration}, + Database as BonsaiDb, + }, }; +use tracing::debug; +use ubisync_lib::types::PeerId; use uuid::Uuid; use self::collections::UbisyncSchema; @@ -24,4 +30,44 @@ impl StateDB { db: BonsaiDb::open::(storage_conf).unwrap(), } } + + pub fn add_family_join_request(&self, peer: PeerId) { + self.db.set_key(peer.to_string(), &"").execute(); + debug!( + "Added join request: {:?}", + self.db.get_key(peer.to_string()).query() + ) + } + + pub fn has_family_join_request(&self, peer: PeerId) -> anyhow::Result { + Ok(self + .db + .get_key(peer.to_string()) + .query() + .map_err(|e| anyhow!(e))? + .is_some()) + } + + pub fn remove_family_join_request(&self, peer: PeerId) -> anyhow::Result<()> { + self.db + .delete_key(peer.to_string()) + .map(|_| ()) + .map_err(|e| anyhow!(e)) + } +} + +#[cfg(test)] +mod tests { + use ubisync_lib::types::PeerId; + + use super::StateDB; + + #[test] + fn family_join_requests() { + let db = StateDB::init(None); + let peer = PeerId::default(); + db.add_family_join_request(peer.clone()); + + assert!(db.has_family_join_request(peer).unwrap()); + } } diff --git a/ubisync/src/state/mod.rs b/ubisync/src/state/mod.rs index df123b1..ca4f031 100644 --- a/ubisync/src/state/mod.rs +++ b/ubisync/src/state/mod.rs @@ -10,10 +10,10 @@ use ubisync_lib::{ api::events::AppEvent, messages::{Message, MessageContent}, peer::Peer, - types::{AppId, Element, ElementContent, ElementId, FamilyId, PeerId, PotId, Tag}, + types::{AppId, Element, ElementContent, ElementId, Family, FamilyId, PeerId, PotId, Tag}, }; -use anyhow::Error; +use anyhow::{anyhow, Error}; use tracing::{debug, warn}; mod api_state; @@ -68,10 +68,11 @@ impl State { pub fn own_peer_id(&self) -> Option { if let Ok(guard) = self.comm_handle.read() { - guard.as_ref().map(|t| t.own_peer_id().ok()).flatten() - } else { - None + if let Some(ch) = guard.as_ref() { + return ch.own_peer_id().ok(); + } } + None } pub fn get_apps(&self) -> anyhow::Result> { @@ -149,24 +150,51 @@ impl State { pub fn add_family_member(&self, peer: PeerId) -> anyhow::Result<()> { let my_id = self - .comm_handle - .read() - .map_err(|_| Error::msg("Failed to lock on CommHandle"))? - .to_owned() - .ok_or(Error::msg("CommHandle not initialized"))? - .own_peer_id()?; + .own_peer_id() + .ok_or(Error::msg("Could not get own PeerId"))?; - if self.db.get_family_of_peer(my_id.clone())?.is_none() { + let my_family = match self.db.get_family_of_peer(my_id.clone())? { + Some(id) => { + self.db.add_peer_to_family(peer.clone(), id.clone())?; + self.db + .get_peer_family(id)? + .ok_or(Error::msg("Family not found"))? + } + None => { + debug!( + "This node does not have a family yet, creating one with this node and the added peer" + ); + let family = Family { + id: FamilyId::new(), + name: None, + members: vec![my_id.clone(), peer.clone()], + }; + self.db.add_peer_family(family.clone())?; + family + } + }; + + self.send_to_peers( + MessageContent::AddedToFamily { family: my_family }, + vec![peer], + ); + Ok(()) + } + + pub fn add_family_join_request(&self, peer: PeerId) { + self.db.add_family_join_request(peer) + } + + pub fn get_family_of(&self, peer: PeerId) -> anyhow::Result { + match self.db.get_peer_family( self.db - .add_peer_family(FamilyId::new(), None, vec![my_id.clone()])?; + .get_family_of_peer(peer)? + .ok_or(Error::msg("Family of peer not found"))?, + ) { + Ok(Some(family)) => Ok(family), + Ok(None) => Err(Error::msg("Family not found by its FamilyId")), + Err(e) => Err(e), } - - self.db.add_peer_to_family( - peer, - self.db - .get_family_of_peer(my_id)? - .ok_or(Error::msg("Could not find own family"))?, - ) } pub fn get_event_receiver( @@ -234,23 +262,25 @@ impl State { } } - fn send_to_peers(&self, ct: MessageContent, peers: Vec) { + pub fn send_to_peers(&self, ct: MessageContent, peers: Vec) { match self.comm_handle.read() { Ok(opt) => { if opt.is_some() { let arc = opt.as_ref().unwrap().clone(); - tokio::spawn(async move { - for peer in peers { - if let Err(e) = - arc.send(peer.addr_ref(), Message::new(ct.clone())).await - { - debug!( - "Sending to peer '{:?}' returned an error: {}", - peer.b32_addr_nocache(), - e - ) + tokio::task::spawn_blocking(|| { + tokio::spawn(async move { + for peer in peers { + let _ = arc + .send(peer.addr_ref(), Message::new(ct.clone())) + .await + .map_err(|e| { + debug!( + "Sending to peer '{:?}' returned an error:\n{e}", + peer + ) + }); } - } + }) }); } }