Compare commits

..

No commits in common. "95050d5c61cfa5d552a1b56544cc8eb05f65405f" and "43006e9a5345ac6a832a01f44d254c7bf25e625c" have entirely different histories.

19 changed files with 22 additions and 479 deletions

View file

@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
use crate::types::{ElementContent, ElementId, Family, MessageId, PotId, Share, ShareId};
use crate::types::{ElementContent, ElementId, Family, MessageId, PotId};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Message {
@ -23,7 +23,6 @@ pub enum MessageContent {
JoinFamily,
AddedToFamily {
family: Family,
metastate_share: ShareId,
},
LeaveFamily,
CreateElement {
@ -42,12 +41,6 @@ pub enum MessageContent {
id: PotId,
app_type: String,
},
Get {
shares: Vec<ShareId>,
},
UpdateShare {
share: Share
},
}
#[derive(Serialize, Deserialize, Debug, Clone)]

View file

@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
use super::{ElementContent, ElementId, MessageId, PotId};
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct Element {
// Uuid identifying the element itself
pub id: ElementId,
@ -43,11 +43,6 @@ impl Element {
}
}
impl PartialEq for Element {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]

View file

@ -1,14 +1,7 @@
use serde::{Deserialize, Serialize};
use super::{Family, Pot, Share};
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum ElementContent {
MetaState {
family: Family,
pots: Vec<Pot>,
shares: Vec<Share>,
},
Text(String),
}

View file

@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Default, Hash)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Default)]
pub struct ElementId(Uuid);
impl ElementId {
pub fn new() -> Self {

View file

@ -8,7 +8,7 @@ mod element_id;
pub use element_id::ElementId;
mod element;
pub use element::{ContentUpdateStrategy, Element};
pub use element::{Element, ContentUpdateStrategy};
mod family_id;
pub use family_id::FamilyId;
@ -31,8 +31,5 @@ pub use pot_id::PotId;
mod pot;
pub use pot::Pot;
mod share;
pub use share::{Share, ShareContent, ShareId, SharePermissions};
mod tag;
pub use tag::Tag;

View file

@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct PotId(Uuid);
impl PotId {
pub fn new() -> Self {

View file

@ -1,52 +0,0 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::{FamilyId, PotId};
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct ShareId(Uuid);
impl ShareId {
pub fn new() -> Self {
ShareId(Uuid::new_v4())
}
}
impl ToString for ShareId {
fn to_string(&self) -> String {
self.0.to_string()
}
}
impl From<&ShareId> for String {
fn from(value: &ShareId) -> Self {
value.0.to_string()
}
}
impl TryFrom<&str> for ShareId {
type Error = serde_json::Error;
fn try_from(value: &str) -> Result<Self, Self::Error> {
serde_json::from_str(value)
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct ShareContent {
pub pots: Vec<PotId>,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum SharePermissions {
Read,
ReadWrite,
Owner
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct Share {
pub id: ShareId,
pub content: ShareContent,
pub members: HashMap<FamilyId, SharePermissions>,
}

View file

@ -1,4 +1,3 @@
use itertools::Itertools;
use tracing::debug;
use ubisync_lib::types::Peer;
@ -20,29 +19,15 @@ pub fn handle(state: &CommState, peer: &PeerId, message: Message) {
match message.content() {
MessageContent::Hello { peer_name, family } => {
state
.set_peer(Peer::new(
peer.to_owned(),
Some(peer_name.to_string()),
family.to_owned(),
))
.set_peer(Peer::new(peer.to_owned(), Some(peer_name.to_string()), family.to_owned()))
.expect("State failed");
}
MessageContent::JoinFamily => state.request_family_join(peer.to_owned()),
MessageContent::AddedToFamily {
family,
metastate_share,
} => {
MessageContent::AddedToFamily { family } => {
if state.has_family_join_request(peer.to_owned()) {
state
.set_own_family(family.to_owned())
.expect("State failed");
state.set_metastate_share(metastate_share.to_owned());
state.send_to_peers(
MessageContent::Get {
shares: vec![metastate_share.to_owned()],
},
vec![peer.to_owned()],
)
}
}
MessageContent::LeaveFamily => state.remove_peer_from_family(peer.to_owned()),
@ -81,26 +66,5 @@ pub fn handle(state: &CommState, peer: &PeerId, message: Message) {
.add_pot(id.to_owned(), app_type.to_string())
.expect("State failed");
}
MessageContent::Get { shares } => {
let _ = shares
.iter()
.map(|share| state.elements_of_share(share.to_owned()))
.concat()
.iter()
.map(|element_id| state.get_element(element_id.to_owned()))
.flatten()
.map(|element| {
state.send_to_peers(
MessageContent::UpdateElement {
id: element.id.to_owned(),
content: element.content.to_owned(),
},
vec![peer.to_owned()],
)
});
},
MessageContent::UpdateShare { share } => {
state.set_share(share.to_owned())
}
}
}

View file

@ -94,7 +94,7 @@ impl CommHandle {
}
pub async fn send(&self, dest: &I2pSocketAddr, msg: Message) -> anyhow::Result<()> {
debug!("Sending message...\nFrom '{:?}'\nTo '{dest:?}'\n Message: '{msg:?}", self.own_peer_id().unwrap().addr());
debug!("Sending message...\nFrom '{:?}' (dest: {:?})\nTo '{dest:?}'\n Message: '{msg:?}", self.own_peer_id().unwrap().addr(), self.own_peer_id().unwrap());
match serde_json::to_string(&msg) {
Ok(msg_string) => {
self.send_to_addr(dest, msg_string.as_bytes()).await?;

View file

@ -10,7 +10,7 @@ use state::{ApiState, CommState, State};
use ubisync_lib::{
messages::{Message, MessageContent},
types::{AppId, Family, Peer, PeerId, PotId, ShareId},
types::{AppId, Family, Peer, PeerId, PotId},
};
pub mod api;
@ -86,19 +86,12 @@ impl Ubisync {
pub fn accept_family_join(&self, peer: PeerId) -> anyhow::Result<()> {
self.add_family_member_from_id(peer.clone())?;
let family = self.get_family()?;
let metastate_share = self
.state_handle
.get_metastate_share()
.expect("Node does not know its metastate ShareId");
tokio::spawn({
let ch = self.comm_handle.clone();
async move {
ch.send(
peer.addr_ref(),
Message::new(MessageContent::AddedToFamily {
family,
metastate_share,
}),
Message::new(MessageContent::AddedToFamily { family }),
)
.await
.expect("Could not send family join confirmation to peer");
@ -107,10 +100,6 @@ impl Ubisync {
Ok(())
}
pub fn create_own_family(&self) {
self.state_handle.create_own_family()
}
pub fn get_family(&self) -> anyhow::Result<Family> {
self.state_handle
.get_family_of(self.comm_handle.own_peer_id()?)
@ -157,8 +146,6 @@ mod tests {
c2.api_config.port = Some(9982);
let ubi1 = Arc::new(Ubisync::new(&Config::default()).await.unwrap());
let ubi2 = Arc::new(Ubisync::new(&c2).await.unwrap());
ubi1.create_own_family();
ubi2.create_own_family();
ubi1.set_node_event_callback(
move |ev, node| {
@ -179,43 +166,4 @@ mod tests {
assert_eq!(ubi1.get_family().unwrap(), ubi2.get_family().unwrap());
std::process::exit(0);
}
#[tokio::test(flavor = "multi_thread")]
async fn sync_metastate() {
/*
create & join family
*/
tracing_subscriber::fmt()
.pretty()
.with_max_level(Level::DEBUG)
.init();
// Two nodes need to bind to different ports
let mut c2 = Config::default();
c2.api_config.port = Some(9982);
let ubi1 = Arc::new(Ubisync::new(&Config::default()).await.unwrap());
let ubi2 = Arc::new(Ubisync::new(&c2).await.unwrap());
ubi1.set_node_event_callback(
move |ev, node| {
if let UbisyncNodeEvent::FamilyJoinRequest { joiner } = ev {
debug!("Received join request, make member join");
node.accept_family_join(joiner).unwrap();
}
},
ubi1.clone(),
);
ubi2.request_family_join(ubi1.get_destination().into())
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(5000)).await;
/*
create & sync metastate
*/
}
}

View file

@ -6,7 +6,7 @@ use tracing::debug;
use ubisync_lib::{
api::events::AppEvent,
messages::MessageContent,
types::{AppId, ContentUpdateStrategy, Element, ElementContent, ElementId, Pot, PotId, ShareId},
types::{AppId, ContentUpdateStrategy, Element, ElementContent, ElementId, Pot, PotId},
};
use crate::api::v0::app::App;

View file

@ -2,18 +2,14 @@ use std::sync::Arc;
use tracing::debug;
use itertools::Itertools;
use ubisync_lib::{
api::events::AppEvent,
messages::MessageContent,
types::{
ContentUpdateStrategy, Element, ElementContent, ElementId, Family, FamilyId, MessageId,
Peer, PeerId, Pot, PotId, Share, ShareId,
Peer, PeerId, PotId,
},
};
use anyhow::Error;
use crate::node_events::UbisyncNodeEvent;
use super::{database::StateDB, State};
@ -134,48 +130,6 @@ impl CommState {
self.state.own_peer_id()
}
pub fn elements_of_share(&self, id: ShareId) -> Vec<ElementId> {
self.state
.db
.shares()
.get(id)
.iter()
.flatten()
.map(|share| share.content.pots.to_owned())
.concat()
.iter()
.unique()
.map(|pot| self.state.db.elements().ids_by_pot_id(pot.to_owned()))
.flatten()
.concat()
.iter()
.unique()
.map(|eid| eid.to_owned())
.collect_vec()
}
pub fn set_metastate_share(&self, share: ShareId) {
self.db().set_metastate_share(share)
}
pub fn update_meta_state(&self, family: Family, pots: &Vec<Pot>, shares: &Vec<Share>) {
let _ = self.set_own_family(family);
for pot in pots {
let _ = self.add_pot(pot.id.to_owned(), pot.app_type.to_owned());
}
for share in shares {
let _ = self.db().shares().add(share.to_owned());
}
}
pub fn set_share(&self, share: Share) {
let _ = self.db().shares().set(share);
}
pub fn send_to_peers(&self, ct: MessageContent, peers: Vec<PeerId>) {
self.state.send_to_peers(ct, peers)
}
fn db(&self) -> &StateDB {
&self.state.db
}

View file

@ -127,9 +127,7 @@ impl<'a> Apps<'a> {
#[cfg(test)]
mod tests {
use ubisync_lib::types::{
AppId, ContentUpdateStrategy, ElementContent, ElementId, Pot, PotId,
};
use ubisync_lib::types::{AppId, ContentUpdateStrategy, ElementContent, ElementId, Pot, PotId};
use crate::{api::v0::app::App, state::database::StateDB};

View file

@ -1,13 +1,8 @@
use anyhow::{anyhow, Error};
use bonsaidb::{
core::{
connection::Connection,
document::Emit,
schema::{Collection, MapReduce, SerializedCollection, View, ViewSchema},
},
core::schema::{Collection, SerializedCollection},
local::Database,
};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use ubisync_lib::types::{
ContentUpdateStrategy, Element, ElementContent, ElementId, MessageId, PotId,
@ -40,36 +35,6 @@ impl From<DbElement> for Element {
}
}
#[derive(Debug, Clone, View, ViewSchema)]
#[view(collection = DbElement, key = AsKey<PotId>, value = Vec<ElementId>, name = "by-pot-id")]
pub(super) struct ElementIdByPotId;
impl MapReduce for ElementIdByPotId {
fn map<'doc>(
&self,
document: &'doc bonsaidb::core::document::BorrowedDocument<'_>,
) -> bonsaidb::core::schema::ViewMapResult<'doc, Self> {
let entry = DbElement::document_contents(document)?;
document
.header
.emit_key_and_value(AsKey::new(entry.pot), vec![(*entry.id).clone()])
}
fn reduce(
&self,
mappings: &[bonsaidb::core::schema::MappedValue<
Self::MappedKey<'_>,
<Self::View as View>::Value,
>],
_rereduce: bool,
) -> Result<<Self::View as View>::Value, bonsaidb::core::Error> {
Ok(mappings
.iter()
.map(|mapping| mapping.value.clone())
.concat())
}
}
pub(crate) struct Elements<'a> {
parent: &'a StateDB,
db: &'a Database,
@ -109,14 +74,6 @@ impl<'a> Elements<'a> {
.map(|el| el.contents.into())
}
pub fn ids_by_pot_id(&self, pot: PotId) -> anyhow::Result<Vec<ElementId>> {
self.db
.view::<ElementIdByPotId>()
.with_key(&AsKey::new(pot))
.reduce()
.map_err(|e| anyhow!(e))
}
pub fn set_content(&self, id: ElementId, content: ElementContent) -> anyhow::Result<()> {
DbElement::get(&AsKey::new(id), self.db)
.map_err(|e| anyhow!(e))?

View file

@ -6,7 +6,6 @@ use peer_families::DbPeerFamily;
use peers::DbPeer;
use pot_memberships::DbPotMembership;
use pots::DbPot;
use shares::DbShare;
pub(crate) use apps::Apps;
pub(crate) use elements::Elements;
@ -14,7 +13,6 @@ pub(crate) use peer_families::Families;
pub(crate) use peers::Peers;
pub(crate) use pot_memberships::PotMemberships;
pub(crate) use pots::Pots;
pub(crate) use shares::Shares;
mod apps;
mod elements;
@ -22,10 +20,9 @@ mod peer_families;
mod peers;
mod pot_memberships;
mod pots;
mod shares;
#[derive(Schema, Debug)]
#[schema(name = "ubisync", collections = [DbElement, DbPotMembership, DbApp, DbPot, DbPeer, DbPeerFamily, DbShare])]
#[schema(name = "ubisync", collections = [DbElement, DbPotMembership, DbApp, DbPot, DbPeer, DbPeerFamily])]
pub struct UbisyncSchema;
#[cfg(test)]

View file

@ -3,9 +3,7 @@ use bonsaidb::{
core::schema::{Collection, SerializedCollection},
local::Database,
};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use tracing::debug;
use ubisync_lib::types::{Pot, PotId};
use crate::state::database::{as_key::AsKey, StateDB};
@ -57,21 +55,11 @@ impl<'a> Pots<'a> {
.map(|pot_opt| pot_opt.map(|pot| pot.contents.into()))
.map_err(|e| anyhow!(e))
}
pub fn get_all(&self) -> anyhow::Result<Vec<Pot>> {
Ok(DbPot::all(self.db)
.query()?
.iter()
.map(|doc| doc.contents.clone().into())
.collect_vec())
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use ubisync_lib::types::{Pot, PotId, Share, ShareContent, ShareId};
use ubisync_lib::types::{Pot, PotId};
use crate::state::database::StateDB;

View file

@ -1,108 +0,0 @@
use std::collections::HashMap;
use anyhow::anyhow;
use bonsaidb::{
core::schema::{Collection, SerializedCollection},
local::Database,
};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use ubisync_lib::types::{FamilyId, Share, ShareContent, ShareId, SharePermissions};
use crate::state::database::{as_key::AsKey, StateDB};
#[derive(Debug, Serialize, Deserialize, Collection, PartialEq, Clone)]
#[collection(name = "shares", views = [])]
pub(super) struct DbShare {
#[natural_id]
pub(super) id: AsKey<ShareId>,
pub(super) content: ShareContent,
pub(super) members: HashMap<FamilyId, SharePermissions>,
}
impl From<DbShare> for Share {
fn from(value: DbShare) -> Self {
Share {
id: (*value.id).clone(),
content: value.content,
members: value.members,
}
}
}
impl From<Share> for DbShare {
fn from(value: Share) -> Self {
DbShare {
id: AsKey::new(value.id),
content: value.content,
members: value.members,
}
}
}
pub(crate) struct Shares<'a> {
parent: &'a StateDB,
db: &'a Database,
}
impl<'a> Shares<'a> {
pub const fn new(parent: &'a StateDB, bonsai: &'a Database) -> Self {
Self { parent, db: bonsai }
}
pub fn add(&self, share: Share) -> anyhow::Result<()> {
DbShare::push(share.into(), self.db)
.map(|_| ())
.map_err(|e| anyhow!(e))
}
pub fn get(&self, id: ShareId) -> anyhow::Result<Option<Share>> {
DbShare::get(&AsKey::new(id), self.db)
.map(|share_opt| share_opt.map(|share| share.contents.into()))
.map_err(|e| anyhow!(e))
}
pub fn get_all(&self) -> anyhow::Result<Vec<Share>> {
Ok(DbShare::all(self.db)
.query()?
.iter()
.map(|doc| doc.contents.clone().into())
.collect_vec())
}
pub fn set(&self, share: Share) -> anyhow::Result<()> {
match DbShare::get(&AsKey::new(share.id.clone()), self.db).map_err(|e| anyhow!(e))? {
Some(mut doc) => doc
.modify(self.db, |d| {
d.contents = DbShare {
id: AsKey::new(share.id.clone()),
content: share.content.clone(),
members: share.members.clone(),
}
})
.map_err(|e| anyhow!(e)),
None => self.add(share),
}
}
}
#[cfg(test)]
mod tests {
use ubisync_lib::types::{Share, ShareContent, ShareId};
use crate::state::database::StateDB;
#[test]
fn add_get() {
let db = StateDB::init(None);
let share = Share {
id: ShareId::new(),
content: ShareContent { pots: vec![] },
members: Default::default(),
};
db.shares().add(share.clone()).unwrap();
let retrieved_share = db.shares().get(share.id.clone()).unwrap();
assert_eq!(retrieved_share, Some(share))
}
}

View file

@ -2,22 +2,19 @@ use anyhow::anyhow;
use bonsaidb::{
core::keyvalue::KeyValue,
local::{
config::{Builder, StorageConfiguration},
Database,
config::{Builder, StorageConfiguration},Database
},
};
use serde::Deserialize;
use tracing::debug;
use ubisync_lib::types::{PeerId, ShareId};
use ubisync_lib::types::PeerId;
use uuid::Uuid;
use self::collections::{
Apps, Elements, Families, Peers, PotMemberships, Pots, Shares, UbisyncSchema,
};
use self::collections::{Apps, Elements, Families, Peers, PotMemberships, Pots, UbisyncSchema};
mod as_key;
mod collections;
pub struct StateDB {
db: Database,
}
@ -57,20 +54,6 @@ impl StateDB {
.map_err(|e| anyhow!(e))
}
pub fn set_metastate_share(&self, share: ShareId) {
let _ = self.db.set_key("metastate_share", &share).execute();
}
pub fn get_metastate_share(&self) -> Option<ShareId> {
self.db
.get_key("metastate_share")
.query()
.ok()
.flatten()
.map(|val| val.deserialize().ok())
.flatten()
}
pub const fn apps(&self) -> Apps {
Apps::new(&self, &self.db)
}
@ -89,9 +72,6 @@ impl StateDB {
pub const fn pots(&self) -> Pots {
Pots::new(&self, &self.db)
}
pub const fn shares(&self) -> Shares {
Shares::new(&self, &self.db)
}
}
#[cfg(test)]

View file

@ -9,10 +9,7 @@ use std::{
use ubisync_lib::{
api::events::AppEvent,
messages::{Message, MessageContent},
types::{
AppId, ContentUpdateStrategy, Element, ElementContent, ElementId, Family, FamilyId, Peer,
PeerId, Pot, PotId, Share, ShareContent, ShareId, SharePermissions, Tag,
},
types::{AppId, Element, ElementContent, ElementId, Family, FamilyId, Peer, PeerId, PotId, Tag},
};
use anyhow::Error;
@ -153,54 +150,6 @@ impl State {
})
}
pub fn create_own_family(&self) {
let family_id = FamilyId::new();
let metastate_potid = PotId::new();
let metastate_shareid = ShareId::new();
let metastate_elementid = ElementId::new();
let mut family_members = HashSet::new();
family_members.insert(self.own_peer_id().expect("Node does not know own peer id"));
let mut metastate_members = HashMap::new();
metastate_members.insert(family_id.clone(), SharePermissions::Owner);
let _ = self.db.families().add(Family {
id: family_id.clone(),
name: None,
members: family_members.clone(),
});
let _ = self.db.shares().add(Share {
id: metastate_shareid,
content: ShareContent {
pots: vec![metastate_potid.clone()],
},
members: metastate_members,
});
let _ = self
.db
.pots()
.add(metastate_potid.clone(), "".to_string());
debug!("{:?}", self.db.pots().get_all().unwrap());
let _ = self.db.elements().add(
metastate_elementid,
ElementContent::MetaState {
family: Family {
id: family_id,
name: None,
members: family_members,
},
pots: self.db.pots().get_all().unwrap(),
shares: self.db.shares().get_all().unwrap(),
},
ContentUpdateStrategy::Overwrite,
None,
true,
metastate_potid,
);
}
pub fn add_family_member(&self, peer: PeerId) -> anyhow::Result<()> {
let my_id = self
.own_peer_id()
@ -229,13 +178,7 @@ impl State {
};
self.send_to_peers(
MessageContent::AddedToFamily {
family: my_family,
metastate_share: self
.db
.get_metastate_share()
.expect("Node is in a family, but does not know metastate ShareId"),
},
MessageContent::AddedToFamily { family: my_family },
vec![peer],
);
Ok(())
@ -258,10 +201,6 @@ impl State {
}
}
pub fn get_metastate_share(&self) -> Option<ShareId> {
self.db.get_metastate_share()
}
pub fn get_event_receiver(
&self,
app: &AppId,