Share events with own family by default, instead of all known peers. Relates to #5

This commit is contained in:
Philip (a-0) 2024-02-13 16:34:53 +01:00
parent 6581c3b9c6
commit 4381cc82cb
5 changed files with 121 additions and 24 deletions

View file

@ -1,4 +1,4 @@
use anyhow::bail; use anyhow::{anyhow, bail};
use i2p::net::{I2pAddr, I2pSocketAddr, ToI2pSocketAddrs}; use i2p::net::{I2pAddr, I2pSocketAddr, ToI2pSocketAddrs};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -11,6 +11,12 @@ impl PeerId {
pub fn addr(&self) -> I2pSocketAddr { pub fn addr(&self) -> I2pSocketAddr {
self.i2p_addr.to_owned() self.i2p_addr.to_owned()
} }
pub fn addr_ref(&self) -> &I2pSocketAddr {
&self.i2p_addr
}
pub fn b32_addr(&self) -> anyhow::Result<I2pAddr> {
I2pAddr::from_b64(&self.i2p_addr.dest().string()).map_err(|e| anyhow!(e))
}
} }
impl ToString for PeerId { impl ToString for PeerId {

View file

@ -63,11 +63,17 @@ impl ApiState {
.add_element(id.clone(), content.clone(), None, false, pot.clone())?; .add_element(id.clone(), content.clone(), None, false, pot.clone())?;
debug!("Added element {{{}}}", id.to_string()); debug!("Added element {{{}}}", id.to_string());
self.state.send_to_peers(MessageContent::CreateElement { self.state.send_to_peers(
id: id.clone(), MessageContent::CreateElement {
content: content, id: id.clone(),
pot: pot, content: content,
}); pot: pot,
},
self.state
.own_peer_id()
.map(|id| self.db().get_peer_family_members(id).unwrap_or_default())
.unwrap_or_default(),
);
Ok(id) Ok(id)
} }
@ -110,10 +116,16 @@ impl ApiState {
self.db().add_pot(pot_id.clone(), app_type.clone())?; self.db().add_pot(pot_id.clone(), app_type.clone())?;
self.db().add_pot_membership(pot_id.clone(), app_id)?; self.db().add_pot_membership(pot_id.clone(), app_id)?;
self.state.send_to_peers(MessageContent::AddPot { self.state.send_to_peers(
id: pot_id.clone(), MessageContent::AddPot {
app_type: app_type, id: pot_id.clone(),
}); app_type: app_type,
},
self.state
.own_peer_id()
.map(|id| self.db().get_peer_family_members(id).unwrap_or_default())
.unwrap_or_default(),
);
Ok(pot_id) Ok(pot_id)
} }

View file

