Compare commits

..

No commits in common. "636aff64b949d15983d94563879b1a3372434a2c" and "29ff183c087de461a4f384d8a97c7ff43822ce6d" have entirely different histories.

14 changed files with 156 additions and 437 deletions

View file

@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
use crate::types::{ElementContent, ElementId, Family, FamilyId, MessageId, PotId};
use crate::types::{ElementContent, ElementId, MessageId, PotId};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Message {
@ -19,11 +19,6 @@ pub enum MessageContent {
Hello {
peer_name: String,
},
JoinFamily,
AddedToFamily {
family: Family,
},
LeaveFamily,
CreateElement {
id: ElementId,
content: ElementContent,
@ -39,7 +34,7 @@ pub enum MessageContent {
AddPot {
id: PotId,
app_type: String,
},
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]

View file

@ -1,10 +1,8 @@
use std::hash::Hash;
use anyhow::{anyhow, bail};
use i2p::net::{I2pAddr, I2pSocketAddr, ToI2pSocketAddrs};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PeerId {
i2p_dest: I2pSocketAddr,
i2p_b32: Option<I2pAddr>,
@ -29,14 +27,6 @@ impl PeerId {
}
}
// The identity of the PeerId only depends on the i2p_dest (which is unique),
// and not on whether the b32 address has been computed before
impl Hash for PeerId {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.i2p_dest.hash(state);
}
}
impl ToString for PeerId {
fn to_string(&self) -> String {
self.i2p_dest.to_string()

View file

@ -1,7 +1,7 @@
use tracing::debug;
use ubisync_lib::peer::Peer;
use ubisync_lib::types::{ContentUpdateStrategy, Family, PeerId};
use ubisync_lib::types::{ContentUpdateStrategy, PeerId};
use ubisync_lib::messages::{Message, MessageContent};
@ -9,34 +9,13 @@ use crate::comm::conflict_resolution::merge_element_contents;
use crate::state::CommState;
pub fn handle(state: &CommState, peer: &PeerId, message: Message) {
debug!(
"Received message.\nFrom: {:?} (dest: {:?})\nTo: {:?} (dest: {:?})\nMessage: {message:?}",
peer.b32_addr_nocache(),
peer,
state.own_peer_id().unwrap().b32_addr_nocache(),
state.own_peer_id().unwrap()
);
debug!("Handling message now: {:?}", message);
match message.content() {
MessageContent::Hello { peer_name } => {
state
.set_peer(Peer::new(peer.to_owned(), Some(peer_name.to_string())))
.expect("State failed");
}
MessageContent::JoinFamily => state.request_family_join(peer.to_owned()),
MessageContent::AddedToFamily { family } => {
if state.has_family_join_request(peer.to_owned()) {
debug!("Own join request was accepted, setting family");
state
.set_own_family(family.to_owned())
.expect("State failed");
debug!("New own family: {:?}", state.get_family_of_peer(state.own_peer_id().unwrap()))
}
else {
debug!("Got AddedToFamily message, but no family join request was found")
}
}
MessageContent::LeaveFamily => state.remove_peer_from_family(peer.to_owned()),
MessageContent::CreateElement { id, content, pot } => {
state
.add_received_element(
@ -49,12 +28,9 @@ pub fn handle(state: &CommState, peer: &PeerId, message: Message) {
.expect("State failed");
}
MessageContent::UpdateElement { id, content } => {
if let Some(new_content) = merge_element_contents(
state,
id.clone(),
content.to_owned(),
message.id().to_owned(),
) {
if let Some(new_content) =
merge_element_contents(state, id.clone(), content.to_owned(), message.id().to_owned())
{
state
.update_element_content(
id.to_owned(),

View file

@ -1,7 +1,6 @@
mod conflict_resolution;
pub mod message_processor;
mod conflict_resolution;
use i2p::sam::StreamForward;
use tracing::{debug, error, warn};
use ubisync_lib::messages::Message;
use ubisync_lib::types::PeerId;
@ -23,7 +22,7 @@ use crate::Config;
pub struct CommHandle {
state: Arc<CommState>,
i2p_server: Arc<I2pListener>,
peer_id: PeerId,
peer_id: RwLock<PeerId>,
// Maps peer addresses to existing connections to them
clients: Arc<RwLock<HashMap<I2pSocketAddr, Arc<RwLock<I2pStream>>>>>,
thread: RwLock<Option<JoinHandle<()>>>,
@ -39,13 +38,12 @@ impl CommHandle {
}
let listener = listener_builder.build().unwrap();
let mut own_peer_id: PeerId = (&listener).local_addr().map_err(|e| anyhow!(e))?.into();
own_peer_id.b32_addr();
let own_peer_id: PeerId = (&listener).local_addr().map_err(|e| anyhow!(e))?.into();
Ok(CommHandle {
state: Arc::new(state),
i2p_server: Arc::new(listener),
peer_id: own_peer_id,
peer_id: RwLock::new(own_peer_id),
clients: Default::default(),
thread: RwLock::new(None),
})
@ -55,17 +53,27 @@ impl CommHandle {
debug!("CommHandle is running now");
let state = self.state.clone();
let i2p_server = self.i2p_server.clone();
let clients = self.clients.clone();
let mut thread_writeguard = self.thread.write().await;
*thread_writeguard = Some(tokio::spawn(async move {
for incoming in i2p_server.incoming() {
if let Ok(stream) = incoming {
let state_arc = state.clone();
// Spawn a blocking task, which (in read_connection) will spawn a non-blocking task
// The "outer" blocking task exists, because the for loop's iterator will block until
// there is another stream - thus, the existing streams will not be read.
// `spawn_blocking` moves the reading task to a special pool of tasks which are
// executed _despite_ other tasks blocking for something.
tokio::task::spawn_blocking(move || Self::read_connection(stream, state_arc));
if let Ok(addr) = stream.peer_addr() {
// First, save a reference to the new stream in `clients` for later reuse
let wrapped_stream = Arc::new(RwLock::new(stream));
clients.write().await.insert(addr, wrapped_stream.clone());
// Reference to state to be passed to `read_connection()`
let state_arc = state.clone();
// Spawn a blocking task, which (in read_connection) will spawn a non-blocking task
// The "outer" blocking task exists, because the for loop's iterator will block until
// there is another stream - thus, the existing streams will not be read.
// `spawn_blocking` moves the reading task to a special pool of tasks which are
// executed _despite_ other tasks blocking for something.
tokio::task::spawn_blocking(move || {
Self::read_connection(wrapped_stream, state_arc)
});
}
}
}
}));
@ -96,7 +104,6 @@ impl CommHandle {
}
pub async fn send(&self, dest: &I2pSocketAddr, msg: Message) -> anyhow::Result<()> {
debug!("Sending message...\nFrom '{:?}' (dest: {:?})\nTo '{dest:?}'\n Message: '{msg:?}", self.own_peer_id().unwrap().b32_addr_nocache(), self.own_peer_id().unwrap());
match serde_json::to_string(&msg) {
Ok(msg_string) => {
self.send_to_addr(dest, msg_string.as_bytes()).await?;
@ -109,8 +116,7 @@ impl CommHandle {
pub async fn send_to_addr(&self, addr: &I2pSocketAddr, msg: &[u8]) -> anyhow::Result<()> {
// Create client for this connection if necessary
if !self.clients.read().await.contains_key(addr) {
debug!("No client exists for requested connection, creating one");
match I2pStream::connect_with_session(self.i2p_server.session(), addr) {
match I2pStream::connect(addr) {
Ok(client) => {
//client.inner.sam.conn.set_nodelay(true)?;
//client.inner.sam.conn.set_nonblocking(false)?;
@ -147,20 +153,23 @@ impl CommHandle {
}
pub fn i2p_address(&self) -> I2pSocketAddr {
self.peer_id.addr()
self.peer_id.blocking_read().addr()
}
pub fn i2p_b32_address(&self) -> anyhow::Result<I2pAddr> {
self.peer_id.b32_addr_nocache()
self.peer_id.blocking_write().b32_addr()
}
pub fn own_peer_id(&self) -> anyhow::Result<PeerId> {
Ok(self.peer_id.to_owned())
Ok(self.peer_id.blocking_read().to_owned())
}
fn read_connection(stream: I2pStream, state: Arc<CommState>) -> JoinHandle<()> {
let mut stream = stream;
fn read_connection(
wrapped_stream: Arc<RwLock<I2pStream>>,
state: Arc<CommState>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut stream = wrapped_stream.write().await;
let peer: PeerId = stream.peer_addr().expect("Failed to get peer addr").into();
// All streams start with a \n byte which does not belong to the payload, take that from the stream.
@ -169,8 +178,8 @@ impl CommHandle {
return;
}
let iterator =
serde_json::Deserializer::from_reader(stream).into_iter::<serde_json::Value>();
let iterator = serde_json::Deserializer::from_reader(&mut *stream)
.into_iter::<serde_json::Value>();
for item in iterator {
match item {
Ok(value) => match serde_json::from_value::<Message>(value) {

View file

@ -1,4 +1,4 @@
use std::{future::Future, sync::Arc};
use std::sync::Arc;
use anyhow::bail;
use api::{v0::app::App, Api, ApiBuilder};
@ -9,9 +9,8 @@ use node_events::UbisyncNodeEvent;
use state::{ApiState, CommState, State};
use ubisync_lib::{
messages::{Message, MessageContent},
peer::Peer,
types::{AppId, Family, PeerId, PotId},
types::{AppId, PeerId, PotId},
};
pub mod api;
@ -77,39 +76,6 @@ impl Ubisync {
self.state_handle.add_family_member(id)
}
pub async fn request_family_join(&self, peer: PeerId) -> anyhow::Result<()> {
self.state_handle.add_family_join_request(peer.clone());
self.comm_handle
.send(peer.addr_ref(), Message::new(MessageContent::JoinFamily))
.await
}
pub fn accept_family_join(&self, peer: PeerId) -> anyhow::Result<()> {
self.add_family_member_from_id(peer.clone())?;
let family = self.get_family()?;
tokio::spawn({
let ch = self.comm_handle.clone();
async move {
ch.send(
peer.addr_ref(),
Message::new(MessageContent::AddedToFamily { family }),
)
.await
.expect("Could not send family join confirmation to peer");
}
});
Ok(())
}
pub fn get_family(&self) -> anyhow::Result<Family> {
self.state_handle
.get_family_of(self.comm_handle.own_peer_id()?)
}
pub fn get_family_of(&self, peer: PeerId) -> anyhow::Result<Family> {
self.state_handle.get_family_of(peer)
}
pub fn get_apps(&self) -> Vec<App> {
self.state_handle.get_apps().unwrap_or(vec![])
}
@ -126,45 +92,3 @@ impl Ubisync {
self.state_handle.add_pot_member(pot, app)
}
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use tracing::{debug, Level};
use crate::{config::Config, node_events::UbisyncNodeEvent, Ubisync};
#[tokio::test(flavor = "multi_thread")]
async fn join_and_leave_family() {
tracing_subscriber::fmt()
.pretty()
.with_max_level(Level::DEBUG)
.init();
// Two nodes need to bind to different ports
let mut c2 = Config::default();
c2.api_config.port = Some(9982);
let ubi1 = Arc::new(Ubisync::new(&Config::default()).await.unwrap());
let ubi2 = Arc::new(Ubisync::new(&c2).await.unwrap());
ubi1.set_node_event_callback(
move |ev, node| {
if let UbisyncNodeEvent::FamilyJoinRequest { joiner } = ev {
debug!("Received join request, make member join");
node.accept_family_join(joiner).unwrap();
}
},
ubi1.clone(),
);
ubi2.request_family_join(ubi1.get_destination().into())
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(5000)).await;
assert_eq!(ubi1.get_family().unwrap(), ubi2.get_family().unwrap());
std::process::exit(0);
}
}

View file

@ -1,6 +1,10 @@
use ubisync_lib::types::{PeerId, PotId};
use ubisync_lib::types::PotId;
pub enum UbisyncNodeEvent {
NewPot { id: PotId, app_type: String },
FamilyJoinRequest { joiner: PeerId },
}
NewPot {
id: PotId,
app_type: String,
}
}

View file

@ -6,7 +6,7 @@ use tracing::debug;
use ubisync_lib::{
api::events::AppEvent,
messages::MessageContent,
types::{AppId, ContentUpdateStrategy, Element, ElementContent, ElementId, Pot, PotId},
types::{AppId, Element, ElementContent, ElementId, ContentUpdateStrategy, Pot, PotId},
};
use crate::api::v0::app::App;
@ -57,21 +57,10 @@ impl ApiState {
.map(|app_opt| app_opt.ok_or(Error::msg("Failed to find app")))?
}
pub fn create_element(
&self,
content: ElementContent,
pot: PotId,
update_strategy: ContentUpdateStrategy,
) -> anyhow::Result<ElementId> {
pub fn create_element(&self, content: ElementContent, pot: PotId, update_strategy: ContentUpdateStrategy) -> anyhow::Result<ElementId> {
let id = ElementId::new();
self.db().add_element(
id.clone(),
content.clone(),
update_strategy,
None,
false,
pot.clone(),
)?;
self.db()
.add_element(id.clone(), content.clone(), update_strategy, None, false, pot.clone())?;
debug!("Added element {{{}}}", id.to_string());
self.state.send_to_peers(

View file

@ -5,9 +5,7 @@ use tracing::debug;
use ubisync_lib::{
api::events::AppEvent,
peer::Peer,
types::{
ContentUpdateStrategy, Element, ElementContent, ElementId, Family, FamilyId, MessageId, PeerId, PotId
},
types::{Element, ElementContent, ElementId, ContentUpdateStrategy, MessageId, PotId},
};
use crate::node_events::UbisyncNodeEvent;
@ -100,31 +98,6 @@ impl CommState {
Ok(())
}
pub fn request_family_join(&self, peer: PeerId) {
self.state
.emit_node_event(UbisyncNodeEvent::FamilyJoinRequest { joiner: peer });
}
pub fn remove_peer_from_family(&self, peer: PeerId) {
self.db().remove_peer_from_family(peer);
}
pub fn has_family_join_request(&self, peer: PeerId) -> bool {
self.db().has_family_join_request(peer).unwrap_or(false)
}
pub fn set_own_family(&self, family: Family) -> anyhow::Result<()> {
self.db().add_peer_family(family)
}
pub fn get_family_of_peer(&self, peer: PeerId) -> anyhow::Result<Option<FamilyId>> {
self.db().get_family_of_peer(peer)
}
pub fn own_peer_id(&self) -> Option<PeerId> {
self.state.own_peer_id()
}
fn db(&self) -> &StateDB {
&self.state.db
}
@ -137,7 +110,7 @@ mod tests {
use super::CommState;
use tracing::Level;
use ubisync_lib::types::{ContentUpdateStrategy, ElementContent, ElementId, MessageId, PotId};
use ubisync_lib::types::{ElementContent, ElementId, ContentUpdateStrategy, MessageId, PotId};
#[tokio::test]
#[serial_test::serial]

View file

@ -86,21 +86,22 @@ impl From<DbPeerFamily> for Family {
}
impl StateDB {
pub fn add_peer_family(&self, family: Family) -> anyhow::Result<()> {
if self.get_peer_family(family.id.clone())?.is_some() {
Err(Error::msg("Peer family already exists"))
} else {
DbPeerFamily::push(
DbPeerFamily {
id: AsKey::new(family.id),
name: family.name,
members: HashSet::from_iter(family.members),
},
&self.db,
)
.map(|_| ())
.map_err(|e| anyhow!(e))
}
pub fn add_peer_family(
&self,
id: FamilyId,
name: Option<String>,
initial_members: Vec<PeerId>,
) -> anyhow::Result<()> {
DbPeerFamily::push(
DbPeerFamily {
id: AsKey::new(id),
name,
members: HashSet::from_iter(initial_members),
},
&self.db,
)
.map(|_| ())
.map_err(|e| anyhow!(e))
}
pub fn add_peer_to_family(&self, peer: PeerId, family: FamilyId) -> anyhow::Result<()> {
@ -165,24 +166,6 @@ impl StateDB {
})?
.map_err(|e| anyhow!(e))
}
pub fn remove_peer_from_family(&self, peer: PeerId) -> anyhow::Result<()> {
self.db
.view::<DbPeerFamilyIdByMemberId>()
.with_key(&AsKey::new(peer.clone()))
.query_with_collection_docs()
.map(|results| {
if let Some(family) = results.into_iter().next() {
family.document.to_owned().modify(&self.db, |doc| {
doc.contents.members.remove(&peer.clone());
})
} else {
Err(bonsaidb::core::Error::other("", "Could not find family"))
}
})
.map(|_| ())
.map_err(|e| anyhow!(e))
}
}
#[cfg(test)]
@ -197,11 +180,11 @@ mod tests {
let family_id = FamilyId::new();
let peer_id = PeerId::default();
db.add_peer_family(Family {
id: family_id.clone(),
name: Some("My family name".to_string()),
members: vec![peer_id.clone()],
})
db.add_peer_family(
family_id.clone(),
Some("My family name".to_string()),
vec![peer_id.clone()],
)
.unwrap();
let retrieved_family = db.get_peer_family(family_id.clone()).unwrap();
@ -215,42 +198,17 @@ mod tests {
)
}
#[test]
fn add_remove() {
let db = StateDB::init(None);
let family_id = FamilyId::new();
let peer_id = PeerId::default();
db.add_peer_family(Family {
id: family_id.clone(),
name: Some("My family name".to_string()),
members: vec![peer_id.clone()],
})
.unwrap();
db.remove_peer_from_family(peer_id.clone()).unwrap();
let retrieved_family = db.get_peer_family(family_id.clone()).unwrap();
assert_eq!(
retrieved_family,
Some(Family::new(
family_id,
Some("My family name".to_string()),
vec![]
))
)
}
#[test]
fn set_name() {
let db = StateDB::init(None);
let family_id = FamilyId::new();
let peer_id = PeerId::default();
db.add_peer_family(Family {
id: family_id.clone(),
name: Some("My family name".to_string()),
members: vec![peer_id.clone()],
})
db.add_peer_family(
family_id.clone(),
Some("My family name".to_string()),
vec![peer_id.clone()],
)
.unwrap();
assert_eq!(
@ -281,11 +239,11 @@ mod tests {
let family_id = FamilyId::new();
let peer_id = PeerId::default();
db.add_peer_family(Family {
id: family_id.clone(),
name: Some("My family name".to_string()),
members: vec![peer_id.clone()],
})
db.add_peer_family(
family_id.clone(),
Some("My family name".to_string()),
vec![peer_id.clone()],
)
.unwrap();
assert_eq!(db.get_family_of_peer(peer_id).unwrap(), Some(family_id))

View file

@ -2,10 +2,7 @@ use anyhow::{anyhow, Error};
use bonsaidb::core::schema::{Collection, SerializedCollection};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use ubisync_lib::{
peer::Peer,
types::{ElementId, PeerId},
};
use ubisync_lib::{peer::Peer, types::{ElementId, PeerId}};
use crate::state::database::{as_key::AsKey, StateDB};
@ -27,19 +24,15 @@ impl From<DbPeer> for Peer {
impl StateDB {
pub fn add_peer(&self, peer: Peer) -> anyhow::Result<()> {
if self.get_peer(peer.id())?.is_some() {
Err(Error::msg("Peer already exists"))
} else {
DbPeer::push(
DbPeer {
id: AsKey::new(peer.id()),
name: peer.name(),
},
&self.db,
)
.map(|_| ())
.map_err(|e| anyhow!(e))
}
DbPeer::push(
DbPeer {
id: AsKey::new(peer.id()),
name: peer.name(),
},
&self.db,
)
.map(|_| ())
.map_err(|e| anyhow!(e))
}
pub fn get_peer(&self, id: PeerId) -> anyhow::Result<Option<Peer>> {

View file

@ -104,23 +104,19 @@ impl StateDB {
"A member app which does not exist was meant to be added to a pot",
))
} else {
if self.get_pot_membership(pot.clone(), app.clone())?.is_some() {
Err(Error::msg("Pot membership already exists"))
} else {
DbPotMembership::push(
DbPotMembership {
pot_id: AsKey::new(pot),
app_id: AsKey::new(app),
},
&self.db,
)
.map(|_| ())
.map_err(|e| anyhow!(e))
}
DbPotMembership::push(
DbPotMembership {
pot_id: AsKey::new(pot),
app_id: AsKey::new(app),
},
&self.db,
)
.map(|_| ())
.map_err(|e| anyhow!(e))
}
}
pub fn get_pot_membership(&self, pot: PotId, app: AppId) -> anyhow::Result<Option<()>> {
pub fn get(&self, pot: PotId, app: AppId) -> anyhow::Result<Option<()>> {
self.db
.view::<DbPotMembershipsByBothIds>()
.with_key(&(AsKey::new(pot), AsKey::new(app)))

View file

@ -1,4 +1,4 @@
use anyhow::{Error, anyhow};
use anyhow::anyhow;
use bonsaidb::core::schema::{Collection, SerializedCollection};
use serde::{Deserialize, Serialize};
use ubisync_lib::types::{Pot, PotId};
@ -15,28 +15,21 @@ pub(super) struct DbPot {
impl From<DbPot> for Pot {
fn from(value: DbPot) -> Self {
Pot {
id: (*value.id).clone(),
app_type: value.app_type,
}
Pot {id: (*value.id).clone(), app_type: value.app_type}
}
}
impl StateDB {
pub fn add_pot(&self, id: PotId, app_type: String) -> anyhow::Result<()> {
if self.get_pot(id.clone())?.is_some() {
Err(Error::msg("Pot already exists"))
} else {
DbPot::push(
DbPot {
id: AsKey::new(id),
app_type,
},
&self.db,
)
.map(|_| ())
.map_err(|e| anyhow!(e))
}
DbPot::push(
DbPot {
id: AsKey::new(id),
app_type,
},
&self.db,
)
.map(|_| ())
.map_err(|e| anyhow!(e))
}
pub fn get_pot(&self, id: PotId) -> anyhow::Result<Option<Pot>> {
@ -52,6 +45,7 @@ mod tests {
use crate::state::database::StateDB;
#[test]
fn add_get() {
let db = StateDB::init(None);
@ -59,12 +53,6 @@ mod tests {
db.add_pot(pot_id.clone(), "app_type".to_string()).unwrap();
let retrieved_pot = db.get_pot(pot_id.clone()).unwrap();
assert_eq!(
retrieved_pot,
Some(Pot {
id: pot_id,
app_type: "app_type".to_string()
})
)
assert_eq!(retrieved_pot, Some(Pot {id: pot_id, app_type: "app_type".to_string()}))
}
}
}

View file

@ -1,13 +1,7 @@
use anyhow::anyhow;
use bonsaidb::{
core::keyvalue::KeyValue,
local::{
config::{Builder, StorageConfiguration},
Database as BonsaiDb,
},
use bonsaidb::local::{
config::{Builder, StorageConfiguration},
Database as BonsaiDb,
};
use tracing::debug;
use ubisync_lib::types::PeerId;
use uuid::Uuid;
use self::collections::UbisyncSchema;
@ -30,44 +24,4 @@ impl StateDB {
db: BonsaiDb::open::<UbisyncSchema>(storage_conf).unwrap(),
}
}
pub fn add_family_join_request(&self, peer: PeerId) {
self.db.set_key(peer.to_string(), &"").execute();
debug!(
"Added join request: {:?}",
self.db.get_key(peer.to_string()).query()
)
}
pub fn has_family_join_request(&self, peer: PeerId) -> anyhow::Result<bool> {
Ok(self
.db
.get_key(peer.to_string())
.query()
.map_err(|e| anyhow!(e))?
.is_some())
}
pub fn remove_family_join_request(&self, peer: PeerId) -> anyhow::Result<()> {
self.db
.delete_key(peer.to_string())
.map(|_| ())
.map_err(|e| anyhow!(e))
}
}
#[cfg(test)]
mod tests {
use ubisync_lib::types::PeerId;
use super::StateDB;
#[test]
fn family_join_requests() {
let db = StateDB::init(None);
let peer = PeerId::default();
db.add_family_join_request(peer.clone());
assert!(db.has_family_join_request(peer).unwrap());
}
}

View file

@ -10,10 +10,10 @@ use ubisync_lib::{
api::events::AppEvent,
messages::{Message, MessageContent},
peer::Peer,
types::{AppId, Element, ElementContent, ElementId, Family, FamilyId, PeerId, PotId, Tag},
types::{AppId, Element, ElementContent, ElementId, FamilyId, PeerId, PotId, Tag},
};
use anyhow::{anyhow, Error};
use anyhow::Error;
use tracing::{debug, warn};
mod api_state;
@ -68,11 +68,10 @@ impl State {
pub fn own_peer_id(&self) -> Option<PeerId> {
if let Ok(guard) = self.comm_handle.read() {
if let Some(ch) = guard.as_ref() {
return ch.own_peer_id().ok();
}
guard.as_ref().map(|t| t.own_peer_id().ok()).flatten()
} else {
None
}
None
}
pub fn get_apps(&self) -> anyhow::Result<Vec<App>> {
@ -150,51 +149,24 @@ impl State {
pub fn add_family_member(&self, peer: PeerId) -> anyhow::Result<()> {
let my_id = self
.own_peer_id()
.ok_or(Error::msg("Could not get own PeerId"))?;
.comm_handle
.read()
.map_err(|_| Error::msg("Failed to lock on CommHandle"))?
.to_owned()
.ok_or(Error::msg("CommHandle not initialized"))?
.own_peer_id()?;
let my_family = match self.db.get_family_of_peer(my_id.clone())? {
Some(id) => {
self.db.add_peer_to_family(peer.clone(), id.clone())?;
self.db
.get_peer_family(id)?
.ok_or(Error::msg("Family not found"))?
}
None => {
debug!(
"This node does not have a family yet, creating one with this node and the added peer"
);
let family = Family {
id: FamilyId::new(),
name: None,
members: vec![my_id.clone(), peer.clone()],
};
self.db.add_peer_family(family.clone())?;
family
}
};
self.send_to_peers(
MessageContent::AddedToFamily { family: my_family },
vec![peer],
);
Ok(())
}
pub fn add_family_join_request(&self, peer: PeerId) {
self.db.add_family_join_request(peer)
}
pub fn get_family_of(&self, peer: PeerId) -> anyhow::Result<Family> {
match self.db.get_peer_family(
if self.db.get_family_of_peer(my_id.clone())?.is_none() {
self.db
.get_family_of_peer(peer)?
.ok_or(Error::msg("Family of peer not found"))?,
) {
Ok(Some(family)) => Ok(family),
Ok(None) => Err(Error::msg("Family not found by its FamilyId")),
Err(e) => Err(e),
.add_peer_family(FamilyId::new(), None, vec![my_id.clone()])?;
}
self.db.add_peer_to_family(
peer,
self.db
.get_family_of_peer(my_id)?
.ok_or(Error::msg("Could not find own family"))?,
)
}
pub fn get_event_receiver(
@ -262,25 +234,23 @@ impl State {
}
}
pub fn send_to_peers(&self, ct: MessageContent, peers: Vec<PeerId>) {
fn send_to_peers(&self, ct: MessageContent, peers: Vec<PeerId>) {
match self.comm_handle.read() {
Ok(opt) => {
if opt.is_some() {
let arc = opt.as_ref().unwrap().clone();
tokio::task::spawn_blocking(|| {
tokio::spawn(async move {
for peer in peers {
let _ = arc
.send(peer.addr_ref(), Message::new(ct.clone()))
.await
.map_err(|e| {
debug!(
"Sending to peer '{:?}' returned an error:\n{e}",
peer
)
});
tokio::spawn(async move {
for peer in peers {
if let Err(e) =
arc.send(peer.addr_ref(), Message::new(ct.clone())).await
{
debug!(
"Sending to peer '{:?}' returned an error: {}",
peer.b32_addr_nocache(),
e
)
}
})
}
});
}
}