diff --git a/ubisync-lib/src/messages/mod.rs b/ubisync-lib/src/messages/mod.rs index 64e4a34..679f897 100644 --- a/ubisync-lib/src/messages/mod.rs +++ b/ubisync-lib/src/messages/mod.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use crate::types::{ElementContent, ElementId, Family, FamilyId, MessageId, PotId}; +use crate::types::{ElementContent, ElementId, MessageId, PotId}; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Message { @@ -19,11 +19,6 @@ pub enum MessageContent { Hello { peer_name: String, }, - JoinFamily, - AddedToFamily { - family: Family, - }, - LeaveFamily, CreateElement { id: ElementId, content: ElementContent, @@ -39,7 +34,7 @@ pub enum MessageContent { AddPot { id: PotId, app_type: String, - }, + } } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/ubisync-lib/src/types/peer_id.rs b/ubisync-lib/src/types/peer_id.rs index 5cf7e29..45a275b 100644 --- a/ubisync-lib/src/types/peer_id.rs +++ b/ubisync-lib/src/types/peer_id.rs @@ -1,10 +1,8 @@ -use std::hash::Hash; - use anyhow::{anyhow, bail}; use i2p::net::{I2pAddr, I2pSocketAddr, ToI2pSocketAddrs}; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct PeerId { i2p_dest: I2pSocketAddr, i2p_b32: Option, @@ -29,14 +27,6 @@ impl PeerId { } } -// The identity of the PeerId only depends on the i2p_dest (which is unique), -// and not on whether the b32 address has been computed before -impl Hash for PeerId { - fn hash(&self, state: &mut H) { - self.i2p_dest.hash(state); - } -} - impl ToString for PeerId { fn to_string(&self) -> String { self.i2p_dest.to_string() diff --git a/ubisync/src/comm/message_processor.rs b/ubisync/src/comm/message_processor.rs index f677104..41030ac 100644 --- a/ubisync/src/comm/message_processor.rs +++ b/ubisync/src/comm/message_processor.rs @@ -1,7 +1,7 @@ use tracing::debug; use ubisync_lib::peer::Peer; -use ubisync_lib::types::{ContentUpdateStrategy, Family, PeerId}; +use ubisync_lib::types::{ContentUpdateStrategy, PeerId}; use ubisync_lib::messages::{Message, MessageContent}; @@ -9,34 +9,13 @@ use crate::comm::conflict_resolution::merge_element_contents; use crate::state::CommState; pub fn handle(state: &CommState, peer: &PeerId, message: Message) { - debug!( - "Received message.\nFrom: {:?} (dest: {:?})\nTo: {:?} (dest: {:?})\nMessage: {message:?}", - peer.b32_addr_nocache(), - peer, - state.own_peer_id().unwrap().b32_addr_nocache(), - state.own_peer_id().unwrap() - ); + debug!("Handling message now: {:?}", message); match message.content() { MessageContent::Hello { peer_name } => { state .set_peer(Peer::new(peer.to_owned(), Some(peer_name.to_string()))) .expect("State failed"); } - MessageContent::JoinFamily => state.request_family_join(peer.to_owned()), - MessageContent::AddedToFamily { family } => { - if state.has_family_join_request(peer.to_owned()) { - debug!("Own join request was accepted, setting family"); - state - .set_own_family(family.to_owned()) - .expect("State failed"); - - debug!("New own family: {:?}", state.get_family_of_peer(state.own_peer_id().unwrap())) - } - else { - debug!("Got AddedToFamily message, but no family join request was found") - } - } - MessageContent::LeaveFamily => state.remove_peer_from_family(peer.to_owned()), MessageContent::CreateElement { id, content, pot } => { state .add_received_element( @@ -49,12 +28,9 @@ pub fn handle(state: &CommState, peer: &PeerId, message: Message) { .expect("State failed"); } MessageContent::UpdateElement { id, content } => { - if let Some(new_content) = merge_element_contents( - state, - id.clone(), - content.to_owned(), - message.id().to_owned(), - ) { + if let Some(new_content) = + merge_element_contents(state, id.clone(), content.to_owned(), message.id().to_owned()) + { state .update_element_content( id.to_owned(), diff --git a/ubisync/src/comm/mod.rs b/ubisync/src/comm/mod.rs index c23ac29..75412f7 100644 --- a/ubisync/src/comm/mod.rs +++ b/ubisync/src/comm/mod.rs @@ -1,7 +1,6 @@ -mod conflict_resolution; pub mod message_processor; +mod conflict_resolution; -use i2p::sam::StreamForward; use tracing::{debug, error, warn}; use ubisync_lib::messages::Message; use ubisync_lib::types::PeerId; @@ -23,7 +22,7 @@ use crate::Config; pub struct CommHandle { state: Arc, i2p_server: Arc, - peer_id: PeerId, + peer_id: RwLock, // Maps peer addresses to existing connections to them clients: Arc>>>>, thread: RwLock>>, @@ -39,13 +38,12 @@ impl CommHandle { } let listener = listener_builder.build().unwrap(); - let mut own_peer_id: PeerId = (&listener).local_addr().map_err(|e| anyhow!(e))?.into(); - own_peer_id.b32_addr(); + let own_peer_id: PeerId = (&listener).local_addr().map_err(|e| anyhow!(e))?.into(); Ok(CommHandle { state: Arc::new(state), i2p_server: Arc::new(listener), - peer_id: own_peer_id, + peer_id: RwLock::new(own_peer_id), clients: Default::default(), thread: RwLock::new(None), }) @@ -55,17 +53,27 @@ impl CommHandle { debug!("CommHandle is running now"); let state = self.state.clone(); let i2p_server = self.i2p_server.clone(); + let clients = self.clients.clone(); let mut thread_writeguard = self.thread.write().await; *thread_writeguard = Some(tokio::spawn(async move { for incoming in i2p_server.incoming() { if let Ok(stream) = incoming { - let state_arc = state.clone(); - // Spawn a blocking task, which (in read_connection) will spawn a non-blocking task - // The "outer" blocking task exists, because the for loop's iterator will block until - // there is another stream - thus, the existing streams will not be read. - // `spawn_blocking` moves the reading task to a special pool of tasks which are - // executed _despite_ other tasks blocking for something. - tokio::task::spawn_blocking(move || Self::read_connection(stream, state_arc)); + if let Ok(addr) = stream.peer_addr() { + // First, save a reference to the new stream in `clients` for later reuse + let wrapped_stream = Arc::new(RwLock::new(stream)); + clients.write().await.insert(addr, wrapped_stream.clone()); + // Reference to state to be passed to `read_connection()` + let state_arc = state.clone(); + + // Spawn a blocking task, which (in read_connection) will spawn a non-blocking task + // The "outer" blocking task exists, because the for loop's iterator will block until + // there is another stream - thus, the existing streams will not be read. + // `spawn_blocking` moves the reading task to a special pool of tasks which are + // executed _despite_ other tasks blocking for something. + tokio::task::spawn_blocking(move || { + Self::read_connection(wrapped_stream, state_arc) + }); + } } } })); @@ -96,7 +104,6 @@ impl CommHandle { } pub async fn send(&self, dest: &I2pSocketAddr, msg: Message) -> anyhow::Result<()> { - debug!("Sending message...\nFrom '{:?}' (dest: {:?})\nTo '{dest:?}'\n Message: '{msg:?}", self.own_peer_id().unwrap().b32_addr_nocache(), self.own_peer_id().unwrap()); match serde_json::to_string(&msg) { Ok(msg_string) => { self.send_to_addr(dest, msg_string.as_bytes()).await?; @@ -109,8 +116,7 @@ impl CommHandle { pub async fn send_to_addr(&self, addr: &I2pSocketAddr, msg: &[u8]) -> anyhow::Result<()> { // Create client for this connection if necessary if !self.clients.read().await.contains_key(addr) { - debug!("No client exists for requested connection, creating one"); - match I2pStream::connect_with_session(self.i2p_server.session(), addr) { + match I2pStream::connect(addr) { Ok(client) => { //client.inner.sam.conn.set_nodelay(true)?; //client.inner.sam.conn.set_nonblocking(false)?; @@ -147,20 +153,23 @@ impl CommHandle { } pub fn i2p_address(&self) -> I2pSocketAddr { - self.peer_id.addr() + self.peer_id.blocking_read().addr() } pub fn i2p_b32_address(&self) -> anyhow::Result { - self.peer_id.b32_addr_nocache() + self.peer_id.blocking_write().b32_addr() } pub fn own_peer_id(&self) -> anyhow::Result { - Ok(self.peer_id.to_owned()) + Ok(self.peer_id.blocking_read().to_owned()) } - fn read_connection(stream: I2pStream, state: Arc) -> JoinHandle<()> { - let mut stream = stream; + fn read_connection( + wrapped_stream: Arc>, + state: Arc, + ) -> JoinHandle<()> { tokio::spawn(async move { + let mut stream = wrapped_stream.write().await; let peer: PeerId = stream.peer_addr().expect("Failed to get peer addr").into(); // All streams start with a \n byte which does not belong to the payload, take that from the stream. @@ -169,8 +178,8 @@ impl CommHandle { return; } - let iterator = - serde_json::Deserializer::from_reader(stream).into_iter::(); + let iterator = serde_json::Deserializer::from_reader(&mut *stream) + .into_iter::(); for item in iterator { match item { Ok(value) => match serde_json::from_value::(value) { diff --git a/ubisync/src/lib.rs b/ubisync/src/lib.rs index 71859b7..7f8bb35 100644 --- a/ubisync/src/lib.rs +++ b/ubisync/src/lib.rs @@ -1,4 +1,4 @@ -use std::{future::Future, sync::Arc}; +use std::sync::Arc; use anyhow::bail; use api::{v0::app::App, Api, ApiBuilder}; @@ -9,9 +9,8 @@ use node_events::UbisyncNodeEvent; use state::{ApiState, CommState, State}; use ubisync_lib::{ - messages::{Message, MessageContent}, peer::Peer, - types::{AppId, Family, PeerId, PotId}, + types::{AppId, PeerId, PotId}, }; pub mod api; @@ -77,39 +76,6 @@ impl Ubisync { self.state_handle.add_family_member(id) } - pub async fn request_family_join(&self, peer: PeerId) -> anyhow::Result<()> { - self.state_handle.add_family_join_request(peer.clone()); - self.comm_handle - .send(peer.addr_ref(), Message::new(MessageContent::JoinFamily)) - .await - } - - pub fn accept_family_join(&self, peer: PeerId) -> anyhow::Result<()> { - self.add_family_member_from_id(peer.clone())?; - let family = self.get_family()?; - tokio::spawn({ - let ch = self.comm_handle.clone(); - async move { - ch.send( - peer.addr_ref(), - Message::new(MessageContent::AddedToFamily { family }), - ) - .await - .expect("Could not send family join confirmation to peer"); - } - }); - Ok(()) - } - - pub fn get_family(&self) -> anyhow::Result { - self.state_handle - .get_family_of(self.comm_handle.own_peer_id()?) - } - - pub fn get_family_of(&self, peer: PeerId) -> anyhow::Result { - self.state_handle.get_family_of(peer) - } - pub fn get_apps(&self) -> Vec { self.state_handle.get_apps().unwrap_or(vec![]) } @@ -126,45 +92,3 @@ impl Ubisync { self.state_handle.add_pot_member(pot, app) } } - -#[cfg(test)] -mod tests { - use std::{sync::Arc, time::Duration}; - - use tracing::{debug, Level}; - - use crate::{config::Config, node_events::UbisyncNodeEvent, Ubisync}; - - #[tokio::test(flavor = "multi_thread")] - async fn join_and_leave_family() { - tracing_subscriber::fmt() - .pretty() - .with_max_level(Level::DEBUG) - .init(); - - // Two nodes need to bind to different ports - let mut c2 = Config::default(); - c2.api_config.port = Some(9982); - let ubi1 = Arc::new(Ubisync::new(&Config::default()).await.unwrap()); - let ubi2 = Arc::new(Ubisync::new(&c2).await.unwrap()); - - ubi1.set_node_event_callback( - move |ev, node| { - if let UbisyncNodeEvent::FamilyJoinRequest { joiner } = ev { - debug!("Received join request, make member join"); - node.accept_family_join(joiner).unwrap(); - } - }, - ubi1.clone(), - ); - - ubi2.request_family_join(ubi1.get_destination().into()) - .await - .unwrap(); - - tokio::time::sleep(Duration::from_millis(5000)).await; - - assert_eq!(ubi1.get_family().unwrap(), ubi2.get_family().unwrap()); - std::process::exit(0); - } -} diff --git a/ubisync/src/node_events/mod.rs b/ubisync/src/node_events/mod.rs index cf9403c..756ac24 100644 --- a/ubisync/src/node_events/mod.rs +++ b/ubisync/src/node_events/mod.rs @@ -1,6 +1,10 @@ -use ubisync_lib::types::{PeerId, PotId}; +use ubisync_lib::types::PotId; + + pub enum UbisyncNodeEvent { - NewPot { id: PotId, app_type: String }, - FamilyJoinRequest { joiner: PeerId }, -} + NewPot { + id: PotId, + app_type: String, + } +} \ No newline at end of file diff --git a/ubisync/src/state/api_state.rs b/ubisync/src/state/api_state.rs index 7c73fe6..fa350ca 100644 --- a/ubisync/src/state/api_state.rs +++ b/ubisync/src/state/api_state.rs @@ -6,7 +6,7 @@ use tracing::debug; use ubisync_lib::{ api::events::AppEvent, messages::MessageContent, - types::{AppId, ContentUpdateStrategy, Element, ElementContent, ElementId, Pot, PotId}, + types::{AppId, Element, ElementContent, ElementId, ContentUpdateStrategy, Pot, PotId}, }; use crate::api::v0::app::App; @@ -57,21 +57,10 @@ impl ApiState { .map(|app_opt| app_opt.ok_or(Error::msg("Failed to find app")))? } - pub fn create_element( - &self, - content: ElementContent, - pot: PotId, - update_strategy: ContentUpdateStrategy, - ) -> anyhow::Result { + pub fn create_element(&self, content: ElementContent, pot: PotId, update_strategy: ContentUpdateStrategy) -> anyhow::Result { let id = ElementId::new(); - self.db().add_element( - id.clone(), - content.clone(), - update_strategy, - None, - false, - pot.clone(), - )?; + self.db() + .add_element(id.clone(), content.clone(), update_strategy, None, false, pot.clone())?; debug!("Added element {{{}}}", id.to_string()); self.state.send_to_peers( diff --git a/ubisync/src/state/comm_state.rs b/ubisync/src/state/comm_state.rs index eb29544..dd85674 100644 --- a/ubisync/src/state/comm_state.rs +++ b/ubisync/src/state/comm_state.rs @@ -5,9 +5,7 @@ use tracing::debug; use ubisync_lib::{ api::events::AppEvent, peer::Peer, - types::{ - ContentUpdateStrategy, Element, ElementContent, ElementId, Family, FamilyId, MessageId, PeerId, PotId - }, + types::{Element, ElementContent, ElementId, ContentUpdateStrategy, MessageId, PotId}, }; use crate::node_events::UbisyncNodeEvent; @@ -100,31 +98,6 @@ impl CommState { Ok(()) } - pub fn request_family_join(&self, peer: PeerId) { - self.state - .emit_node_event(UbisyncNodeEvent::FamilyJoinRequest { joiner: peer }); - } - - pub fn remove_peer_from_family(&self, peer: PeerId) { - self.db().remove_peer_from_family(peer); - } - - pub fn has_family_join_request(&self, peer: PeerId) -> bool { - self.db().has_family_join_request(peer).unwrap_or(false) - } - - pub fn set_own_family(&self, family: Family) -> anyhow::Result<()> { - self.db().add_peer_family(family) - } - - pub fn get_family_of_peer(&self, peer: PeerId) -> anyhow::Result> { - self.db().get_family_of_peer(peer) - } - - pub fn own_peer_id(&self) -> Option { - self.state.own_peer_id() - } - fn db(&self) -> &StateDB { &self.state.db } @@ -137,7 +110,7 @@ mod tests { use super::CommState; use tracing::Level; - use ubisync_lib::types::{ContentUpdateStrategy, ElementContent, ElementId, MessageId, PotId}; + use ubisync_lib::types::{ElementContent, ElementId, ContentUpdateStrategy, MessageId, PotId}; #[tokio::test] #[serial_test::serial] diff --git a/ubisync/src/state/database/collections/peer_families.rs b/ubisync/src/state/database/collections/peer_families.rs index f204819..ac15b7d 100644 --- a/ubisync/src/state/database/collections/peer_families.rs +++ b/ubisync/src/state/database/collections/peer_families.rs @@ -86,21 +86,22 @@ impl From for Family { } impl StateDB { - pub fn add_peer_family(&self, family: Family) -> anyhow::Result<()> { - if self.get_peer_family(family.id.clone())?.is_some() { - Err(Error::msg("Peer family already exists")) - } else { - DbPeerFamily::push( - DbPeerFamily { - id: AsKey::new(family.id), - name: family.name, - members: HashSet::from_iter(family.members), - }, - &self.db, - ) - .map(|_| ()) - .map_err(|e| anyhow!(e)) - } + pub fn add_peer_family( + &self, + id: FamilyId, + name: Option, + initial_members: Vec, + ) -> anyhow::Result<()> { + DbPeerFamily::push( + DbPeerFamily { + id: AsKey::new(id), + name, + members: HashSet::from_iter(initial_members), + }, + &self.db, + ) + .map(|_| ()) + .map_err(|e| anyhow!(e)) } pub fn add_peer_to_family(&self, peer: PeerId, family: FamilyId) -> anyhow::Result<()> { @@ -165,24 +166,6 @@ impl StateDB { })? .map_err(|e| anyhow!(e)) } - - pub fn remove_peer_from_family(&self, peer: PeerId) -> anyhow::Result<()> { - self.db - .view::() - .with_key(&AsKey::new(peer.clone())) - .query_with_collection_docs() - .map(|results| { - if let Some(family) = results.into_iter().next() { - family.document.to_owned().modify(&self.db, |doc| { - doc.contents.members.remove(&peer.clone()); - }) - } else { - Err(bonsaidb::core::Error::other("", "Could not find family")) - } - }) - .map(|_| ()) - .map_err(|e| anyhow!(e)) - } } #[cfg(test)] @@ -197,11 +180,11 @@ mod tests { let family_id = FamilyId::new(); let peer_id = PeerId::default(); - db.add_peer_family(Family { - id: family_id.clone(), - name: Some("My family name".to_string()), - members: vec![peer_id.clone()], - }) + db.add_peer_family( + family_id.clone(), + Some("My family name".to_string()), + vec![peer_id.clone()], + ) .unwrap(); let retrieved_family = db.get_peer_family(family_id.clone()).unwrap(); @@ -215,42 +198,17 @@ mod tests { ) } - #[test] - fn add_remove() { - let db = StateDB::init(None); - let family_id = FamilyId::new(); - let peer_id = PeerId::default(); - - db.add_peer_family(Family { - id: family_id.clone(), - name: Some("My family name".to_string()), - members: vec![peer_id.clone()], - }) - .unwrap(); - db.remove_peer_from_family(peer_id.clone()).unwrap(); - let retrieved_family = db.get_peer_family(family_id.clone()).unwrap(); - - assert_eq!( - retrieved_family, - Some(Family::new( - family_id, - Some("My family name".to_string()), - vec![] - )) - ) - } - #[test] fn set_name() { let db = StateDB::init(None); let family_id = FamilyId::new(); let peer_id = PeerId::default(); - db.add_peer_family(Family { - id: family_id.clone(), - name: Some("My family name".to_string()), - members: vec![peer_id.clone()], - }) + db.add_peer_family( + family_id.clone(), + Some("My family name".to_string()), + vec![peer_id.clone()], + ) .unwrap(); assert_eq!( @@ -281,11 +239,11 @@ mod tests { let family_id = FamilyId::new(); let peer_id = PeerId::default(); - db.add_peer_family(Family { - id: family_id.clone(), - name: Some("My family name".to_string()), - members: vec![peer_id.clone()], - }) + db.add_peer_family( + family_id.clone(), + Some("My family name".to_string()), + vec![peer_id.clone()], + ) .unwrap(); assert_eq!(db.get_family_of_peer(peer_id).unwrap(), Some(family_id)) diff --git a/ubisync/src/state/database/collections/peers.rs b/ubisync/src/state/database/collections/peers.rs index 60f61f6..387d26d 100644 --- a/ubisync/src/state/database/collections/peers.rs +++ b/ubisync/src/state/database/collections/peers.rs @@ -2,10 +2,7 @@ use anyhow::{anyhow, Error}; use bonsaidb::core::schema::{Collection, SerializedCollection}; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use ubisync_lib::{ - peer::Peer, - types::{ElementId, PeerId}, -}; +use ubisync_lib::{peer::Peer, types::{ElementId, PeerId}}; use crate::state::database::{as_key::AsKey, StateDB}; @@ -27,19 +24,15 @@ impl From for Peer { impl StateDB { pub fn add_peer(&self, peer: Peer) -> anyhow::Result<()> { - if self.get_peer(peer.id())?.is_some() { - Err(Error::msg("Peer already exists")) - } else { - DbPeer::push( - DbPeer { - id: AsKey::new(peer.id()), - name: peer.name(), - }, - &self.db, - ) - .map(|_| ()) - .map_err(|e| anyhow!(e)) - } + DbPeer::push( + DbPeer { + id: AsKey::new(peer.id()), + name: peer.name(), + }, + &self.db, + ) + .map(|_| ()) + .map_err(|e| anyhow!(e)) } pub fn get_peer(&self, id: PeerId) -> anyhow::Result> { diff --git a/ubisync/src/state/database/collections/pot_memberships.rs b/ubisync/src/state/database/collections/pot_memberships.rs index 9c554f3..bfa6d23 100644 --- a/ubisync/src/state/database/collections/pot_memberships.rs +++ b/ubisync/src/state/database/collections/pot_memberships.rs @@ -104,23 +104,19 @@ impl StateDB { "A member app which does not exist was meant to be added to a pot", )) } else { - if self.get_pot_membership(pot.clone(), app.clone())?.is_some() { - Err(Error::msg("Pot membership already exists")) - } else { - DbPotMembership::push( - DbPotMembership { - pot_id: AsKey::new(pot), - app_id: AsKey::new(app), - }, - &self.db, - ) - .map(|_| ()) - .map_err(|e| anyhow!(e)) - } + DbPotMembership::push( + DbPotMembership { + pot_id: AsKey::new(pot), + app_id: AsKey::new(app), + }, + &self.db, + ) + .map(|_| ()) + .map_err(|e| anyhow!(e)) } } - pub fn get_pot_membership(&self, pot: PotId, app: AppId) -> anyhow::Result> { + pub fn get(&self, pot: PotId, app: AppId) -> anyhow::Result> { self.db .view::() .with_key(&(AsKey::new(pot), AsKey::new(app))) diff --git a/ubisync/src/state/database/collections/pots.rs b/ubisync/src/state/database/collections/pots.rs index 8548f99..7dfcfcd 100644 --- a/ubisync/src/state/database/collections/pots.rs +++ b/ubisync/src/state/database/collections/pots.rs @@ -1,4 +1,4 @@ -use anyhow::{Error, anyhow}; +use anyhow::anyhow; use bonsaidb::core::schema::{Collection, SerializedCollection}; use serde::{Deserialize, Serialize}; use ubisync_lib::types::{Pot, PotId}; @@ -15,28 +15,21 @@ pub(super) struct DbPot { impl From for Pot { fn from(value: DbPot) -> Self { - Pot { - id: (*value.id).clone(), - app_type: value.app_type, - } + Pot {id: (*value.id).clone(), app_type: value.app_type} } } impl StateDB { pub fn add_pot(&self, id: PotId, app_type: String) -> anyhow::Result<()> { - if self.get_pot(id.clone())?.is_some() { - Err(Error::msg("Pot already exists")) - } else { - DbPot::push( - DbPot { - id: AsKey::new(id), - app_type, - }, - &self.db, - ) - .map(|_| ()) - .map_err(|e| anyhow!(e)) - } + DbPot::push( + DbPot { + id: AsKey::new(id), + app_type, + }, + &self.db, + ) + .map(|_| ()) + .map_err(|e| anyhow!(e)) } pub fn get_pot(&self, id: PotId) -> anyhow::Result> { @@ -52,6 +45,7 @@ mod tests { use crate::state::database::StateDB; + #[test] fn add_get() { let db = StateDB::init(None); @@ -59,12 +53,6 @@ mod tests { db.add_pot(pot_id.clone(), "app_type".to_string()).unwrap(); let retrieved_pot = db.get_pot(pot_id.clone()).unwrap(); - assert_eq!( - retrieved_pot, - Some(Pot { - id: pot_id, - app_type: "app_type".to_string() - }) - ) + assert_eq!(retrieved_pot, Some(Pot {id: pot_id, app_type: "app_type".to_string()})) } -} +} \ No newline at end of file diff --git a/ubisync/src/state/database/mod.rs b/ubisync/src/state/database/mod.rs index a8eb415..bd78728 100644 --- a/ubisync/src/state/database/mod.rs +++ b/ubisync/src/state/database/mod.rs @@ -1,13 +1,7 @@ -use anyhow::anyhow; -use bonsaidb::{ - core::keyvalue::KeyValue, - local::{ - config::{Builder, StorageConfiguration}, - Database as BonsaiDb, - }, +use bonsaidb::local::{ + config::{Builder, StorageConfiguration}, + Database as BonsaiDb, }; -use tracing::debug; -use ubisync_lib::types::PeerId; use uuid::Uuid; use self::collections::UbisyncSchema; @@ -30,44 +24,4 @@ impl StateDB { db: BonsaiDb::open::(storage_conf).unwrap(), } } - - pub fn add_family_join_request(&self, peer: PeerId) { - self.db.set_key(peer.to_string(), &"").execute(); - debug!( - "Added join request: {:?}", - self.db.get_key(peer.to_string()).query() - ) - } - - pub fn has_family_join_request(&self, peer: PeerId) -> anyhow::Result { - Ok(self - .db - .get_key(peer.to_string()) - .query() - .map_err(|e| anyhow!(e))? - .is_some()) - } - - pub fn remove_family_join_request(&self, peer: PeerId) -> anyhow::Result<()> { - self.db - .delete_key(peer.to_string()) - .map(|_| ()) - .map_err(|e| anyhow!(e)) - } -} - -#[cfg(test)] -mod tests { - use ubisync_lib::types::PeerId; - - use super::StateDB; - - #[test] - fn family_join_requests() { - let db = StateDB::init(None); - let peer = PeerId::default(); - db.add_family_join_request(peer.clone()); - - assert!(db.has_family_join_request(peer).unwrap()); - } } diff --git a/ubisync/src/state/mod.rs b/ubisync/src/state/mod.rs index ca4f031..df123b1 100644 --- a/ubisync/src/state/mod.rs +++ b/ubisync/src/state/mod.rs @@ -10,10 +10,10 @@ use ubisync_lib::{ api::events::AppEvent, messages::{Message, MessageContent}, peer::Peer, - types::{AppId, Element, ElementContent, ElementId, Family, FamilyId, PeerId, PotId, Tag}, + types::{AppId, Element, ElementContent, ElementId, FamilyId, PeerId, PotId, Tag}, }; -use anyhow::{anyhow, Error}; +use anyhow::Error; use tracing::{debug, warn}; mod api_state; @@ -68,11 +68,10 @@ impl State { pub fn own_peer_id(&self) -> Option { if let Ok(guard) = self.comm_handle.read() { - if let Some(ch) = guard.as_ref() { - return ch.own_peer_id().ok(); - } + guard.as_ref().map(|t| t.own_peer_id().ok()).flatten() + } else { + None } - None } pub fn get_apps(&self) -> anyhow::Result> { @@ -150,51 +149,24 @@ impl State { pub fn add_family_member(&self, peer: PeerId) -> anyhow::Result<()> { let my_id = self - .own_peer_id() - .ok_or(Error::msg("Could not get own PeerId"))?; + .comm_handle + .read() + .map_err(|_| Error::msg("Failed to lock on CommHandle"))? + .to_owned() + .ok_or(Error::msg("CommHandle not initialized"))? + .own_peer_id()?; - let my_family = match self.db.get_family_of_peer(my_id.clone())? { - Some(id) => { - self.db.add_peer_to_family(peer.clone(), id.clone())?; - self.db - .get_peer_family(id)? - .ok_or(Error::msg("Family not found"))? - } - None => { - debug!( - "This node does not have a family yet, creating one with this node and the added peer" - ); - let family = Family { - id: FamilyId::new(), - name: None, - members: vec![my_id.clone(), peer.clone()], - }; - self.db.add_peer_family(family.clone())?; - family - } - }; - - self.send_to_peers( - MessageContent::AddedToFamily { family: my_family }, - vec![peer], - ); - Ok(()) - } - - pub fn add_family_join_request(&self, peer: PeerId) { - self.db.add_family_join_request(peer) - } - - pub fn get_family_of(&self, peer: PeerId) -> anyhow::Result { - match self.db.get_peer_family( + if self.db.get_family_of_peer(my_id.clone())?.is_none() { self.db - .get_family_of_peer(peer)? - .ok_or(Error::msg("Family of peer not found"))?, - ) { - Ok(Some(family)) => Ok(family), - Ok(None) => Err(Error::msg("Family not found by its FamilyId")), - Err(e) => Err(e), + .add_peer_family(FamilyId::new(), None, vec![my_id.clone()])?; } + + self.db.add_peer_to_family( + peer, + self.db + .get_family_of_peer(my_id)? + .ok_or(Error::msg("Could not find own family"))?, + ) } pub fn get_event_receiver( @@ -262,25 +234,23 @@ impl State { } } - pub fn send_to_peers(&self, ct: MessageContent, peers: Vec) { + fn send_to_peers(&self, ct: MessageContent, peers: Vec) { match self.comm_handle.read() { Ok(opt) => { if opt.is_some() { let arc = opt.as_ref().unwrap().clone(); - tokio::task::spawn_blocking(|| { - tokio::spawn(async move { - for peer in peers { - let _ = arc - .send(peer.addr_ref(), Message::new(ct.clone())) - .await - .map_err(|e| { - debug!( - "Sending to peer '{:?}' returned an error:\n{e}", - peer - ) - }); + tokio::spawn(async move { + for peer in peers { + if let Err(e) = + arc.send(peer.addr_ref(), Message::new(ct.clone())).await + { + debug!( + "Sending to peer '{:?}' returned an error: {}", + peer.b32_addr_nocache(), + e + ) } - }) + } }); } }