@ -13,7 +13,7 @@ use ubisync_lib::types::{Family, FamilyId, PeerId};
use crate::state::database::{as_key::AsKey, StateDB}; use crate::state::database::{as_key::AsKey, StateDB};
#[derive(Debug, Serialize, Deserialize, Collection, PartialEq, Clone)] #[derive(Debug, Serialize, Deserialize, Collection, PartialEq, Clone)]
#[collection(name = "peer-families", views = [DbPeerFamilyByMemberId])] #[collection(name = "peer-families", views = [DbPeerFamilyIdByMemberId, DbPeerFamilyByMemberId])]
pub(super) struct DbPeerFamily { pub(super) struct DbPeerFamily {
#[natural_id] #[natural_id]
pub(super) id: AsKey<FamilyId>, pub(super) id: AsKey<FamilyId>,
@ -22,7 +22,34 @@ pub(super) struct DbPeerFamily {
} }
#[derive(Debug, Clone, View, ViewSchema)] #[derive(Debug, Clone, View, ViewSchema)]
#[view(collection = DbPeerFamily, key = AsKey<PeerId>, value = Option<FamilyId>, name = "by-member-id")] #[view(collection = DbPeerFamily, key = AsKey<PeerId>, value = Option<FamilyId>, name = "id-by-member-id")]
pub(super) struct DbPeerFamilyIdByMemberId;
impl MapReduce for DbPeerFamilyIdByMemberId {
fn map<'doc>(
&self,
document: &'doc bonsaidb::core::document::BorrowedDocument<'_>,
) -> bonsaidb::core::schema::ViewMapResult<'doc, Self> {
let content = DbPeerFamily::document_contents(document)?;
Ok(content
.members
.iter()
.fold(Mappings::default(), |acc, member| {
acc.and(
document
.header
.emit_key_and_value(
AsKey::new(member.to_owned()),
Some((*content.id).clone()),
)
.unwrap_or(Mappings::none()),
)
}))
}
}
#[derive(Debug, Clone, View, ViewSchema)]
#[view(collection = DbPeerFamily, key = AsKey<PeerId>, value = Option<Family>, name = "by-member-id")]
pub(super) struct DbPeerFamilyByMemberId; pub(super) struct DbPeerFamilyByMemberId;
impl MapReduce for DbPeerFamilyByMemberId { impl MapReduce for DbPeerFamilyByMemberId {
@ -40,7 +67,7 @@ impl MapReduce for DbPeerFamilyByMemberId {
.header .header
.emit_key_and_value( .emit_key_and_value(
AsKey::new(member.to_owned()), AsKey::new(member.to_owned()),
Some((*content.id).clone()), Some(content.clone().into()),
) )
.unwrap_or(Mappings::none()), .unwrap_or(Mappings::none()),
) )
@ -109,7 +136,7 @@ impl StateDB {
pub fn get_family_of_peer(&self, peer: PeerId) -> anyhow::Result<Option<FamilyId>> { pub fn get_family_of_peer(&self, peer: PeerId) -> anyhow::Result<Option<FamilyId>> {
self.db self.db
.view::<DbPeerFamilyByMemberId>() .view::<DbPeerFamilyIdByMemberId>()
.with_key(&AsKey::new(peer)) .with_key(&AsKey::new(peer))
.query_with_collection_docs() .query_with_collection_docs()
.map(|results| match results.mappings.len() { .map(|results| match results.mappings.len() {
@ -119,6 +146,26 @@ impl StateDB {
})? })?
.map_err(|e| anyhow!(e)) .map_err(|e| anyhow!(e))
} }
pub fn get_peer_family_members(&self, peer: PeerId) -> anyhow::Result<Vec<PeerId>> {
self.db
.view::<DbPeerFamilyByMemberId>()
.with_key(&AsKey::new(peer))
.query_with_collection_docs()
.map(|results| match results.mappings.len() {
0 => Ok(vec![]),
1 => Ok(results
.mappings
.first()
.unwrap()
.value
.clone()
.map(|family| family.members)
.unwrap_or(vec![])),
_ => Err(Error::msg("Peer appears to be member of multiple families")),
})?
.map_err(|e| anyhow!(e))
}
} }
#[cfg(test)] #[cfg(test)]

View file

@ -2,10 +2,12 @@ use anyhow::{anyhow, Error};
use bonsaidb::core::schema::{Collection, SerializedCollection}; use bonsaidb::core::schema::{Collection, SerializedCollection};
use itertools::Itertools; use itertools::Itertools;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use ubisync_lib::{peer::Peer, types::PeerId}; use ubisync_lib::{peer::Peer, types::{ElementId, PeerId}};
use crate::state::database::{as_key::AsKey, StateDB}; use crate::state::database::{as_key::AsKey, StateDB};
use super::peer_families::DbPeerFamily;
#[derive(Debug, Serialize, Deserialize, Collection, PartialEq, Clone)] #[derive(Debug, Serialize, Deserialize, Collection, PartialEq, Clone)]
#[collection(name = "peers", views = [])] #[collection(name = "peers", views = [])]
pub(super) struct DbPeer { pub(super) struct DbPeer {

View file

@ -66,6 +66,14 @@ impl State {
} }
} }
pub fn own_peer_id(&self) -> Option<PeerId> {
if let Ok(guard) = self.comm_handle.read() {
guard.as_ref().map(|t| t.own_peer_id().ok()).flatten()
} else {
None
}
}
pub fn get_apps(&self) -> anyhow::Result<Vec<App>> { pub fn get_apps(&self) -> anyhow::Result<Vec<App>> {
self.db.get_all_apps() self.db.get_all_apps()
} }
@ -78,18 +86,30 @@ impl State {
self.db self.db
.set_element_content(element_id.clone(), content.clone()) .set_element_content(element_id.clone(), content.clone())
.inspect(|_| { .inspect(|_| {
self.send_to_peers(MessageContent::SetElement { //TODO: Get all peers interested in the element, e.g. because they subscribe to the element's pot, a share, etc.
id: element_id.clone(), self.send_to_peers(
content: content.clone(), MessageContent::SetElement {
}) id: element_id.clone(),
content: content.clone(),
},
self.own_peer_id()
.map(|id| self.db.get_peer_family_members(id).unwrap_or_default())
.unwrap_or_default(),
)
}) })
} }
pub fn remove_element(&self, element_id: ElementId) -> anyhow::Result<()> { pub fn remove_element(&self, element_id: ElementId) -> anyhow::Result<()> {
self.db.remove_element(element_id.clone()).inspect(|_| { self.db.remove_element(element_id.clone()).inspect(|_| {
self.send_to_peers(MessageContent::RemoveElement { //TODO: Get all peers interested in the element, e.g. because they subscribe to the element's pot, a share, etc.
id: element_id.clone(), self.send_to_peers(
}) MessageContent::RemoveElement {
id: element_id.clone(),
},
self.own_peer_id()
.map(|id| self.db.get_peer_family_members(id).unwrap_or_default())
.unwrap_or_default(),
)
}) })
} }
@ -214,13 +234,23 @@ impl State {
} }
} }
fn send_to_peers(&self, ct: MessageContent) { fn send_to_peers(&self, ct: MessageContent, peers: Vec<PeerId>) {
match self.comm_handle.read() { match self.comm_handle.read() {
Ok(opt) => { Ok(opt) => {
if opt.is_some() { if opt.is_some() {
let arc = opt.as_ref().unwrap().clone(); let arc = opt.as_ref().unwrap().clone();
tokio::spawn(async move { tokio::spawn(async move {
let _ = arc.broadcast(Message::new(ct)).await; for peer in peers {
if let Err(e) =
arc.send(peer.addr_ref(), Message::new(ct.clone())).await
{
debug!(
"Sending to peer '{:?}' returned an error: {}",
peer.b32_addr(),
e
)
}
}
}); });
} }
} }