diff --git a/ubisync/src/comm/message_processor.rs b/ubisync/src/comm/message_processor.rs index 13e94d9..7069c48 100644 --- a/ubisync/src/comm/message_processor.rs +++ b/ubisync/src/comm/message_processor.rs @@ -37,6 +37,7 @@ pub fn handle(state: &CommState, peer: &PeerId, message: Message) { .set_own_family(family.to_owned()) .expect("State failed"); state.set_metastate_share(metastate_share.to_owned()); + debug!("Joined family, requesting metastate"); state.send_to_peers( MessageContent::Get { shares: vec![metastate_share.to_owned()], diff --git a/ubisync/src/comm/mod.rs b/ubisync/src/comm/mod.rs index fe9a197..7a9864c 100644 --- a/ubisync/src/comm/mod.rs +++ b/ubisync/src/comm/mod.rs @@ -9,6 +9,7 @@ use std::collections::HashMap; use std::io::{Read, Write}; use std::ops::Deref; use std::sync::Arc; +use std::time::Duration; use anyhow::{anyhow, bail}; use i2p::net::{I2pAddr, I2pListener, I2pListenerBuilder, I2pSocketAddr, I2pStream}; @@ -93,11 +94,34 @@ impl CommHandle { } } - pub async fn send(&self, dest: &I2pSocketAddr, msg: Message) -> anyhow::Result<()> { - debug!("Sending message...\nFrom '{:?}'\nTo '{dest:?}'\n Message: '{msg:?}", self.own_peer_id().unwrap().addr()); + pub async fn send(&self, dest: &I2pSocketAddr, msg: Message, custom_retry_settings: Option<(u64, u64)>) -> anyhow::Result<()> { match serde_json::to_string(&msg) { Ok(msg_string) => { - self.send_to_addr(dest, msg_string.as_bytes()).await?; + // Default retry settings + let (tries, base_delay) = match custom_retry_settings { + Some((ct, d)) => (ct, d), + None => (3, 200), + }; + + // Send message until success or too many retries + let mut ctr = 0; + while ctr < tries { + debug!("Sending message...\nFrom '{:?}'\nTo '{dest:?}'\nAttempt loop ctr: {ctr}\nMessage: '{msg:?}", self.own_peer_id().unwrap().addr()); + match self.send_to_addr(dest, msg_string.as_bytes()).await { + Ok(_) => break, + Err(e) => { + debug!("{e:?}"); + // Return last error if still unsuccessful + if ctr >= tries - 1 { + bail!(e) + } + + // Continue loop otherwise + ctr = ctr + 1; + tokio::time::sleep(Duration::from_millis(base_delay * ctr)).await; + } + } + } Ok(()) } Err(e) => bail!(e), diff --git a/ubisync/src/lib.rs b/ubisync/src/lib.rs index e66c3c7..78858a3 100644 --- a/ubisync/src/lib.rs +++ b/ubisync/src/lib.rs @@ -79,7 +79,11 @@ impl Ubisync { pub async fn request_family_join(&self, peer: PeerId) -> anyhow::Result<()> { self.state_handle.add_family_join_request(peer.clone()); self.comm_handle - .send(peer.addr_ref(), Message::new(MessageContent::JoinFamily)) + .send( + peer.addr_ref(), + Message::new(MessageContent::JoinFamily), + None, + ) .await } @@ -99,6 +103,7 @@ impl Ubisync { family, metastate_share, }), + None, ) .await .expect("Could not send family join confirmation to peer"); @@ -180,14 +185,14 @@ mod tests { std::process::exit(0); } - #[tokio::test(flavor = "multi_thread")] async fn sync_metastate() { /* - create & join family - */ + create & join family + */ tracing_subscriber::fmt() - .pretty() + .compact() + .with_file(true) .with_max_level(Level::DEBUG) .init(); @@ -214,8 +219,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(5000)).await; /* - create & sync metastate - */ - + create & sync metastate + */ } } diff --git a/ubisync/src/state/database/mod.rs b/ubisync/src/state/database/mod.rs index a35f1b0..85ae25e 100644 --- a/ubisync/src/state/database/mod.rs +++ b/ubisync/src/state/database/mod.rs @@ -58,17 +58,18 @@ impl StateDB { } pub fn set_metastate_share(&self, share: ShareId) { - let _ = self.db.set_key("metastate_share", &share).execute(); + self.db + .set_key("metastate_share", &share) + .execute() + .expect("Could not set metastate share"); } pub fn get_metastate_share(&self) -> Option { self.db .get_key("metastate_share") .query() - .ok() - .flatten() - .map(|val| val.deserialize().ok()) - .flatten() + .ok()? + .and_then(|val| val.deserialize().ok())? } pub const fn apps(&self) -> Apps { diff --git a/ubisync/src/state/mod.rs b/ubisync/src/state/mod.rs index d416e13..2f976ee 100644 --- a/ubisync/src/state/mod.rs +++ b/ubisync/src/state/mod.rs @@ -15,7 +15,7 @@ use ubisync_lib::{ }, }; -use anyhow::Error; +use anyhow::{anyhow, bail, Error}; use tracing::{debug, warn}; mod api_state; @@ -177,11 +177,8 @@ impl State { }, members: metastate_members, }); - let _ = self - .db - .pots() - .add(metastate_potid.clone(), "".to_string()); - + let _ = self.db.pots().add(metastate_potid.clone(), "".to_string()); + debug!("{:?}", self.db.pots().get_all().unwrap()); let _ = self.db.elements().add( metastate_elementid, @@ -224,20 +221,42 @@ impl State { members: HashSet::from([my_id.clone(), peer.clone()]), }; self.db.families().add(family.clone())?; + + let metastate_pot = PotId::new(); + self.db + .pots() + .add(metastate_pot.clone(), "ubisync".to_string()) + .expect("Could not add metastate pot"); + let metastate_share = ShareId::new(); + let mut metastate_members = HashMap::new(); + metastate_members.insert(family.id.clone(), SharePermissions::Owner); + self.db + .shares() + .add(Share { + id: metastate_share.clone(), + content: ShareContent { + pots: vec![metastate_pot], + }, + members: metastate_members, + }) + .expect("Could not add metastate share"); + + self.db.set_metastate_share(metastate_share); + family } }; - self.send_to_peers( - MessageContent::AddedToFamily { - family: my_family, - metastate_share: self - .db - .get_metastate_share() - .expect("Node is in a family, but does not know metastate ShareId"), + let mc = MessageContent::AddedToFamily { + family: my_family, + metastate_share: match self.db.get_metastate_share() { + Some(s) => s, + None => { + bail!("Metastate not found") + } }, - vec![peer], - ); + }; + self.send_to_peers(mc, vec![peer]); Ok(()) } @@ -277,14 +296,10 @@ impl State { } pub fn emit_node_event(&self, ev: UbisyncNodeEvent) -> anyhow::Result<()> { - debug!("1"); match self.node_event_callback.read() { Ok(readguard) => { - debug!("2"); - debug!("{:?}", readguard.is_some()); // If a callback is set, call it. if let Some(cb) = readguard.as_ref() { - debug!("3"); (*cb)(ev); } // Whether a callback is set or not, return Ok(_) @@ -328,15 +343,21 @@ impl State { } pub fn send_to_peers(&self, ct: MessageContent, peers: Vec) { + debug!("1"); match self.comm_handle.read() { Ok(opt) => { + debug!("2"); if opt.is_some() { + debug!("3"); let arc = opt.as_ref().unwrap().clone(); tokio::task::spawn_blocking(|| { + debug!("4"); tokio::spawn(async move { + debug!("5"); for peer in peers { + debug!("6"); let _ = arc - .send(peer.addr_ref(), Message::new(ct.clone())) + .send(peer.addr_ref(), Message::new(ct.clone()), None) .await .map_err(|e| { debug!(