diff --git a/ubisync-lib/src/types/peer_id.rs b/ubisync-lib/src/types/peer_id.rs index 37df5cb..5c267bf 100644 --- a/ubisync-lib/src/types/peer_id.rs +++ b/ubisync-lib/src/types/peer_id.rs @@ -1,4 +1,4 @@ -use anyhow::bail; +use anyhow::{anyhow, bail}; use i2p::net::{I2pAddr, I2pSocketAddr, ToI2pSocketAddrs}; use serde::{Deserialize, Serialize}; @@ -11,6 +11,12 @@ impl PeerId { pub fn addr(&self) -> I2pSocketAddr { self.i2p_addr.to_owned() } + pub fn addr_ref(&self) -> &I2pSocketAddr { + &self.i2p_addr + } + pub fn b32_addr(&self) -> anyhow::Result { + I2pAddr::from_b64(&self.i2p_addr.dest().string()).map_err(|e| anyhow!(e)) + } } impl ToString for PeerId { diff --git a/ubisync/src/state/api_state.rs b/ubisync/src/state/api_state.rs index 49d881d..6a52b4c 100644 --- a/ubisync/src/state/api_state.rs +++ b/ubisync/src/state/api_state.rs @@ -63,11 +63,17 @@ impl ApiState { .add_element(id.clone(), content.clone(), None, false, pot.clone())?; debug!("Added element {{{}}}", id.to_string()); - self.state.send_to_peers(MessageContent::CreateElement { - id: id.clone(), - content: content, - pot: pot, - }); + self.state.send_to_peers( + MessageContent::CreateElement { + id: id.clone(), + content: content, + pot: pot, + }, + self.state + .own_peer_id() + .map(|id| self.db().get_peer_family_members(id).unwrap_or_default()) + .unwrap_or_default(), + ); Ok(id) } @@ -110,10 +116,16 @@ impl ApiState { self.db().add_pot(pot_id.clone(), app_type.clone())?; self.db().add_pot_membership(pot_id.clone(), app_id)?; - self.state.send_to_peers(MessageContent::AddPot { - id: pot_id.clone(), - app_type: app_type, - }); + self.state.send_to_peers( + MessageContent::AddPot { + id: pot_id.clone(), + app_type: app_type, + }, + self.state + .own_peer_id() + .map(|id| self.db().get_peer_family_members(id).unwrap_or_default()) + .unwrap_or_default(), + ); Ok(pot_id) } diff --git a/ubisync/src/state/database/collections/peer_families.rs b/ubisync/src/state/database/collections/peer_families.rs index 2cd6880..ac15b7d 100644 --- a/ubisync/src/state/database/collections/peer_families.rs +++ b/ubisync/src/state/database/collections/peer_families.rs @@ -13,7 +13,7 @@ use ubisync_lib::types::{Family, FamilyId, PeerId}; use crate::state::database::{as_key::AsKey, StateDB}; #[derive(Debug, Serialize, Deserialize, Collection, PartialEq, Clone)] -#[collection(name = "peer-families", views = [DbPeerFamilyByMemberId])] +#[collection(name = "peer-families", views = [DbPeerFamilyIdByMemberId, DbPeerFamilyByMemberId])] pub(super) struct DbPeerFamily { #[natural_id] pub(super) id: AsKey, @@ -22,7 +22,34 @@ pub(super) struct DbPeerFamily { } #[derive(Debug, Clone, View, ViewSchema)] -#[view(collection = DbPeerFamily, key = AsKey, value = Option, name = "by-member-id")] +#[view(collection = DbPeerFamily, key = AsKey, value = Option, name = "id-by-member-id")] +pub(super) struct DbPeerFamilyIdByMemberId; + +impl MapReduce for DbPeerFamilyIdByMemberId { + fn map<'doc>( + &self, + document: &'doc bonsaidb::core::document::BorrowedDocument<'_>, + ) -> bonsaidb::core::schema::ViewMapResult<'doc, Self> { + let content = DbPeerFamily::document_contents(document)?; + Ok(content + .members + .iter() + .fold(Mappings::default(), |acc, member| { + acc.and( + document + .header + .emit_key_and_value( + AsKey::new(member.to_owned()), + Some((*content.id).clone()), + ) + .unwrap_or(Mappings::none()), + ) + })) + } +} + +#[derive(Debug, Clone, View, ViewSchema)] +#[view(collection = DbPeerFamily, key = AsKey, value = Option, name = "by-member-id")] pub(super) struct DbPeerFamilyByMemberId; impl MapReduce for DbPeerFamilyByMemberId { @@ -40,7 +67,7 @@ impl MapReduce for DbPeerFamilyByMemberId { .header .emit_key_and_value( AsKey::new(member.to_owned()), - Some((*content.id).clone()), + Some(content.clone().into()), ) .unwrap_or(Mappings::none()), ) @@ -109,7 +136,7 @@ impl StateDB { pub fn get_family_of_peer(&self, peer: PeerId) -> anyhow::Result> { self.db - .view::() + .view::() .with_key(&AsKey::new(peer)) .query_with_collection_docs() .map(|results| match results.mappings.len() { @@ -119,6 +146,26 @@ impl StateDB { })? .map_err(|e| anyhow!(e)) } + + pub fn get_peer_family_members(&self, peer: PeerId) -> anyhow::Result> { + self.db + .view::() + .with_key(&AsKey::new(peer)) + .query_with_collection_docs() + .map(|results| match results.mappings.len() { + 0 => Ok(vec![]), + 1 => Ok(results + .mappings + .first() + .unwrap() + .value + .clone() + .map(|family| family.members) + .unwrap_or(vec![])), + _ => Err(Error::msg("Peer appears to be member of multiple families")), + })? + .map_err(|e| anyhow!(e)) + } } #[cfg(test)] diff --git a/ubisync/src/state/database/collections/peers.rs b/ubisync/src/state/database/collections/peers.rs index 0f3193a..387d26d 100644 --- a/ubisync/src/state/database/collections/peers.rs +++ b/ubisync/src/state/database/collections/peers.rs @@ -2,10 +2,12 @@ use anyhow::{anyhow, Error}; use bonsaidb::core::schema::{Collection, SerializedCollection}; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use ubisync_lib::{peer::Peer, types::PeerId}; +use ubisync_lib::{peer::Peer, types::{ElementId, PeerId}}; use crate::state::database::{as_key::AsKey, StateDB}; +use super::peer_families::DbPeerFamily; + #[derive(Debug, Serialize, Deserialize, Collection, PartialEq, Clone)] #[collection(name = "peers", views = [])] pub(super) struct DbPeer { diff --git a/ubisync/src/state/mod.rs b/ubisync/src/state/mod.rs index fd04812..768f409 100644 --- a/ubisync/src/state/mod.rs +++ b/ubisync/src/state/mod.rs @@ -66,6 +66,14 @@ impl State { } } + pub fn own_peer_id(&self) -> Option { + if let Ok(guard) = self.comm_handle.read() { + guard.as_ref().map(|t| t.own_peer_id().ok()).flatten() + } else { + None + } + } + pub fn get_apps(&self) -> anyhow::Result> { self.db.get_all_apps() } @@ -78,18 +86,30 @@ impl State { self.db .set_element_content(element_id.clone(), content.clone()) .inspect(|_| { - self.send_to_peers(MessageContent::SetElement { - id: element_id.clone(), - content: content.clone(), - }) + //TODO: Get all peers interested in the element, e.g. because they subscribe to the element's pot, a share, etc. + self.send_to_peers( + MessageContent::SetElement { + id: element_id.clone(), + content: content.clone(), + }, + self.own_peer_id() + .map(|id| self.db.get_peer_family_members(id).unwrap_or_default()) + .unwrap_or_default(), + ) }) } pub fn remove_element(&self, element_id: ElementId) -> anyhow::Result<()> { self.db.remove_element(element_id.clone()).inspect(|_| { - self.send_to_peers(MessageContent::RemoveElement { - id: element_id.clone(), - }) + //TODO: Get all peers interested in the element, e.g. because they subscribe to the element's pot, a share, etc. + self.send_to_peers( + MessageContent::RemoveElement { + id: element_id.clone(), + }, + self.own_peer_id() + .map(|id| self.db.get_peer_family_members(id).unwrap_or_default()) + .unwrap_or_default(), + ) }) } @@ -214,13 +234,23 @@ impl State { } } - fn send_to_peers(&self, ct: MessageContent) { + fn send_to_peers(&self, ct: MessageContent, peers: Vec) { match self.comm_handle.read() { Ok(opt) => { if opt.is_some() { let arc = opt.as_ref().unwrap().clone(); tokio::spawn(async move { - let _ = arc.broadcast(Message::new(ct)).await; + for peer in peers { + if let Err(e) = + arc.send(peer.addr_ref(), Message::new(ct.clone())).await + { + debug!( + "Sending to peer '{:?}' returned an error: {}", + peer.b32_addr(), + e + ) + } + } }); } }