Basic family join/leave implementation, tests not fully working

This commit is contained in:
Philip (a-0) 2024-03-23 17:46:25 +01:00
parent ec0a55b286
commit 636aff64b9
14 changed files with 429 additions and 151 deletions

View file

@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::types::{ElementContent, ElementId, MessageId, PotId}; use crate::types::{ElementContent, ElementId, Family, FamilyId, MessageId, PotId};
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Message { pub struct Message {
@ -19,6 +19,11 @@ pub enum MessageContent {
Hello { Hello {
peer_name: String, peer_name: String,
}, },
JoinFamily,
AddedToFamily {
family: Family,
},
LeaveFamily,
CreateElement { CreateElement {
id: ElementId, id: ElementId,
content: ElementContent, content: ElementContent,
@ -34,7 +39,7 @@ pub enum MessageContent {
AddPot { AddPot {
id: PotId, id: PotId,
app_type: String, app_type: String,
} },
} }
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]

View file

@ -1,8 +1,10 @@
use std::hash::Hash;
use anyhow::{anyhow, bail}; use anyhow::{anyhow, bail};
use i2p::net::{I2pAddr, I2pSocketAddr, ToI2pSocketAddrs}; use i2p::net::{I2pAddr, I2pSocketAddr, ToI2pSocketAddrs};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct PeerId { pub struct PeerId {
i2p_dest: I2pSocketAddr, i2p_dest: I2pSocketAddr,
i2p_b32: Option<I2pAddr>, i2p_b32: Option<I2pAddr>,
@ -27,6 +29,14 @@ impl PeerId {
} }
} }
// The identity of the PeerId only depends on the i2p_dest (which is unique),
// and not on whether the b32 address has been computed before
impl Hash for PeerId {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.i2p_dest.hash(state);
}
}
impl ToString for PeerId { impl ToString for PeerId {
fn to_string(&self) -> String { fn to_string(&self) -> String {
self.i2p_dest.to_string() self.i2p_dest.to_string()

View file

@ -1,7 +1,7 @@
use tracing::debug; use tracing::debug;
use ubisync_lib::peer::Peer; use ubisync_lib::peer::Peer;
use ubisync_lib::types::{ContentUpdateStrategy, PeerId}; use ubisync_lib::types::{ContentUpdateStrategy, Family, PeerId};
use ubisync_lib::messages::{Message, MessageContent}; use ubisync_lib::messages::{Message, MessageContent};
@ -9,13 +9,34 @@ use crate::comm::conflict_resolution::merge_element_contents;
use crate::state::CommState; use crate::state::CommState;
pub fn handle(state: &CommState, peer: &PeerId, message: Message) { pub fn handle(state: &CommState, peer: &PeerId, message: Message) {
debug!("Handling message now: {:?}", message); debug!(
"Received message.\nFrom: {:?} (dest: {:?})\nTo: {:?} (dest: {:?})\nMessage: {message:?}",
peer.b32_addr_nocache(),
peer,
state.own_peer_id().unwrap().b32_addr_nocache(),
state.own_peer_id().unwrap()
);
match message.content() { match message.content() {
MessageContent::Hello { peer_name } => { MessageContent::Hello { peer_name } => {
state state
.set_peer(Peer::new(peer.to_owned(), Some(peer_name.to_string()))) .set_peer(Peer::new(peer.to_owned(), Some(peer_name.to_string())))
.expect("State failed"); .expect("State failed");
} }
MessageContent::JoinFamily => state.request_family_join(peer.to_owned()),
MessageContent::AddedToFamily { family } => {
if state.has_family_join_request(peer.to_owned()) {
debug!("Own join request was accepted, setting family");
state
.set_own_family(family.to_owned())
.expect("State failed");
debug!("New own family: {:?}", state.get_family_of_peer(state.own_peer_id().unwrap()))
}
else {
debug!("Got AddedToFamily message, but no family join request was found")
}
}
MessageContent::LeaveFamily => state.remove_peer_from_family(peer.to_owned()),
MessageContent::CreateElement { id, content, pot } => { MessageContent::CreateElement { id, content, pot } => {
state state
.add_received_element( .add_received_element(
@ -28,9 +49,12 @@ pub fn handle(state: &CommState, peer: &PeerId, message: Message) {
.expect("State failed"); .expect("State failed");
} }
MessageContent::UpdateElement { id, content } => { MessageContent::UpdateElement { id, content } => {
if let Some(new_content) = if let Some(new_content) = merge_element_contents(
merge_element_contents(state, id.clone(), content.to_owned(), message.id().to_owned()) state,
{ id.clone(),
content.to_owned(),
message.id().to_owned(),
) {
state state
.update_element_content( .update_element_content(
id.to_owned(), id.to_owned(),

View file

@ -1,6 +1,7 @@
pub mod message_processor;
mod conflict_resolution; mod conflict_resolution;
pub mod message_processor;
use i2p::sam::StreamForward;
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
use ubisync_lib::messages::Message; use ubisync_lib::messages::Message;
use ubisync_lib::types::PeerId; use ubisync_lib::types::PeerId;
@ -54,27 +55,17 @@ impl CommHandle {
debug!("CommHandle is running now"); debug!("CommHandle is running now");
let state = self.state.clone(); let state = self.state.clone();
let i2p_server = self.i2p_server.clone(); let i2p_server = self.i2p_server.clone();
let clients = self.clients.clone();
let mut thread_writeguard = self.thread.write().await; let mut thread_writeguard = self.thread.write().await;
*thread_writeguard = Some(tokio::spawn(async move { *thread_writeguard = Some(tokio::spawn(async move {
for incoming in i2p_server.incoming() { for incoming in i2p_server.incoming() {
if let Ok(stream) = incoming { if let Ok(stream) = incoming {
if let Ok(addr) = stream.peer_addr() {
// First, save a reference to the new stream in `clients` for later reuse
let wrapped_stream = Arc::new(RwLock::new(stream));
clients.write().await.insert(addr, wrapped_stream.clone());
// Reference to state to be passed to `read_connection()`
let state_arc = state.clone(); let state_arc = state.clone();
// Spawn a blocking task, which (in read_connection) will spawn a non-blocking task // Spawn a blocking task, which (in read_connection) will spawn a non-blocking task
// The "outer" blocking task exists, because the for loop's iterator will block until // The "outer" blocking task exists, because the for loop's iterator will block until
// there is another stream - thus, the existing streams will not be read. // there is another stream - thus, the existing streams will not be read.
// `spawn_blocking` moves the reading task to a special pool of tasks which are // `spawn_blocking` moves the reading task to a special pool of tasks which are
// executed _despite_ other tasks blocking for something. // executed _despite_ other tasks blocking for something.
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || Self::read_connection(stream, state_arc));
Self::read_connection(wrapped_stream, state_arc)
});
}
} }
} }
})); }));
@ -105,7 +96,7 @@ impl CommHandle {
} }
pub async fn send(&self, dest: &I2pSocketAddr, msg: Message) -> anyhow::Result<()> { pub async fn send(&self, dest: &I2pSocketAddr, msg: Message) -> anyhow::Result<()> {
debug!("To '{dest:?}': '{msg:?}"); debug!("Sending message...\nFrom '{:?}' (dest: {:?})\nTo '{dest:?}'\n Message: '{msg:?}", self.own_peer_id().unwrap().b32_addr_nocache(), self.own_peer_id().unwrap());
match serde_json::to_string(&msg) { match serde_json::to_string(&msg) {
Ok(msg_string) => { Ok(msg_string) => {
self.send_to_addr(dest, msg_string.as_bytes()).await?; self.send_to_addr(dest, msg_string.as_bytes()).await?;
@ -119,7 +110,7 @@ impl CommHandle {
// Create client for this connection if necessary // Create client for this connection if necessary
if !self.clients.read().await.contains_key(addr) { if !self.clients.read().await.contains_key(addr) {
debug!("No client exists for requested connection, creating one"); debug!("No client exists for requested connection, creating one");
match I2pStream::connect(addr) { match I2pStream::connect_with_session(self.i2p_server.session(), addr) {
Ok(client) => { Ok(client) => {
//client.inner.sam.conn.set_nodelay(true)?; //client.inner.sam.conn.set_nodelay(true)?;
//client.inner.sam.conn.set_nonblocking(false)?; //client.inner.sam.conn.set_nonblocking(false)?;
@ -167,12 +158,9 @@ impl CommHandle {
Ok(self.peer_id.to_owned()) Ok(self.peer_id.to_owned())
} }
fn read_connection( fn read_connection(stream: I2pStream, state: Arc<CommState>) -> JoinHandle<()> {
wrapped_stream: Arc<RwLock<I2pStream>>, let mut stream = stream;
state: Arc<CommState>,
) -> JoinHandle<()> {
tokio::spawn(async move { tokio::spawn(async move {
let mut stream = wrapped_stream.write().await;
let peer: PeerId = stream.peer_addr().expect("Failed to get peer addr").into(); let peer: PeerId = stream.peer_addr().expect("Failed to get peer addr").into();
// All streams start with a \n byte which does not belong to the payload, take that from the stream. // All streams start with a \n byte which does not belong to the payload, take that from the stream.
@ -181,8 +169,8 @@ impl CommHandle {
return; return;
} }
let iterator = serde_json::Deserializer::from_reader(&mut *stream) let iterator =
.into_iter::<serde_json::Value>(); serde_json::Deserializer::from_reader(stream).into_iter::<serde_json::Value>();
for item in iterator { for item in iterator {
match item { match item {
Ok(value) => match serde_json::from_value::<Message>(value) { Ok(value) => match serde_json::from_value::<Message>(value) {

View file

@ -1,4 +1,4 @@
use std::sync::Arc; use std::{future::Future, sync::Arc};
use anyhow::bail; use anyhow::bail;
use api::{v0::app::App, Api, ApiBuilder}; use api::{v0::app::App, Api, ApiBuilder};
@ -9,8 +9,9 @@ use node_events::UbisyncNodeEvent;
use state::{ApiState, CommState, State}; use state::{ApiState, CommState, State};
use ubisync_lib::{ use ubisync_lib::{
messages::{Message, MessageContent},
peer::Peer, peer::Peer,
types::{AppId, PeerId, PotId}, types::{AppId, Family, PeerId, PotId},
}; };
pub mod api; pub mod api;
@ -76,6 +77,39 @@ impl Ubisync {
self.state_handle.add_family_member(id) self.state_handle.add_family_member(id)
} }
pub async fn request_family_join(&self, peer: PeerId) -> anyhow::Result<()> {
self.state_handle.add_family_join_request(peer.clone());
self.comm_handle
.send(peer.addr_ref(), Message::new(MessageContent::JoinFamily))
.await
}
pub fn accept_family_join(&self, peer: PeerId) -> anyhow::Result<()> {
self.add_family_member_from_id(peer.clone())?;
let family = self.get_family()?;
tokio::spawn({
let ch = self.comm_handle.clone();
async move {
ch.send(
peer.addr_ref(),
Message::new(MessageContent::AddedToFamily { family }),
)
.await
.expect("Could not send family join confirmation to peer");
}
});
Ok(())
}
pub fn get_family(&self) -> anyhow::Result<Family> {
self.state_handle
.get_family_of(self.comm_handle.own_peer_id()?)
}
pub fn get_family_of(&self, peer: PeerId) -> anyhow::Result<Family> {
self.state_handle.get_family_of(peer)
}
pub fn get_apps(&self) -> Vec<App> { pub fn get_apps(&self) -> Vec<App> {
self.state_handle.get_apps().unwrap_or(vec![]) self.state_handle.get_apps().unwrap_or(vec![])
} }
@ -92,3 +126,45 @@ impl Ubisync {
self.state_handle.add_pot_member(pot, app) self.state_handle.add_pot_member(pot, app)
} }
} }
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use tracing::{debug, Level};
use crate::{config::Config, node_events::UbisyncNodeEvent, Ubisync};
#[tokio::test(flavor = "multi_thread")]
async fn join_and_leave_family() {
tracing_subscriber::fmt()
.pretty()
.with_max_level(Level::DEBUG)
.init();
// Two nodes need to bind to different ports
let mut c2 = Config::default();
c2.api_config.port = Some(9982);
let ubi1 = Arc::new(Ubisync::new(&Config::default()).await.unwrap());
let ubi2 = Arc::new(Ubisync::new(&c2).await.unwrap());
ubi1.set_node_event_callback(
move |ev, node| {
if let UbisyncNodeEvent::FamilyJoinRequest { joiner } = ev {
debug!("Received join request, make member join");
node.accept_family_join(joiner).unwrap();
}
},
ubi1.clone(),
);
ubi2.request_family_join(ubi1.get_destination().into())
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(5000)).await;
assert_eq!(ubi1.get_family().unwrap(), ubi2.get_family().unwrap());
std::process::exit(0);
}
}

View file

@ -1,10 +1,6 @@
use ubisync_lib::types::PotId; use ubisync_lib::types::{PeerId, PotId};
pub enum UbisyncNodeEvent { pub enum UbisyncNodeEvent {
NewPot { NewPot { id: PotId, app_type: String },
id: PotId, FamilyJoinRequest { joiner: PeerId },
app_type: String,
}
} }

View file

@ -6,7 +6,7 @@ use tracing::debug;
use ubisync_lib::{ use ubisync_lib::{
api::events::AppEvent, api::events::AppEvent,
messages::MessageContent, messages::MessageContent,
types::{AppId, Element, ElementContent, ElementId, ContentUpdateStrategy, Pot, PotId}, types::{AppId, ContentUpdateStrategy, Element, ElementContent, ElementId, Pot, PotId},
}; };
use crate::api::v0::app::App; use crate::api::v0::app::App;
@ -57,10 +57,21 @@ impl ApiState {
.map(|app_opt| app_opt.ok_or(Error::msg("Failed to find app")))? .map(|app_opt| app_opt.ok_or(Error::msg("Failed to find app")))?
} }
pub fn create_element(&self, content: ElementContent, pot: PotId, update_strategy: ContentUpdateStrategy) -> anyhow::Result<ElementId> { pub fn create_element(
&self,
content: ElementContent,
pot: PotId,
update_strategy: ContentUpdateStrategy,
) -> anyhow::Result<ElementId> {
let id = ElementId::new(); let id = ElementId::new();
self.db() self.db().add_element(
.add_element(id.clone(), content.clone(), update_strategy, None, false, pot.clone())?; id.clone(),
content.clone(),
update_strategy,
None,
false,
pot.clone(),
)?;
debug!("Added element {{{}}}", id.to_string()); debug!("Added element {{{}}}", id.to_string());
self.state.send_to_peers( self.state.send_to_peers(

View file

@ -5,7 +5,9 @@ use tracing::debug;
use ubisync_lib::{ use ubisync_lib::{
api::events::AppEvent, api::events::AppEvent,
peer::Peer, peer::Peer,
types::{Element, ElementContent, ElementId, ContentUpdateStrategy, MessageId, PotId}, types::{
ContentUpdateStrategy, Element, ElementContent, ElementId, Family, FamilyId, MessageId, PeerId, PotId
},
}; };
use crate::node_events::UbisyncNodeEvent; use crate::node_events::UbisyncNodeEvent;
@ -98,6 +100,31 @@ impl CommState {
Ok(()) Ok(())
} }
pub fn request_family_join(&self, peer: PeerId) {
self.state
.emit_node_event(UbisyncNodeEvent::FamilyJoinRequest { joiner: peer });
}
pub fn remove_peer_from_family(&self, peer: PeerId) {
self.db().remove_peer_from_family(peer);
}
pub fn has_family_join_request(&self, peer: PeerId) -> bool {
self.db().has_family_join_request(peer).unwrap_or(false)
}
pub fn set_own_family(&self, family: Family) -> anyhow::Result<()> {
self.db().add_peer_family(family)
}
pub fn get_family_of_peer(&self, peer: PeerId) -> anyhow::Result<Option<FamilyId>> {
self.db().get_family_of_peer(peer)
}
pub fn own_peer_id(&self) -> Option<PeerId> {
self.state.own_peer_id()
}
fn db(&self) -> &StateDB { fn db(&self) -> &StateDB {
&self.state.db &self.state.db
} }
@ -110,7 +137,7 @@ mod tests {
use super::CommState; use super::CommState;
use tracing::Level; use tracing::Level;
use ubisync_lib::types::{ElementContent, ElementId, ContentUpdateStrategy, MessageId, PotId}; use ubisync_lib::types::{ContentUpdateStrategy, ElementContent, ElementId, MessageId, PotId};
#[tokio::test] #[tokio::test]
#[serial_test::serial] #[serial_test::serial]

View file

@ -86,23 +86,22 @@ impl From<DbPeerFamily> for Family {
} }
impl StateDB { impl StateDB {
pub fn add_peer_family( pub fn add_peer_family(&self, family: Family) -> anyhow::Result<()> {
&self, if self.get_peer_family(family.id.clone())?.is_some() {
id: FamilyId, Err(Error::msg("Peer family already exists"))
name: Option<String>, } else {
initial_members: Vec<PeerId>,
) -> anyhow::Result<()> {
DbPeerFamily::push( DbPeerFamily::push(
DbPeerFamily { DbPeerFamily {
id: AsKey::new(id), id: AsKey::new(family.id),
name, name: family.name,
members: HashSet::from_iter(initial_members), members: HashSet::from_iter(family.members),
}, },
&self.db, &self.db,
) )
.map(|_| ()) .map(|_| ())
.map_err(|e| anyhow!(e)) .map_err(|e| anyhow!(e))
} }
}
pub fn add_peer_to_family(&self, peer: PeerId, family: FamilyId) -> anyhow::Result<()> { pub fn add_peer_to_family(&self, peer: PeerId, family: FamilyId) -> anyhow::Result<()> {
DbPeerFamily::get(&AsKey::new(family), &self.db) DbPeerFamily::get(&AsKey::new(family), &self.db)
@ -166,6 +165,24 @@ impl StateDB {
})? })?
.map_err(|e| anyhow!(e)) .map_err(|e| anyhow!(e))
} }
pub fn remove_peer_from_family(&self, peer: PeerId) -> anyhow::Result<()> {
self.db
.view::<DbPeerFamilyIdByMemberId>()
.with_key(&AsKey::new(peer.clone()))
.query_with_collection_docs()
.map(|results| {
if let Some(family) = results.into_iter().next() {
family.document.to_owned().modify(&self.db, |doc| {
doc.contents.members.remove(&peer.clone());
})
} else {
Err(bonsaidb::core::Error::other("", "Could not find family"))
}
})
.map(|_| ())
.map_err(|e| anyhow!(e))
}
} }
#[cfg(test)] #[cfg(test)]
@ -180,11 +197,11 @@ mod tests {
let family_id = FamilyId::new(); let family_id = FamilyId::new();
let peer_id = PeerId::default(); let peer_id = PeerId::default();
db.add_peer_family( db.add_peer_family(Family {
family_id.clone(), id: family_id.clone(),
Some("My family name".to_string()), name: Some("My family name".to_string()),
vec![peer_id.clone()], members: vec![peer_id.clone()],
) })
.unwrap(); .unwrap();
let retrieved_family = db.get_peer_family(family_id.clone()).unwrap(); let retrieved_family = db.get_peer_family(family_id.clone()).unwrap();
@ -198,17 +215,42 @@ mod tests {
) )
} }
#[test]
fn add_remove() {
let db = StateDB::init(None);
let family_id = FamilyId::new();
let peer_id = PeerId::default();
db.add_peer_family(Family {
id: family_id.clone(),
name: Some("My family name".to_string()),
members: vec![peer_id.clone()],
})
.unwrap();
db.remove_peer_from_family(peer_id.clone()).unwrap();
let retrieved_family = db.get_peer_family(family_id.clone()).unwrap();
assert_eq!(
retrieved_family,
Some(Family::new(
family_id,
Some("My family name".to_string()),
vec![]
))
)
}
#[test] #[test]
fn set_name() { fn set_name() {
let db = StateDB::init(None); let db = StateDB::init(None);
let family_id = FamilyId::new(); let family_id = FamilyId::new();
let peer_id = PeerId::default(); let peer_id = PeerId::default();
db.add_peer_family( db.add_peer_family(Family {
family_id.clone(), id: family_id.clone(),
Some("My family name".to_string()), name: Some("My family name".to_string()),
vec![peer_id.clone()], members: vec![peer_id.clone()],
) })
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@ -239,11 +281,11 @@ mod tests {
let family_id = FamilyId::new(); let family_id = FamilyId::new();
let peer_id = PeerId::default(); let peer_id = PeerId::default();
db.add_peer_family( db.add_peer_family(Family {
family_id.clone(), id: family_id.clone(),
Some("My family name".to_string()), name: Some("My family name".to_string()),
vec![peer_id.clone()], members: vec![peer_id.clone()],
) })
.unwrap(); .unwrap();
assert_eq!(db.get_family_of_peer(peer_id).unwrap(), Some(family_id)) assert_eq!(db.get_family_of_peer(peer_id).unwrap(), Some(family_id))

View file

@ -2,7 +2,10 @@ use anyhow::{anyhow, Error};
use bonsaidb::core::schema::{Collection, SerializedCollection}; use bonsaidb::core::schema::{Collection, SerializedCollection};
use itertools::Itertools; use itertools::Itertools;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use ubisync_lib::{peer::Peer, types::{ElementId, PeerId}}; use ubisync_lib::{
peer::Peer,
types::{ElementId, PeerId},
};
use crate::state::database::{as_key::AsKey, StateDB}; use crate::state::database::{as_key::AsKey, StateDB};
@ -24,6 +27,9 @@ impl From<DbPeer> for Peer {
impl StateDB { impl StateDB {
pub fn add_peer(&self, peer: Peer) -> anyhow::Result<()> { pub fn add_peer(&self, peer: Peer) -> anyhow::Result<()> {
if self.get_peer(peer.id())?.is_some() {
Err(Error::msg("Peer already exists"))
} else {
DbPeer::push( DbPeer::push(
DbPeer { DbPeer {
id: AsKey::new(peer.id()), id: AsKey::new(peer.id()),
@ -34,6 +40,7 @@ impl StateDB {
.map(|_| ()) .map(|_| ())
.map_err(|e| anyhow!(e)) .map_err(|e| anyhow!(e))
} }
}
pub fn get_peer(&self, id: PeerId) -> anyhow::Result<Option<Peer>> { pub fn get_peer(&self, id: PeerId) -> anyhow::Result<Option<Peer>> {
DbPeer::get(&AsKey::new(id), &self.db) DbPeer::get(&AsKey::new(id), &self.db)

View file

@ -103,6 +103,9 @@ impl StateDB {
Err(Error::msg( Err(Error::msg(
"A member app which does not exist was meant to be added to a pot", "A member app which does not exist was meant to be added to a pot",
)) ))
} else {
if self.get_pot_membership(pot.clone(), app.clone())?.is_some() {
Err(Error::msg("Pot membership already exists"))
} else { } else {
DbPotMembership::push( DbPotMembership::push(
DbPotMembership { DbPotMembership {
@ -115,8 +118,9 @@ impl StateDB {
.map_err(|e| anyhow!(e)) .map_err(|e| anyhow!(e))
} }
} }
}
pub fn get(&self, pot: PotId, app: AppId) -> anyhow::Result<Option<()>> { pub fn get_pot_membership(&self, pot: PotId, app: AppId) -> anyhow::Result<Option<()>> {
self.db self.db
.view::<DbPotMembershipsByBothIds>() .view::<DbPotMembershipsByBothIds>()
.with_key(&(AsKey::new(pot), AsKey::new(app))) .with_key(&(AsKey::new(pot), AsKey::new(app)))

View file

@ -1,4 +1,4 @@
use anyhow::anyhow; use anyhow::{Error, anyhow};
use bonsaidb::core::schema::{Collection, SerializedCollection}; use bonsaidb::core::schema::{Collection, SerializedCollection};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use ubisync_lib::types::{Pot, PotId}; use ubisync_lib::types::{Pot, PotId};
@ -15,12 +15,18 @@ pub(super) struct DbPot {
impl From<DbPot> for Pot { impl From<DbPot> for Pot {
fn from(value: DbPot) -> Self { fn from(value: DbPot) -> Self {
Pot {id: (*value.id).clone(), app_type: value.app_type} Pot {
id: (*value.id).clone(),
app_type: value.app_type,
}
} }
} }
impl StateDB { impl StateDB {
pub fn add_pot(&self, id: PotId, app_type: String) -> anyhow::Result<()> { pub fn add_pot(&self, id: PotId, app_type: String) -> anyhow::Result<()> {
if self.get_pot(id.clone())?.is_some() {
Err(Error::msg("Pot already exists"))
} else {
DbPot::push( DbPot::push(
DbPot { DbPot {
id: AsKey::new(id), id: AsKey::new(id),
@ -31,6 +37,7 @@ impl StateDB {
.map(|_| ()) .map(|_| ())
.map_err(|e| anyhow!(e)) .map_err(|e| anyhow!(e))
} }
}
pub fn get_pot(&self, id: PotId) -> anyhow::Result<Option<Pot>> { pub fn get_pot(&self, id: PotId) -> anyhow::Result<Option<Pot>> {
DbPot::get(&AsKey::new(id), &self.db) DbPot::get(&AsKey::new(id), &self.db)
@ -45,7 +52,6 @@ mod tests {
use crate::state::database::StateDB; use crate::state::database::StateDB;
#[test] #[test]
fn add_get() { fn add_get() {
let db = StateDB::init(None); let db = StateDB::init(None);
@ -53,6 +59,12 @@ mod tests {
db.add_pot(pot_id.clone(), "app_type".to_string()).unwrap(); db.add_pot(pot_id.clone(), "app_type".to_string()).unwrap();
let retrieved_pot = db.get_pot(pot_id.clone()).unwrap(); let retrieved_pot = db.get_pot(pot_id.clone()).unwrap();
assert_eq!(retrieved_pot, Some(Pot {id: pot_id, app_type: "app_type".to_string()})) assert_eq!(
retrieved_pot,
Some(Pot {
id: pot_id,
app_type: "app_type".to_string()
})
)
} }
} }

View file

@ -1,7 +1,13 @@
use bonsaidb::local::{ use anyhow::anyhow;
use bonsaidb::{
core::keyvalue::KeyValue,
local::{
config::{Builder, StorageConfiguration}, config::{Builder, StorageConfiguration},
Database as BonsaiDb, Database as BonsaiDb,
},
}; };
use tracing::debug;
use ubisync_lib::types::PeerId;
use uuid::Uuid; use uuid::Uuid;
use self::collections::UbisyncSchema; use self::collections::UbisyncSchema;
@ -24,4 +30,44 @@ impl StateDB {
db: BonsaiDb::open::<UbisyncSchema>(storage_conf).unwrap(), db: BonsaiDb::open::<UbisyncSchema>(storage_conf).unwrap(),
} }
} }
pub fn add_family_join_request(&self, peer: PeerId) {
self.db.set_key(peer.to_string(), &"").execute();
debug!(
"Added join request: {:?}",
self.db.get_key(peer.to_string()).query()
)
}
pub fn has_family_join_request(&self, peer: PeerId) -> anyhow::Result<bool> {
Ok(self
.db
.get_key(peer.to_string())
.query()
.map_err(|e| anyhow!(e))?
.is_some())
}
pub fn remove_family_join_request(&self, peer: PeerId) -> anyhow::Result<()> {
self.db
.delete_key(peer.to_string())
.map(|_| ())
.map_err(|e| anyhow!(e))
}
}
#[cfg(test)]
mod tests {
use ubisync_lib::types::PeerId;
use super::StateDB;
#[test]
fn family_join_requests() {
let db = StateDB::init(None);
let peer = PeerId::default();
db.add_family_join_request(peer.clone());
assert!(db.has_family_join_request(peer).unwrap());
}
} }

View file

@ -10,10 +10,10 @@ use ubisync_lib::{
api::events::AppEvent, api::events::AppEvent,
messages::{Message, MessageContent}, messages::{Message, MessageContent},
peer::Peer, peer::Peer,
types::{AppId, Element, ElementContent, ElementId, FamilyId, PeerId, PotId, Tag}, types::{AppId, Element, ElementContent, ElementId, Family, FamilyId, PeerId, PotId, Tag},
}; };
use anyhow::Error; use anyhow::{anyhow, Error};
use tracing::{debug, warn}; use tracing::{debug, warn};
mod api_state; mod api_state;
@ -68,11 +68,12 @@ impl State {
pub fn own_peer_id(&self) -> Option<PeerId> { pub fn own_peer_id(&self) -> Option<PeerId> {
if let Ok(guard) = self.comm_handle.read() { if let Ok(guard) = self.comm_handle.read() {
guard.as_ref().map(|t| t.own_peer_id().ok()).flatten() if let Some(ch) = guard.as_ref() {
} else { return ch.own_peer_id().ok();
None
} }
} }
None
}
pub fn get_apps(&self) -> anyhow::Result<Vec<App>> { pub fn get_apps(&self) -> anyhow::Result<Vec<App>> {
self.db.get_all_apps() self.db.get_all_apps()
@ -149,24 +150,51 @@ impl State {
pub fn add_family_member(&self, peer: PeerId) -> anyhow::Result<()> { pub fn add_family_member(&self, peer: PeerId) -> anyhow::Result<()> {
let my_id = self let my_id = self
.comm_handle .own_peer_id()
.read() .ok_or(Error::msg("Could not get own PeerId"))?;
.map_err(|_| Error::msg("Failed to lock on CommHandle"))?
.to_owned()
.ok_or(Error::msg("CommHandle not initialized"))?
.own_peer_id()?;
if self.db.get_family_of_peer(my_id.clone())?.is_none() { let my_family = match self.db.get_family_of_peer(my_id.clone())? {
Some(id) => {
self.db.add_peer_to_family(peer.clone(), id.clone())?;
self.db self.db
.add_peer_family(FamilyId::new(), None, vec![my_id.clone()])?; .get_peer_family(id)?
.ok_or(Error::msg("Family not found"))?
}
None => {
debug!(
"This node does not have a family yet, creating one with this node and the added peer"
);
let family = Family {
id: FamilyId::new(),
name: None,
members: vec![my_id.clone(), peer.clone()],
};
self.db.add_peer_family(family.clone())?;
family
}
};
self.send_to_peers(
MessageContent::AddedToFamily { family: my_family },
vec![peer],
);
Ok(())
} }
self.db.add_peer_to_family( pub fn add_family_join_request(&self, peer: PeerId) {
peer, self.db.add_family_join_request(peer)
}
pub fn get_family_of(&self, peer: PeerId) -> anyhow::Result<Family> {
match self.db.get_peer_family(
self.db self.db
.get_family_of_peer(my_id)? .get_family_of_peer(peer)?
.ok_or(Error::msg("Could not find own family"))?, .ok_or(Error::msg("Family of peer not found"))?,
) ) {
Ok(Some(family)) => Ok(family),
Ok(None) => Err(Error::msg("Family not found by its FamilyId")),
Err(e) => Err(e),
}
} }
pub fn get_event_receiver( pub fn get_event_receiver(
@ -234,23 +262,25 @@ impl State {
} }
} }
fn send_to_peers(&self, ct: MessageContent, peers: Vec<PeerId>) { pub fn send_to_peers(&self, ct: MessageContent, peers: Vec<PeerId>) {
match self.comm_handle.read() { match self.comm_handle.read() {
Ok(opt) => { Ok(opt) => {
if opt.is_some() { if opt.is_some() {
let arc = opt.as_ref().unwrap().clone(); let arc = opt.as_ref().unwrap().clone();
tokio::task::spawn_blocking(|| {
tokio::spawn(async move { tokio::spawn(async move {
for peer in peers { for peer in peers {
if let Err(e) = let _ = arc
arc.send(peer.addr_ref(), Message::new(ct.clone())).await .send(peer.addr_ref(), Message::new(ct.clone()))
{ .await
.map_err(|e| {
debug!( debug!(
"Sending to peer '{:?}' returned an error: {}", "Sending to peer '{:?}' returned an error:\n{e}",
peer.b32_addr_nocache(), peer
e
) )
});
} }
} })
}); });
} }
} }