Compare commits
2 commits
43006e9a53
...
95050d5c61
Author | SHA1 | Date | |
---|---|---|---|
|
95050d5c61 | ||
|
3589be0a3b |
19 changed files with 479 additions and 22 deletions
|
@ -1,6 +1,6 @@
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::types::{ElementContent, ElementId, Family, MessageId, PotId};
|
use crate::types::{ElementContent, ElementId, Family, MessageId, PotId, Share, ShareId};
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
pub struct Message {
|
pub struct Message {
|
||||||
|
@ -23,6 +23,7 @@ pub enum MessageContent {
|
||||||
JoinFamily,
|
JoinFamily,
|
||||||
AddedToFamily {
|
AddedToFamily {
|
||||||
family: Family,
|
family: Family,
|
||||||
|
metastate_share: ShareId,
|
||||||
},
|
},
|
||||||
LeaveFamily,
|
LeaveFamily,
|
||||||
CreateElement {
|
CreateElement {
|
||||||
|
@ -41,6 +42,12 @@ pub enum MessageContent {
|
||||||
id: PotId,
|
id: PotId,
|
||||||
app_type: String,
|
app_type: String,
|
||||||
},
|
},
|
||||||
|
Get {
|
||||||
|
shares: Vec<ShareId>,
|
||||||
|
},
|
||||||
|
UpdateShare {
|
||||||
|
share: Share
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
|
|
|
@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use super::{ElementContent, ElementId, MessageId, PotId};
|
use super::{ElementContent, ElementId, MessageId, PotId};
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
pub struct Element {
|
pub struct Element {
|
||||||
// Uuid identifying the element itself
|
// Uuid identifying the element itself
|
||||||
pub id: ElementId,
|
pub id: ElementId,
|
||||||
|
@ -43,6 +43,11 @@ impl Element {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl PartialEq for Element {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
self.id == other.id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||||
|
|
|
@ -1,7 +1,14 @@
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use super::{Family, Pot, Share};
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||||
pub enum ElementContent {
|
pub enum ElementContent {
|
||||||
|
MetaState {
|
||||||
|
family: Family,
|
||||||
|
pots: Vec<Pot>,
|
||||||
|
shares: Vec<Share>,
|
||||||
|
},
|
||||||
Text(String),
|
Text(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Default)]
|
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Default, Hash)]
|
||||||
pub struct ElementId(Uuid);
|
pub struct ElementId(Uuid);
|
||||||
impl ElementId {
|
impl ElementId {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
|
|
|
@ -8,7 +8,7 @@ mod element_id;
|
||||||
pub use element_id::ElementId;
|
pub use element_id::ElementId;
|
||||||
|
|
||||||
mod element;
|
mod element;
|
||||||
pub use element::{Element, ContentUpdateStrategy};
|
pub use element::{ContentUpdateStrategy, Element};
|
||||||
|
|
||||||
mod family_id;
|
mod family_id;
|
||||||
pub use family_id::FamilyId;
|
pub use family_id::FamilyId;
|
||||||
|
@ -31,5 +31,8 @@ pub use pot_id::PotId;
|
||||||
mod pot;
|
mod pot;
|
||||||
pub use pot::Pot;
|
pub use pot::Pot;
|
||||||
|
|
||||||
|
mod share;
|
||||||
|
pub use share::{Share, ShareContent, ShareId, SharePermissions};
|
||||||
|
|
||||||
mod tag;
|
mod tag;
|
||||||
pub use tag::Tag;
|
pub use tag::Tag;
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
|
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||||
pub struct PotId(Uuid);
|
pub struct PotId(Uuid);
|
||||||
impl PotId {
|
impl PotId {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
|
|
52
ubisync-lib/src/types/share.rs
Normal file
52
ubisync-lib/src/types/share.rs
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use super::{FamilyId, PotId};
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
|
pub struct ShareId(Uuid);
|
||||||
|
impl ShareId {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
ShareId(Uuid::new_v4())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ToString for ShareId {
|
||||||
|
fn to_string(&self) -> String {
|
||||||
|
self.0.to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&ShareId> for String {
|
||||||
|
fn from(value: &ShareId) -> Self {
|
||||||
|
value.0.to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<&str> for ShareId {
|
||||||
|
type Error = serde_json::Error;
|
||||||
|
fn try_from(value: &str) -> Result<Self, Self::Error> {
|
||||||
|
serde_json::from_str(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||||
|
pub struct ShareContent {
|
||||||
|
pub pots: Vec<PotId>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||||
|
pub enum SharePermissions {
|
||||||
|
Read,
|
||||||
|
ReadWrite,
|
||||||
|
Owner
|
||||||
|
}
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||||
|
pub struct Share {
|
||||||
|
pub id: ShareId,
|
||||||
|
pub content: ShareContent,
|
||||||
|
pub members: HashMap<FamilyId, SharePermissions>,
|
||||||
|
}
|
|
@ -1,3 +1,4 @@
|
||||||
|
use itertools::Itertools;
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
use ubisync_lib::types::Peer;
|
use ubisync_lib::types::Peer;
|
||||||
|
@ -19,15 +20,29 @@ pub fn handle(state: &CommState, peer: &PeerId, message: Message) {
|
||||||
match message.content() {
|
match message.content() {
|
||||||
MessageContent::Hello { peer_name, family } => {
|
MessageContent::Hello { peer_name, family } => {
|
||||||
state
|
state
|
||||||
.set_peer(Peer::new(peer.to_owned(), Some(peer_name.to_string()), family.to_owned()))
|
.set_peer(Peer::new(
|
||||||
|
peer.to_owned(),
|
||||||
|
Some(peer_name.to_string()),
|
||||||
|
family.to_owned(),
|
||||||
|
))
|
||||||
.expect("State failed");
|
.expect("State failed");
|
||||||
}
|
}
|
||||||
MessageContent::JoinFamily => state.request_family_join(peer.to_owned()),
|
MessageContent::JoinFamily => state.request_family_join(peer.to_owned()),
|
||||||
MessageContent::AddedToFamily { family } => {
|
MessageContent::AddedToFamily {
|
||||||
|
family,
|
||||||
|
metastate_share,
|
||||||
|
} => {
|
||||||
if state.has_family_join_request(peer.to_owned()) {
|
if state.has_family_join_request(peer.to_owned()) {
|
||||||
state
|
state
|
||||||
.set_own_family(family.to_owned())
|
.set_own_family(family.to_owned())
|
||||||
.expect("State failed");
|
.expect("State failed");
|
||||||
|
state.set_metastate_share(metastate_share.to_owned());
|
||||||
|
state.send_to_peers(
|
||||||
|
MessageContent::Get {
|
||||||
|
shares: vec![metastate_share.to_owned()],
|
||||||
|
},
|
||||||
|
vec![peer.to_owned()],
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
MessageContent::LeaveFamily => state.remove_peer_from_family(peer.to_owned()),
|
MessageContent::LeaveFamily => state.remove_peer_from_family(peer.to_owned()),
|
||||||
|
@ -66,5 +81,26 @@ pub fn handle(state: &CommState, peer: &PeerId, message: Message) {
|
||||||
.add_pot(id.to_owned(), app_type.to_string())
|
.add_pot(id.to_owned(), app_type.to_string())
|
||||||
.expect("State failed");
|
.expect("State failed");
|
||||||
}
|
}
|
||||||
|
MessageContent::Get { shares } => {
|
||||||
|
let _ = shares
|
||||||
|
.iter()
|
||||||
|
.map(|share| state.elements_of_share(share.to_owned()))
|
||||||
|
.concat()
|
||||||
|
.iter()
|
||||||
|
.map(|element_id| state.get_element(element_id.to_owned()))
|
||||||
|
.flatten()
|
||||||
|
.map(|element| {
|
||||||
|
state.send_to_peers(
|
||||||
|
MessageContent::UpdateElement {
|
||||||
|
id: element.id.to_owned(),
|
||||||
|
content: element.content.to_owned(),
|
||||||
|
},
|
||||||
|
vec![peer.to_owned()],
|
||||||
|
)
|
||||||
|
});
|
||||||
|
},
|
||||||
|
MessageContent::UpdateShare { share } => {
|
||||||
|
state.set_share(share.to_owned())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -94,7 +94,7 @@ impl CommHandle {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send(&self, dest: &I2pSocketAddr, msg: Message) -> anyhow::Result<()> {
|
pub async fn send(&self, dest: &I2pSocketAddr, msg: Message) -> anyhow::Result<()> {
|
||||||
debug!("Sending message...\nFrom '{:?}' (dest: {:?})\nTo '{dest:?}'\n Message: '{msg:?}", self.own_peer_id().unwrap().addr(), self.own_peer_id().unwrap());
|
debug!("Sending message...\nFrom '{:?}'\nTo '{dest:?}'\n Message: '{msg:?}", self.own_peer_id().unwrap().addr());
|
||||||
match serde_json::to_string(&msg) {
|
match serde_json::to_string(&msg) {
|
||||||
Ok(msg_string) => {
|
Ok(msg_string) => {
|
||||||
self.send_to_addr(dest, msg_string.as_bytes()).await?;
|
self.send_to_addr(dest, msg_string.as_bytes()).await?;
|
||||||
|
|
|
@ -10,7 +10,7 @@ use state::{ApiState, CommState, State};
|
||||||
|
|
||||||
use ubisync_lib::{
|
use ubisync_lib::{
|
||||||
messages::{Message, MessageContent},
|
messages::{Message, MessageContent},
|
||||||
types::{AppId, Family, Peer, PeerId, PotId},
|
types::{AppId, Family, Peer, PeerId, PotId, ShareId},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub mod api;
|
pub mod api;
|
||||||
|
@ -86,12 +86,19 @@ impl Ubisync {
|
||||||
pub fn accept_family_join(&self, peer: PeerId) -> anyhow::Result<()> {
|
pub fn accept_family_join(&self, peer: PeerId) -> anyhow::Result<()> {
|
||||||
self.add_family_member_from_id(peer.clone())?;
|
self.add_family_member_from_id(peer.clone())?;
|
||||||
let family = self.get_family()?;
|
let family = self.get_family()?;
|
||||||
|
let metastate_share = self
|
||||||
|
.state_handle
|
||||||
|
.get_metastate_share()
|
||||||
|
.expect("Node does not know its metastate ShareId");
|
||||||
tokio::spawn({
|
tokio::spawn({
|
||||||
let ch = self.comm_handle.clone();
|
let ch = self.comm_handle.clone();
|
||||||
async move {
|
async move {
|
||||||
ch.send(
|
ch.send(
|
||||||
peer.addr_ref(),
|
peer.addr_ref(),
|
||||||
Message::new(MessageContent::AddedToFamily { family }),
|
Message::new(MessageContent::AddedToFamily {
|
||||||
|
family,
|
||||||
|
metastate_share,
|
||||||
|
}),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.expect("Could not send family join confirmation to peer");
|
.expect("Could not send family join confirmation to peer");
|
||||||
|
@ -100,6 +107,10 @@ impl Ubisync {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn create_own_family(&self) {
|
||||||
|
self.state_handle.create_own_family()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_family(&self) -> anyhow::Result<Family> {
|
pub fn get_family(&self) -> anyhow::Result<Family> {
|
||||||
self.state_handle
|
self.state_handle
|
||||||
.get_family_of(self.comm_handle.own_peer_id()?)
|
.get_family_of(self.comm_handle.own_peer_id()?)
|
||||||
|
@ -146,6 +157,8 @@ mod tests {
|
||||||
c2.api_config.port = Some(9982);
|
c2.api_config.port = Some(9982);
|
||||||
let ubi1 = Arc::new(Ubisync::new(&Config::default()).await.unwrap());
|
let ubi1 = Arc::new(Ubisync::new(&Config::default()).await.unwrap());
|
||||||
let ubi2 = Arc::new(Ubisync::new(&c2).await.unwrap());
|
let ubi2 = Arc::new(Ubisync::new(&c2).await.unwrap());
|
||||||
|
ubi1.create_own_family();
|
||||||
|
ubi2.create_own_family();
|
||||||
|
|
||||||
ubi1.set_node_event_callback(
|
ubi1.set_node_event_callback(
|
||||||
move |ev, node| {
|
move |ev, node| {
|
||||||
|
@ -166,4 +179,43 @@ mod tests {
|
||||||
assert_eq!(ubi1.get_family().unwrap(), ubi2.get_family().unwrap());
|
assert_eq!(ubi1.get_family().unwrap(), ubi2.get_family().unwrap());
|
||||||
std::process::exit(0);
|
std::process::exit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread")]
|
||||||
|
async fn sync_metastate() {
|
||||||
|
/*
|
||||||
|
create & join family
|
||||||
|
*/
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.pretty()
|
||||||
|
.with_max_level(Level::DEBUG)
|
||||||
|
.init();
|
||||||
|
|
||||||
|
// Two nodes need to bind to different ports
|
||||||
|
let mut c2 = Config::default();
|
||||||
|
c2.api_config.port = Some(9982);
|
||||||
|
let ubi1 = Arc::new(Ubisync::new(&Config::default()).await.unwrap());
|
||||||
|
let ubi2 = Arc::new(Ubisync::new(&c2).await.unwrap());
|
||||||
|
|
||||||
|
ubi1.set_node_event_callback(
|
||||||
|
move |ev, node| {
|
||||||
|
if let UbisyncNodeEvent::FamilyJoinRequest { joiner } = ev {
|
||||||
|
debug!("Received join request, make member join");
|
||||||
|
node.accept_family_join(joiner).unwrap();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
ubi1.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
|
ubi2.request_family_join(ubi1.get_destination().into())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
tokio::time::sleep(Duration::from_millis(5000)).await;
|
||||||
|
|
||||||
|
/*
|
||||||
|
create & sync metastate
|
||||||
|
*/
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ use tracing::debug;
|
||||||
use ubisync_lib::{
|
use ubisync_lib::{
|
||||||
api::events::AppEvent,
|
api::events::AppEvent,
|
||||||
messages::MessageContent,
|
messages::MessageContent,
|
||||||
types::{AppId, ContentUpdateStrategy, Element, ElementContent, ElementId, Pot, PotId},
|
types::{AppId, ContentUpdateStrategy, Element, ElementContent, ElementId, Pot, PotId, ShareId},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::api::v0::app::App;
|
use crate::api::v0::app::App;
|
||||||
|
|
|
@ -2,14 +2,18 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
|
use itertools::Itertools;
|
||||||
use ubisync_lib::{
|
use ubisync_lib::{
|
||||||
api::events::AppEvent,
|
api::events::AppEvent,
|
||||||
|
messages::MessageContent,
|
||||||
types::{
|
types::{
|
||||||
ContentUpdateStrategy, Element, ElementContent, ElementId, Family, FamilyId, MessageId,
|
ContentUpdateStrategy, Element, ElementContent, ElementId, Family, FamilyId, MessageId,
|
||||||
Peer, PeerId, PotId,
|
Peer, PeerId, Pot, PotId, Share, ShareId,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use anyhow::Error;
|
||||||
|
|
||||||
use crate::node_events::UbisyncNodeEvent;
|
use crate::node_events::UbisyncNodeEvent;
|
||||||
|
|
||||||
use super::{database::StateDB, State};
|
use super::{database::StateDB, State};
|
||||||
|
@ -130,6 +134,48 @@ impl CommState {
|
||||||
self.state.own_peer_id()
|
self.state.own_peer_id()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn elements_of_share(&self, id: ShareId) -> Vec<ElementId> {
|
||||||
|
self.state
|
||||||
|
.db
|
||||||
|
.shares()
|
||||||
|
.get(id)
|
||||||
|
.iter()
|
||||||
|
.flatten()
|
||||||
|
.map(|share| share.content.pots.to_owned())
|
||||||
|
.concat()
|
||||||
|
.iter()
|
||||||
|
.unique()
|
||||||
|
.map(|pot| self.state.db.elements().ids_by_pot_id(pot.to_owned()))
|
||||||
|
.flatten()
|
||||||
|
.concat()
|
||||||
|
.iter()
|
||||||
|
.unique()
|
||||||
|
.map(|eid| eid.to_owned())
|
||||||
|
.collect_vec()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_metastate_share(&self, share: ShareId) {
|
||||||
|
self.db().set_metastate_share(share)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_meta_state(&self, family: Family, pots: &Vec<Pot>, shares: &Vec<Share>) {
|
||||||
|
let _ = self.set_own_family(family);
|
||||||
|
for pot in pots {
|
||||||
|
let _ = self.add_pot(pot.id.to_owned(), pot.app_type.to_owned());
|
||||||
|
}
|
||||||
|
for share in shares {
|
||||||
|
let _ = self.db().shares().add(share.to_owned());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_share(&self, share: Share) {
|
||||||
|
let _ = self.db().shares().set(share);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send_to_peers(&self, ct: MessageContent, peers: Vec<PeerId>) {
|
||||||
|
self.state.send_to_peers(ct, peers)
|
||||||
|
}
|
||||||
|
|
||||||
fn db(&self) -> &StateDB {
|
fn db(&self) -> &StateDB {
|
||||||
&self.state.db
|
&self.state.db
|
||||||
}
|
}
|
||||||
|
|
|
@ -127,7 +127,9 @@ impl<'a> Apps<'a> {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use ubisync_lib::types::{AppId, ContentUpdateStrategy, ElementContent, ElementId, Pot, PotId};
|
use ubisync_lib::types::{
|
||||||
|
AppId, ContentUpdateStrategy, ElementContent, ElementId, Pot, PotId,
|
||||||
|
};
|
||||||
|
|
||||||
use crate::{api::v0::app::App, state::database::StateDB};
|
use crate::{api::v0::app::App, state::database::StateDB};
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,13 @@
|
||||||
use anyhow::{anyhow, Error};
|
use anyhow::{anyhow, Error};
|
||||||
use bonsaidb::{
|
use bonsaidb::{
|
||||||
core::schema::{Collection, SerializedCollection},
|
core::{
|
||||||
|
connection::Connection,
|
||||||
|
document::Emit,
|
||||||
|
schema::{Collection, MapReduce, SerializedCollection, View, ViewSchema},
|
||||||
|
},
|
||||||
local::Database,
|
local::Database,
|
||||||
};
|
};
|
||||||
|
use itertools::Itertools;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use ubisync_lib::types::{
|
use ubisync_lib::types::{
|
||||||
ContentUpdateStrategy, Element, ElementContent, ElementId, MessageId, PotId,
|
ContentUpdateStrategy, Element, ElementContent, ElementId, MessageId, PotId,
|
||||||
|
@ -35,6 +40,36 @@ impl From<DbElement> for Element {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, View, ViewSchema)]
|
||||||
|
#[view(collection = DbElement, key = AsKey<PotId>, value = Vec<ElementId>, name = "by-pot-id")]
|
||||||
|
pub(super) struct ElementIdByPotId;
|
||||||
|
|
||||||
|
impl MapReduce for ElementIdByPotId {
|
||||||
|
fn map<'doc>(
|
||||||
|
&self,
|
||||||
|
document: &'doc bonsaidb::core::document::BorrowedDocument<'_>,
|
||||||
|
) -> bonsaidb::core::schema::ViewMapResult<'doc, Self> {
|
||||||
|
let entry = DbElement::document_contents(document)?;
|
||||||
|
document
|
||||||
|
.header
|
||||||
|
.emit_key_and_value(AsKey::new(entry.pot), vec![(*entry.id).clone()])
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reduce(
|
||||||
|
&self,
|
||||||
|
mappings: &[bonsaidb::core::schema::MappedValue<
|
||||||
|
Self::MappedKey<'_>,
|
||||||
|
<Self::View as View>::Value,
|
||||||
|
>],
|
||||||
|
_rereduce: bool,
|
||||||
|
) -> Result<<Self::View as View>::Value, bonsaidb::core::Error> {
|
||||||
|
Ok(mappings
|
||||||
|
.iter()
|
||||||
|
.map(|mapping| mapping.value.clone())
|
||||||
|
.concat())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) struct Elements<'a> {
|
pub(crate) struct Elements<'a> {
|
||||||
parent: &'a StateDB,
|
parent: &'a StateDB,
|
||||||
db: &'a Database,
|
db: &'a Database,
|
||||||
|
@ -74,6 +109,14 @@ impl<'a> Elements<'a> {
|
||||||
.map(|el| el.contents.into())
|
.map(|el| el.contents.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn ids_by_pot_id(&self, pot: PotId) -> anyhow::Result<Vec<ElementId>> {
|
||||||
|
self.db
|
||||||
|
.view::<ElementIdByPotId>()
|
||||||
|
.with_key(&AsKey::new(pot))
|
||||||
|
.reduce()
|
||||||
|
.map_err(|e| anyhow!(e))
|
||||||
|
}
|
||||||
|
|
||||||
pub fn set_content(&self, id: ElementId, content: ElementContent) -> anyhow::Result<()> {
|
pub fn set_content(&self, id: ElementId, content: ElementContent) -> anyhow::Result<()> {
|
||||||
DbElement::get(&AsKey::new(id), self.db)
|
DbElement::get(&AsKey::new(id), self.db)
|
||||||
.map_err(|e| anyhow!(e))?
|
.map_err(|e| anyhow!(e))?
|
||||||
|
|
|
@ -6,6 +6,7 @@ use peer_families::DbPeerFamily;
|
||||||
use peers::DbPeer;
|
use peers::DbPeer;
|
||||||
use pot_memberships::DbPotMembership;
|
use pot_memberships::DbPotMembership;
|
||||||
use pots::DbPot;
|
use pots::DbPot;
|
||||||
|
use shares::DbShare;
|
||||||
|
|
||||||
pub(crate) use apps::Apps;
|
pub(crate) use apps::Apps;
|
||||||
pub(crate) use elements::Elements;
|
pub(crate) use elements::Elements;
|
||||||
|
@ -13,6 +14,7 @@ pub(crate) use peer_families::Families;
|
||||||
pub(crate) use peers::Peers;
|
pub(crate) use peers::Peers;
|
||||||
pub(crate) use pot_memberships::PotMemberships;
|
pub(crate) use pot_memberships::PotMemberships;
|
||||||
pub(crate) use pots::Pots;
|
pub(crate) use pots::Pots;
|
||||||
|
pub(crate) use shares::Shares;
|
||||||
|
|
||||||
mod apps;
|
mod apps;
|
||||||
mod elements;
|
mod elements;
|
||||||
|
@ -20,9 +22,10 @@ mod peer_families;
|
||||||
mod peers;
|
mod peers;
|
||||||
mod pot_memberships;
|
mod pot_memberships;
|
||||||
mod pots;
|
mod pots;
|
||||||
|
mod shares;
|
||||||
|
|
||||||
#[derive(Schema, Debug)]
|
#[derive(Schema, Debug)]
|
||||||
#[schema(name = "ubisync", collections = [DbElement, DbPotMembership, DbApp, DbPot, DbPeer, DbPeerFamily])]
|
#[schema(name = "ubisync", collections = [DbElement, DbPotMembership, DbApp, DbPot, DbPeer, DbPeerFamily, DbShare])]
|
||||||
pub struct UbisyncSchema;
|
pub struct UbisyncSchema;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
@ -3,7 +3,9 @@ use bonsaidb::{
|
||||||
core::schema::{Collection, SerializedCollection},
|
core::schema::{Collection, SerializedCollection},
|
||||||
local::Database,
|
local::Database,
|
||||||
};
|
};
|
||||||
|
use itertools::Itertools;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tracing::debug;
|
||||||
use ubisync_lib::types::{Pot, PotId};
|
use ubisync_lib::types::{Pot, PotId};
|
||||||
|
|
||||||
use crate::state::database::{as_key::AsKey, StateDB};
|
use crate::state::database::{as_key::AsKey, StateDB};
|
||||||
|
@ -55,11 +57,21 @@ impl<'a> Pots<'a> {
|
||||||
.map(|pot_opt| pot_opt.map(|pot| pot.contents.into()))
|
.map(|pot_opt| pot_opt.map(|pot| pot.contents.into()))
|
||||||
.map_err(|e| anyhow!(e))
|
.map_err(|e| anyhow!(e))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_all(&self) -> anyhow::Result<Vec<Pot>> {
|
||||||
|
Ok(DbPot::all(self.db)
|
||||||
|
.query()?
|
||||||
|
.iter()
|
||||||
|
.map(|doc| doc.contents.clone().into())
|
||||||
|
.collect_vec())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use ubisync_lib::types::{Pot, PotId};
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use ubisync_lib::types::{Pot, PotId, Share, ShareContent, ShareId};
|
||||||
|
|
||||||
use crate::state::database::StateDB;
|
use crate::state::database::StateDB;
|
||||||
|
|
||||||
|
|
108
ubisync/src/state/database/collections/shares.rs
Normal file
108
ubisync/src/state/database/collections/shares.rs
Normal file
|
@ -0,0 +1,108 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use anyhow::anyhow;
|
||||||
|
use bonsaidb::{
|
||||||
|
core::schema::{Collection, SerializedCollection},
|
||||||
|
local::Database,
|
||||||
|
};
|
||||||
|
use itertools::Itertools;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use ubisync_lib::types::{FamilyId, Share, ShareContent, ShareId, SharePermissions};
|
||||||
|
|
||||||
|
use crate::state::database::{as_key::AsKey, StateDB};
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Collection, PartialEq, Clone)]
|
||||||
|
#[collection(name = "shares", views = [])]
|
||||||
|
pub(super) struct DbShare {
|
||||||
|
#[natural_id]
|
||||||
|
pub(super) id: AsKey<ShareId>,
|
||||||
|
pub(super) content: ShareContent,
|
||||||
|
pub(super) members: HashMap<FamilyId, SharePermissions>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<DbShare> for Share {
|
||||||
|
fn from(value: DbShare) -> Self {
|
||||||
|
Share {
|
||||||
|
id: (*value.id).clone(),
|
||||||
|
content: value.content,
|
||||||
|
members: value.members,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Share> for DbShare {
|
||||||
|
fn from(value: Share) -> Self {
|
||||||
|
DbShare {
|
||||||
|
id: AsKey::new(value.id),
|
||||||
|
content: value.content,
|
||||||
|
members: value.members,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct Shares<'a> {
|
||||||
|
parent: &'a StateDB,
|
||||||
|
db: &'a Database,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> Shares<'a> {
|
||||||
|
pub const fn new(parent: &'a StateDB, bonsai: &'a Database) -> Self {
|
||||||
|
Self { parent, db: bonsai }
|
||||||
|
}
|
||||||
|
pub fn add(&self, share: Share) -> anyhow::Result<()> {
|
||||||
|
DbShare::push(share.into(), self.db)
|
||||||
|
.map(|_| ())
|
||||||
|
.map_err(|e| anyhow!(e))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self, id: ShareId) -> anyhow::Result<Option<Share>> {
|
||||||
|
DbShare::get(&AsKey::new(id), self.db)
|
||||||
|
.map(|share_opt| share_opt.map(|share| share.contents.into()))
|
||||||
|
.map_err(|e| anyhow!(e))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_all(&self) -> anyhow::Result<Vec<Share>> {
|
||||||
|
Ok(DbShare::all(self.db)
|
||||||
|
.query()?
|
||||||
|
.iter()
|
||||||
|
.map(|doc| doc.contents.clone().into())
|
||||||
|
.collect_vec())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set(&self, share: Share) -> anyhow::Result<()> {
|
||||||
|
match DbShare::get(&AsKey::new(share.id.clone()), self.db).map_err(|e| anyhow!(e))? {
|
||||||
|
Some(mut doc) => doc
|
||||||
|
.modify(self.db, |d| {
|
||||||
|
d.contents = DbShare {
|
||||||
|
id: AsKey::new(share.id.clone()),
|
||||||
|
content: share.content.clone(),
|
||||||
|
members: share.members.clone(),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.map_err(|e| anyhow!(e)),
|
||||||
|
None => self.add(share),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
|
||||||
|
use ubisync_lib::types::{Share, ShareContent, ShareId};
|
||||||
|
|
||||||
|
use crate::state::database::StateDB;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn add_get() {
|
||||||
|
let db = StateDB::init(None);
|
||||||
|
let share = Share {
|
||||||
|
id: ShareId::new(),
|
||||||
|
content: ShareContent { pots: vec![] },
|
||||||
|
members: Default::default(),
|
||||||
|
};
|
||||||
|
db.shares().add(share.clone()).unwrap();
|
||||||
|
|
||||||
|
let retrieved_share = db.shares().get(share.id.clone()).unwrap();
|
||||||
|
assert_eq!(retrieved_share, Some(share))
|
||||||
|
}
|
||||||
|
}
|
|
@ -2,19 +2,22 @@ use anyhow::anyhow;
|
||||||
use bonsaidb::{
|
use bonsaidb::{
|
||||||
core::keyvalue::KeyValue,
|
core::keyvalue::KeyValue,
|
||||||
local::{
|
local::{
|
||||||
config::{Builder, StorageConfiguration},Database
|
config::{Builder, StorageConfiguration},
|
||||||
|
Database,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
use serde::Deserialize;
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
use ubisync_lib::types::PeerId;
|
use ubisync_lib::types::{PeerId, ShareId};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use self::collections::{Apps, Elements, Families, Peers, PotMemberships, Pots, UbisyncSchema};
|
use self::collections::{
|
||||||
|
Apps, Elements, Families, Peers, PotMemberships, Pots, Shares, UbisyncSchema,
|
||||||
|
};
|
||||||
|
|
||||||
mod as_key;
|
mod as_key;
|
||||||
mod collections;
|
mod collections;
|
||||||
|
|
||||||
|
|
||||||
pub struct StateDB {
|
pub struct StateDB {
|
||||||
db: Database,
|
db: Database,
|
||||||
}
|
}
|
||||||
|
@ -54,6 +57,20 @@ impl StateDB {
|
||||||
.map_err(|e| anyhow!(e))
|
.map_err(|e| anyhow!(e))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_metastate_share(&self, share: ShareId) {
|
||||||
|
let _ = self.db.set_key("metastate_share", &share).execute();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_metastate_share(&self) -> Option<ShareId> {
|
||||||
|
self.db
|
||||||
|
.get_key("metastate_share")
|
||||||
|
.query()
|
||||||
|
.ok()
|
||||||
|
.flatten()
|
||||||
|
.map(|val| val.deserialize().ok())
|
||||||
|
.flatten()
|
||||||
|
}
|
||||||
|
|
||||||
pub const fn apps(&self) -> Apps {
|
pub const fn apps(&self) -> Apps {
|
||||||
Apps::new(&self, &self.db)
|
Apps::new(&self, &self.db)
|
||||||
}
|
}
|
||||||
|
@ -72,6 +89,9 @@ impl StateDB {
|
||||||
pub const fn pots(&self) -> Pots {
|
pub const fn pots(&self) -> Pots {
|
||||||
Pots::new(&self, &self.db)
|
Pots::new(&self, &self.db)
|
||||||
}
|
}
|
||||||
|
pub const fn shares(&self) -> Shares {
|
||||||
|
Shares::new(&self, &self.db)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
@ -9,7 +9,10 @@ use std::{
|
||||||
use ubisync_lib::{
|
use ubisync_lib::{
|
||||||
api::events::AppEvent,
|
api::events::AppEvent,
|
||||||
messages::{Message, MessageContent},
|
messages::{Message, MessageContent},
|
||||||
types::{AppId, Element, ElementContent, ElementId, Family, FamilyId, Peer, PeerId, PotId, Tag},
|
types::{
|
||||||
|
AppId, ContentUpdateStrategy, Element, ElementContent, ElementId, Family, FamilyId, Peer,
|
||||||
|
PeerId, Pot, PotId, Share, ShareContent, ShareId, SharePermissions, Tag,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
use anyhow::Error;
|
use anyhow::Error;
|
||||||
|
@ -150,6 +153,54 @@ impl State {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn create_own_family(&self) {
|
||||||
|
let family_id = FamilyId::new();
|
||||||
|
let metastate_potid = PotId::new();
|
||||||
|
let metastate_shareid = ShareId::new();
|
||||||
|
let metastate_elementid = ElementId::new();
|
||||||
|
|
||||||
|
let mut family_members = HashSet::new();
|
||||||
|
family_members.insert(self.own_peer_id().expect("Node does not know own peer id"));
|
||||||
|
|
||||||
|
let mut metastate_members = HashMap::new();
|
||||||
|
metastate_members.insert(family_id.clone(), SharePermissions::Owner);
|
||||||
|
|
||||||
|
let _ = self.db.families().add(Family {
|
||||||
|
id: family_id.clone(),
|
||||||
|
name: None,
|
||||||
|
members: family_members.clone(),
|
||||||
|
});
|
||||||
|
let _ = self.db.shares().add(Share {
|
||||||
|
id: metastate_shareid,
|
||||||
|
content: ShareContent {
|
||||||
|
pots: vec![metastate_potid.clone()],
|
||||||
|
},
|
||||||
|
members: metastate_members,
|
||||||
|
});
|
||||||
|
let _ = self
|
||||||
|
.db
|
||||||
|
.pots()
|
||||||
|
.add(metastate_potid.clone(), "".to_string());
|
||||||
|
|
||||||
|
debug!("{:?}", self.db.pots().get_all().unwrap());
|
||||||
|
let _ = self.db.elements().add(
|
||||||
|
metastate_elementid,
|
||||||
|
ElementContent::MetaState {
|
||||||
|
family: Family {
|
||||||
|
id: family_id,
|
||||||
|
name: None,
|
||||||
|
members: family_members,
|
||||||
|
},
|
||||||
|
pots: self.db.pots().get_all().unwrap(),
|
||||||
|
shares: self.db.shares().get_all().unwrap(),
|
||||||
|
},
|
||||||
|
ContentUpdateStrategy::Overwrite,
|
||||||
|
None,
|
||||||
|
true,
|
||||||
|
metastate_potid,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn add_family_member(&self, peer: PeerId) -> anyhow::Result<()> {
|
pub fn add_family_member(&self, peer: PeerId) -> anyhow::Result<()> {
|
||||||
let my_id = self
|
let my_id = self
|
||||||
.own_peer_id()
|
.own_peer_id()
|
||||||
|
@ -178,7 +229,13 @@ impl State {
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send_to_peers(
|
self.send_to_peers(
|
||||||
MessageContent::AddedToFamily { family: my_family },
|
MessageContent::AddedToFamily {
|
||||||
|
family: my_family,
|
||||||
|
metastate_share: self
|
||||||
|
.db
|
||||||
|
.get_metastate_share()
|
||||||
|
.expect("Node is in a family, but does not know metastate ShareId"),
|
||||||
|
},
|
||||||
vec![peer],
|
vec![peer],
|
||||||
);
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -201,6 +258,10 @@ impl State {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_metastate_share(&self) -> Option<ShareId> {
|
||||||
|
self.db.get_metastate_share()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_event_receiver(
|
pub fn get_event_receiver(
|
||||||
&self,
|
&self,
|
||||||
app: &AppId,
|
app: &AppId,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue