Migrated from cozodb to BonsaiDB.

Relevant to #3
This commit is contained in:
Philip (a-0) 2024-02-10 19:59:21 +01:00
parent 4bf897278a
commit ce7519225e
33 changed files with 2709 additions and 1084 deletions

1424
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -5,15 +5,15 @@ use serde::{Deserialize, Serialize};
use crate::types::PeerId;
#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct Peer {
id: PeerId,
name: String,
name: Option<String>,
family: Vec<PeerId>,
}
impl Peer {
pub fn new(id: PeerId, name: String) -> Self {
pub fn new(id: PeerId, name: Option<String>) -> Self {
Peer {
id: id,
name: name,
@ -29,7 +29,7 @@ impl Peer {
self.id.clone()
}
pub fn name(&self) -> String {
pub fn name(&self) -> Option<String> {
self.name.clone()
}
}

View file

@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
#[derive(Serialize, Deserialize, Clone, Debug, Default, Ord, PartialOrd, PartialEq, Eq, Hash)]
pub struct AppId(Uuid);
impl AppId {

View file

@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
use super::{ElementContent, ElementId, MessageId, PotId};
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct Element {
// Uuid identifying the element itself
id: ElementId,
@ -46,4 +46,10 @@ impl Element {
pub fn pot(&self) -> &Option<PotId> {
&self.pot
}
pub fn latest_message(&self) -> &Option<MessageId> {
&self.latest_message
}
pub fn local_changes(&self) -> bool {
self.local_changes
}
}

View file

@ -1,12 +1,16 @@
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Default)]
pub struct ElementId(Uuid);
impl ElementId {
pub fn new() -> Self {
ElementId(Uuid::new_v4())
}
pub fn bytes(&self) -> &[u8; 16] {
self.0.as_bytes()
}
}
impl ToString for ElementId {
@ -27,3 +31,15 @@ impl TryFrom<&str> for ElementId {
serde_json::from_str(value)
}
}
impl From<Uuid> for ElementId {
fn from(value: Uuid) -> Self {
ElementId {0: value}
}
}
impl From<[u8; 16]> for ElementId {
fn from(value: [u8; 16]) -> Self {
ElementId {0: Uuid::from_bytes(value)}
}
}

View file

@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct MessageId(Uuid);
impl MessageId {

View file

@ -1,8 +1,8 @@
use anyhow::bail;
use i2p::net::{I2pSocketAddr, ToI2pSocketAddrs};
use i2p::net::{I2pAddr, I2pSocketAddr, ToI2pSocketAddrs};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct PeerId {
i2p_addr: I2pSocketAddr,
}
@ -54,3 +54,11 @@ impl From<PeerId> for I2pSocketAddr {
value.i2p_addr
}
}
impl Default for PeerId {
fn default() -> Self {
PeerId {
i2p_addr: I2pSocketAddr::new(I2pAddr::new(""), 0),
}
}
}

View file

@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
use super::PotId;
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct Pot {
pub id: PotId,
pub app_type: String,

View file

@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct PotId(Uuid);
impl PotId {
pub fn new() -> Self {

View file

@ -8,8 +8,8 @@ edition = "2021"
[dependencies]
anyhow = "1.0.71"
axum = { version = "0.7.2", features = [ "macros" ] }
bonsaidb = { version = "0.5.0", features = [ "local-full" ] }
itertools = "0.12.0"
cozo = { version = "0.7.5", features = [ "storage-rocksdb", "requests", "graph-algo" ] }
jsonwebtoken = "9.2.0"
serde = { version = "1.0.166", features = [ "derive" ] }
serde_json = "1.0.99"

View file

@ -24,7 +24,7 @@ use ubisync_lib::{
use crate::state::ApiState;
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct App {
pub id: AppId,
pub app_type: String,
@ -52,7 +52,7 @@ pub(super) async fn auth(
s.jwt_decoding_key(),
s.jwt_validation(),
) {
if let Ok(true) = s.app_exists(&token.claims.sub) {
if let Ok(true) = s.app_exists(token.claims.sub.clone()) {
debug!("Authentication for {:?} succeeded.", &token.claims.sub);
request.extensions_mut().insert(token.claims.sub);
return next.run(request).await;
@ -72,7 +72,7 @@ pub(super) async fn register(
// Maybe ask for consent by user
// If user wants registration, proceed
let result = s.add_app(&body.name, &body.description, &body.app_type);
let result = s.add_app(body.name, body.description, body.app_type);
match result {
Ok(id) => {
@ -111,7 +111,7 @@ pub(super) async fn set_default_pot(
app_id: Extension<AppId>,
Json(body): Json<AppSetDefaultPotRequest>,
) -> Response {
match s.set_app_default_pot(&app_id.0, &body.pot_id) {
match s.set_app_default_pot(app_id.0, body.pot_id) {
Ok(_) => (
StatusCode::OK,
Json {
@ -131,14 +131,18 @@ pub(super) async fn get_default_pot(
app_id: Extension<AppId>,
Json(_): Json<AppGetDefaultPotRequest>,
) -> Response {
match s.get_default_pot(&app_id.0) {
Ok(p) => (
match s.get_default_pot(app_id.0) {
Ok(Some(p)) => (
StatusCode::OK,
Json {
0: AppGetDefaultPotResponse { pot: p },
},
)
.into_response(),
Ok(_) => {
warn!("Pot not found");
StatusCode::NOT_FOUND.into_response()
}
Err(e) => {
warn!("No default pot found: {}", e);
StatusCode::NOT_FOUND.into_response()
@ -151,7 +155,7 @@ pub(super) async fn create_pot(
app_id: Extension<AppId>,
Json(body): Json<AppCreatePotRequest>,
) -> Response {
let app = match s.get_app(&app_id.0) {
let app = match s.get_app(app_id.0.clone()) {
Ok(a) => a,
Err(e) => {
debug!("Failed to fetch app: {}", e);
@ -162,11 +166,11 @@ pub(super) async fn create_pot(
Some(t) => t,
None => app.app_type,
};
match s.create_pot(&app_id.0, &inferred_app_type) {
match s.create_pot(app_id.0.clone(), inferred_app_type) {
Ok(id) => {
// If this is the first pot for this app, set it as default
if app.default_pot.is_none() {
match s.set_app_default_pot(&app_id.0, &id) {
match s.set_app_default_pot(app_id.0, id.clone()) {
Ok(_) => (
StatusCode::OK,
Json {

View file

@ -23,7 +23,7 @@ pub(super) async fn get(
app: Extension<AppId>,
s: Extension<Arc<ApiState>>,
) -> Response {
let element = s.get_element(&id, &app);
let element = s.get_element(id, app.0);
match element {
Ok(el) => (
StatusCode::OK,
@ -46,8 +46,12 @@ pub(super) async fn create(
) -> Response {
let pot_id = match req.pot {
Some(p) => p,
None => match s.get_default_pot(&app.0) {
Ok(p) => p.id,
None => match s.get_default_pot(app.0) {
Ok(Some(p)) => p.id,
Ok(_) => {
warn!("Pot not found");
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
},
Err(e) => {
warn!("Element create request did not provide pot id, and no default pot for requesting app was found: {}", e);
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
@ -55,7 +59,7 @@ pub(super) async fn create(
},
};
let element_id = s.create_element(&req.content, &pot_id);
let element_id = s.create_element(req.content, pot_id);
debug!("{:?}", element_id);
match element_id {
Ok(id) => (
@ -75,7 +79,7 @@ pub(super) async fn set(
s: Extension<Arc<ApiState>>,
Json(req): Json<ElementSetRequest>,
) -> Response {
let res = s.write_element_content(&id, &app, &req.content);
let res = s.write_element_content(id, app.0, req.content);
match res {
Ok(_) => (
StatusCode::OK,
@ -89,7 +93,7 @@ pub(super) async fn set(
}
pub(super) async fn remove(Path(id): Path<ElementId>, s: Extension<Arc<ApiState>>) -> Response {
let res = s.remove_element(&id);
let res = s.remove_element(id);
match res {
Ok(_) => (
StatusCode::OK,

View file

@ -1,5 +1,6 @@
use tracing::debug;
use ubisync_lib::peer::Peer;
use ubisync_lib::types::PeerId;
use ubisync_lib::messages::{Message, MessageContent};
@ -10,23 +11,32 @@ pub fn handle(state: &CommState, peer: &PeerId, message: Message) {
debug!("Handling message now: {:?}", message);
match message.content() {
MessageContent::Hello { peer_name } => {
state.set_peer(peer, peer_name).expect("State failed");
state
.set_peer(Peer::new(peer.to_owned(), Some(peer_name.to_string())))
.expect("State failed");
}
MessageContent::CreateElement { id, content, pot } => {
state
.add_received_element(id, content, message.id(), pot)
.add_received_element(
id.to_owned(),
content.to_owned(),
Some(message.id().to_owned()),
pot.to_owned(),
)
.expect("State failed");
}
MessageContent::SetElement { id, content } => {
state
.update_element_content(id, content, message.id())
.update_element_content(id.to_owned(), content.to_owned(), message.id().to_owned())
.expect("State failed");
}
MessageContent::RemoveElement { id } => {
state.remove_element(id).expect("State failed");
state.remove_element(id.to_owned()).expect("State failed");
}
MessageContent::AddPot { id, app_type } => {
state.add_pot(id, app_type).expect("State failed");
state
.add_pot(id.to_owned(), app_type.to_string())
.expect("State failed");
}
}
}

View file

@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Config {
pub i2p_private_key: Option<String>,
pub database_location: String,
pub database_location: Option<String>,
pub api_config: ApiConfig,
pub jwt_secret: String,
}
@ -12,7 +12,7 @@ impl Default for Config {
fn default() -> Self {
Config {
i2p_private_key: None,
database_location: "mem".to_string(),
database_location: None,
api_config: Default::default(),
jwt_secret: "insecuresecret".to_string(),
}

View file

@ -4,9 +4,10 @@ use anyhow::bail;
use api::{v0::app::App, Api, ApiBuilder};
use comm::CommHandle;
use config::Config;
use node_events::UbisyncNodeEvent;
use i2p::net::I2pSocketAddr;
use node_events::UbisyncNodeEvent;
use state::{ApiState, CommState, State};
use ubisync_lib::{
peer::Peer,
types::{AppId, PeerId, PotId},
@ -26,7 +27,7 @@ pub struct Ubisync {
impl Ubisync {
pub async fn new(config: &Config) -> anyhow::Result<Self> {
let state = State::new(&config.database_location).await?;
let state = State::new(config.database_location.clone()).await?;
let comm_handle = Arc::new(CommHandle::new(CommState::new(state.clone()), config)?);
state.set_comm_handle(comm_handle.clone());
@ -44,8 +45,11 @@ impl Ubisync {
})
}
pub fn set_node_event_callback<CallbackFunction>(&self, cb: CallbackFunction, node: Arc<Ubisync>)
where
pub fn set_node_event_callback<CallbackFunction>(
&self,
cb: CallbackFunction,
node: Arc<Ubisync>,
) where
CallbackFunction: Fn(UbisyncNodeEvent, Arc<Ubisync>) + Send + Sync + 'static,
{
self.state_handle.set_node_event_callback(cb, node);
@ -57,14 +61,14 @@ impl Ubisync {
pub fn add_peer(&self, p: impl TryInto<Peer, Error = anyhow::Error>) -> anyhow::Result<()> {
match p.try_into() {
Ok(peer) => self.state_handle.set_peer(&peer),
Ok(peer) => self.state_handle.set_peer(peer),
Err(e) => bail!(e),
}
}
pub fn add_peer_from_id(&self, id: PeerId) -> anyhow::Result<()> {
// TODO: resolve peer's name before setting
self.state_handle.set_peer(&Peer::new(id, "".to_string()))
self.state_handle.set_peer(Peer::new(id, None))
}
pub fn get_apps(&self) -> Vec<App> {
@ -79,7 +83,7 @@ impl Ubisync {
self.comm_handle.i2p_address()
}
pub fn add_pot_member(&self, pot: &PotId, app: &AppId) -> anyhow::Result<()> {
pub fn add_pot_member(&self, pot: PotId, app: AppId) -> anyhow::Result<()> {
self.state_handle.add_pot_member(pot, app)
}
}

View file

@ -1,9 +1,7 @@
use std::{sync::Arc, time::Duration};
use anyhow::Error;
use cozo::DbInstance;
use jsonwebtoken::{DecodingKey, EncodingKey, Validation};
use serde_with::chrono::Utc;
use tracing::debug;
use ubisync_lib::{
api::events::AppEvent,
@ -11,9 +9,9 @@ use ubisync_lib::{
types::{AppId, Element, ElementContent, ElementId, Pot, PotId},
};
use crate::{api::v0::app::App, state::queries};
use crate::api::v0::app::App;
use super::State;
use super::{database::StateDB, State};
pub struct ApiState {
state: Arc<State>,
@ -36,50 +34,54 @@ impl ApiState {
api_state
}
pub fn add_app(&self, name: &str, description: &str, app_type: &str) -> anyhow::Result<AppId> {
pub fn add_app(
&self,
name: String,
description: String,
app_type: String,
) -> anyhow::Result<AppId> {
let id = AppId::new();
let last_access = Utc::now();
queries::apps::add(self.db(), &id, &last_access, name, description, app_type)?;
self.db().add_app(id.clone(), name, description, app_type)?;
debug!("Successfully added app");
Ok(id)
}
pub fn app_exists(&self, id: &AppId) -> anyhow::Result<bool> {
queries::apps::exists(self.db(), id)
pub fn app_exists(&self, id: AppId) -> anyhow::Result<bool> {
Ok(self.db().get_app(id)?.is_some())
}
pub fn get_app(&self, id: &AppId) -> anyhow::Result<App> {
queries::apps::get(self.db(), id)
pub fn get_app(&self, id: AppId) -> anyhow::Result<App> {
self.db()
.get_app(id)
.map(|app_opt| app_opt.ok_or(Error::msg("Failed to find app")))?
}
pub fn create_element(
&self,
content: &ElementContent,
pot: &PotId,
) -> anyhow::Result<ElementId> {
pub fn create_element(&self, content: ElementContent, pot: PotId) -> anyhow::Result<ElementId> {
let id = ElementId::new();
queries::elements::add(self.db(), &id, &content, None, true, pot)?;
debug!("Added element {{{}}}", &id.to_string());
self.db()
.add_element(id.clone(), content.clone(), None, false, pot.clone())?;
debug!("Added element {{{}}}", id.to_string());
self.state.send_to_peers(MessageContent::CreateElement {
id: id.clone(),
content: content.clone(),
pot: pot.clone(),
content: content,
pot: pot,
});
Ok(id)
}
pub fn write_element_content(
&self,
id: &ElementId,
app: &AppId,
content: &ElementContent,
id: ElementId,
app: AppId,
content: ElementContent,
) -> anyhow::Result<()> {
if queries::elements::get_app_access(self.db(), id, app)? {
queries::elements::set_content(self.db(), id, content)?;
queries::elements::set_local_changes(self.db(), id, true)?;
debug!("Wrote element content {{{}}}", &id.to_string());
if self.db().app_has_access(app, id.clone())? {
self.db().set_element_content(id.clone(), content)?;
self.db().set_element_local_changes(id.clone(), true)?;
debug!("Wrote element content {{{}}}", id.to_string());
Ok(())
} else {
@ -89,35 +91,37 @@ impl ApiState {
}
}
pub fn remove_element(&self, id: &ElementId) -> anyhow::Result<()> {
let res = self.state.remove_element(id);
debug!("Removed element {{{}}}", &id.to_string());
res
pub fn remove_element(&self, id: ElementId) -> anyhow::Result<()> {
self.db()
.remove_element(id.clone())
.inspect(|_| debug!("Removed element {{{}}}", id.to_string()))
}
pub fn set_app_default_pot(&self, app_id: &AppId, pot_id: &PotId) -> anyhow::Result<()> {
queries::apps::set_default_pot(self.db(), app_id, pot_id)
pub fn set_app_default_pot(&self, app_id: AppId, pot_id: PotId) -> anyhow::Result<()> {
self.db().set_default_pot(app_id, pot_id)
}
pub fn get_default_pot(&self, app_id: &AppId) -> anyhow::Result<Pot> {
queries::apps::get_default_pot(self.db(), app_id)
pub fn get_default_pot(&self, app_id: AppId) -> anyhow::Result<Option<Pot>> {
self.db().get_default_pot(app_id)
}
pub fn create_pot(&self, app_id: &AppId, app_type: &str) -> anyhow::Result<PotId> {
pub fn create_pot(&self, app_id: AppId, app_type: String) -> anyhow::Result<PotId> {
let pot_id = PotId::new();
queries::apps::create_pot(self.db(), &pot_id, app_id, app_type)?;
self.db().add_pot(pot_id.clone(), app_type.clone())?;
self.db().add_pot_membership(pot_id.clone(), app_id)?;
self.state.send_to_peers(MessageContent::AddPot {
id: pot_id.to_owned(),
app_type: app_type.to_string(),
id: pot_id.clone(),
app_type: app_type,
});
Ok(pot_id)
}
pub fn get_element(&self, id: &ElementId, app: &AppId) -> anyhow::Result<Element> {
if queries::elements::get_app_access(self.db(), id, app)? {
self.state.get_element(id)
pub fn get_element(&self, id: ElementId, app: AppId) -> anyhow::Result<Element> {
if self.db().app_has_access(app, id.clone())? {
self.db()
.get_element(id)
.ok_or(Error::msg("Could not get element"))
} else {
Err(Error::msg(
"Element does not exist or app does not have access to it",
@ -163,12 +167,12 @@ impl ApiState {
} else {
Err(Error::msg("Failed to lock on receiver mutex"))
}
},
}
Err(e) => Err(e),
}
}
fn db(&self) -> &DbInstance {
fn db(&self) -> &StateDB {
&self.state.db
}
}
@ -191,15 +195,26 @@ mod tests {
.init();
let state = ApiState::new(
State::new("mem").await.unwrap(),
State::new(None).await.unwrap(),
"abcdabcdabcdabcdabcdabcdabcdabcd",
);
let app_id = state.add_app("appname", "appdesc", "apptype").unwrap();
let pot_id = state.create_pot(&app_id, "apptype").unwrap();
let id = state
.create_element(&ElementContent::Text("Test-text".to_string()), &pot_id)
let app_id = state
.add_app(
"appname".to_string(),
"appdesc".to_string(),
"apptype".to_string(),
)
.unwrap();
let el = state.get_element(&id, &app_id).unwrap();
let pot_id = state
.create_pot(app_id.clone(), "apptype".to_string())
.unwrap();
let id = state
.create_element(
ElementContent::Text("Test-text".to_string()),
pot_id.clone(),
)
.unwrap();
let el = state.get_element(id, app_id).unwrap();
assert_eq!(
ElementContent::Text("Test-text".to_string()),
el.content().to_owned()
@ -215,22 +230,33 @@ mod tests {
.init();
let state = ApiState::new(
State::new("mem").await.unwrap(),
State::new(None).await.unwrap(),
"abcdabcdabcdabcdabcdabcdabcdabcd",
);
let app_id = state.add_app("appname", "appdesc", "apptype").unwrap();
let pot_id = state.create_pot(&app_id, "apptype").unwrap();
let app_id = state
.add_app(
"appname".to_string(),
"appdesc".to_string(),
"apptype".to_string(),
)
.unwrap();
let pot_id = state
.create_pot(app_id.clone(), "apptype".to_string())
.unwrap();
let id = state
.create_element(&ElementContent::Text("Test-text".to_string()), &pot_id)
.create_element(
ElementContent::Text("Test-text".to_string()),
pot_id.clone(),
)
.unwrap();
state
.write_element_content(
&id,
&app_id,
&ElementContent::Text("Test-text 2".to_string()),
id.clone(),
app_id.clone(),
ElementContent::Text("Test-text 2".to_string()),
)
.unwrap();
let el = state.get_element(&id, &app_id).unwrap();
let el = state.get_element(id, app_id).unwrap();
assert_eq!(
ElementContent::Text("Test-text 2".to_string()),
el.content().to_owned()

View file

@ -1,17 +1,16 @@
use std::sync::Arc;
use cozo::DbInstance;
use tracing::debug;
use ubisync_lib::{
api::events::AppEvent,
peer::Peer,
types::{Element, ElementContent, ElementId, MessageId, PeerId, PotId},
types::{Element, ElementContent, ElementId, MessageId, PotId},
};
use crate::{node_events::UbisyncNodeEvent, state::queries};
use crate::node_events::UbisyncNodeEvent;
use super::State;
use super::{database::StateDB, State};
pub struct CommState {
state: Arc<State>,
@ -24,85 +23,74 @@ impl CommState {
pub fn add_received_element(
&self,
id: &ElementId,
content: &ElementContent,
latest_message: &MessageId,
pot_id: &PotId,
id: ElementId,
content: ElementContent,
latest_message: Option<MessageId>,
pot_id: PotId,
) -> anyhow::Result<()> {
queries::elements::add(
self.db(),
&id,
&content,
Some(latest_message.to_owned()),
false,
pot_id,
)?;
debug!("Added element {{{}}}", &id.to_string());
Ok(())
self.db()
.add_element(id.clone(), content, latest_message, false, pot_id)
.inspect(|_| debug!("Added element {{{}}}", id.to_string()))
}
pub fn update_element_content(
&self,
id: &ElementId,
content: &ElementContent,
latest_message: &MessageId,
id: ElementId,
content: ElementContent,
latest_message: MessageId,
) -> anyhow::Result<()> {
//TODO: resolve potential conflicts with local changes
queries::elements::set_content(self.db(), id, content)?;
queries::elements::set_latest_message(self.db(), id, Some(latest_message.to_owned()))?;
debug!("Updated element {{{}}}", &id.to_string());
self.db().set_element_content(id.clone(), content)?;
self.db()
.set_element_latest_message(id.clone(), Some(latest_message))?;
debug!("Updated element {{{}}}", id.to_string());
let _ = self.state.emit_app_event(
queries::pots::get_pot_members(
self.db(),
&queries::elements::get(self.db(), &id)?
.pot()
.as_ref()
.unwrap(),
)?
.get(0)
.unwrap(),
AppEvent::ElementUpdate { id: id.clone() },
);
if let Some(el) = self.db().get_element(id.clone()) {
if let Some(pot) = el.pot() {
if let Ok(apps) = self.db().get_pot_members(pot.to_owned()) {
for app in apps {
self.state
.emit_app_event(&app, AppEvent::ElementUpdate { id: id.clone() })
}
}
}
}
Ok(())
}
pub fn remove_element(&self, id: &ElementId) -> anyhow::Result<()> {
let res = self.state.remove_element(id);
debug!("Removed element {{{}}}", &id.to_string());
res
pub fn remove_element(&self, id: ElementId) -> anyhow::Result<()> {
self.db()
.remove_element(id.clone())
.inspect(|_| debug!("Removed element {{{}}}", &id.to_string()))
}
pub fn get_element(&self, id: &ElementId) -> anyhow::Result<Element> {
self.state.get_element(id)
pub fn get_element(&self, id: ElementId) -> Option<Element> {
self.db().get_element(id)
}
pub fn set_peer(&self, id: &PeerId, name: &str) -> anyhow::Result<()> {
queries::peers::put(self.db(), id, name)?;
debug!("Set peer {} with address {}.", &name, id.to_string());
Ok(())
pub fn set_peer(&self, peer: Peer) -> anyhow::Result<()> {
self.db()
.add_peer(peer.clone())
.inspect(|_| debug!("Added peer {:?}.", peer))
}
pub fn get_peers(&self) -> anyhow::Result<Vec<Peer>> {
self.state.get_peers()
}
pub fn add_pot(&self, id: &PotId, app_type: &str) -> anyhow::Result<()> {
queries::pots::add(self.db(), id, app_type)?;
pub fn add_pot(&self, id: PotId, app_type: String) -> anyhow::Result<()> {
self.db().add_pot(id.clone(), app_type.clone())?;
let _ = self.state.emit_node_event(UbisyncNodeEvent::NewPot {
id: id.clone(),
app_type: app_type.to_string(),
id: id,
app_type: app_type,
});
Ok(())
}
fn db(&self) -> &DbInstance {
fn db(&self) -> &StateDB {
&self.state.db
}
}
@ -124,18 +112,18 @@ mod tests {
.with_max_level(Level::DEBUG)
.init();
let state = CommState::new(State::new("mem").await.unwrap());
let state = CommState::new(State::new(None).await.unwrap());
let id = ElementId::new();
let pot_id = PotId::new();
state
.add_received_element(
&id,
&ElementContent::Text("Test-text".to_string()),
&MessageId::new(),
&pot_id,
id.clone(),
ElementContent::Text("Test-text".to_string()),
Some(MessageId::new()),
pot_id,
)
.unwrap();
let el = state.get_element(&id).unwrap();
let el = state.get_element(id).unwrap();
assert_eq!(
ElementContent::Text("Test-text".to_string()),
el.content().to_owned()
@ -150,25 +138,25 @@ mod tests {
.with_max_level(Level::DEBUG)
.init();
let state = CommState::new(State::new("mem").await.unwrap());
let state = CommState::new(State::new(None).await.unwrap());
let id = ElementId::new();
let pot_id = PotId::new();
state
.add_received_element(
&id,
&ElementContent::Text("Test-text".to_string()),
&MessageId::new(),
&pot_id,
id.clone(),
ElementContent::Text("Test-text".to_string()),
Some(MessageId::new()),
pot_id,
)
.unwrap();
state
.update_element_content(
&id,
&ElementContent::Text("Test-text 2".to_string()),
&MessageId::new(),
id.clone(),
ElementContent::Text("Test-text 2".to_string()),
MessageId::new(),
)
.unwrap();
let el = state.get_element(&id).unwrap();
let el = state.get_element(id).unwrap();
assert_eq!(
ElementContent::Text("Test-text 2".to_string()),
el.content().to_owned()

View file

@ -0,0 +1,93 @@
use std::{fmt::Display, ops::{Deref, DerefMut}};
use bonsaidb::core::key::{Key, KeyEncoding};
use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize};
#[derive(Debug)]
pub(crate) struct KeyEncodingError;
impl Display for KeyEncodingError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Error in handling a bonsaidb-encoded key")
}
}
impl std::error::Error for KeyEncodingError {}
pub(crate) trait SerdeCompatibleKey: Serialize + DeserializeOwned + Default + Clone + Send + Sync {}
impl<T> SerdeCompatibleKey for T where T: Serialize + DeserializeOwned + Default + Clone + Send + Sync {}
#[derive(Clone, Debug, Default, Serialize, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct AsKey<T: SerdeCompatibleKey>(T);
impl<T: SerdeCompatibleKey> AsKey<T>
{
pub fn new(key: T) -> Self {
AsKey(key)
}
}
impl<T: SerdeCompatibleKey> Deref for AsKey<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T: SerdeCompatibleKey> DerefMut for AsKey<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<'de, T> Deserialize<'de> for AsKey<T>
where
T: SerdeCompatibleKey,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let t = T::deserialize(deserializer)?;
Ok(AsKey(t))
}
}
impl<'k, T: SerdeCompatibleKey> Key<'k> for AsKey<T>
{
const CAN_OWN_BYTES: bool = false;
fn first_value() -> Result<Self, bonsaidb::core::key::NextValueError> {
Ok(AsKey::default())
}
fn from_ord_bytes<'e>(
bytes: bonsaidb::core::key::ByteSource<'k, 'e>,
) -> Result<Self, Self::Error> {
match serde_json::from_slice(&*bytes) as Result<T, _> {
Ok(k) => Ok(AsKey(k)),
Err(_) => Err(KeyEncodingError),
}
}
}
impl<T: SerdeCompatibleKey> KeyEncoding for AsKey<T>
{
type Error = KeyEncodingError;
const LENGTH: Option<usize> = None;
fn describe<Visitor>(visitor: &mut Visitor)
where
Visitor: bonsaidb::core::key::KeyVisitor,
{
visitor.visit_type(bonsaidb::core::key::KeyKind::Bytes)
}
fn as_ord_bytes(&self) -> Result<std::borrow::Cow<'_, [u8]>, Self::Error> {
Ok(std::borrow::Cow::Owned(
serde_json::to_vec(&self.0).unwrap(),
))
}
}

View file

@ -0,0 +1,277 @@
use anyhow::{anyhow, Error};
use bonsaidb::core::schema::{Collection, SerializedCollection};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use serde_with::chrono::{DateTime, Utc};
use ubisync_lib::types::{AppId, ElementId, Pot, PotId};
use crate::{
api::v0::app::App,
state::database::{as_key::AsKey, StateDB},
};
#[derive(Debug, Serialize, Deserialize, Collection, PartialEq, Clone)]
#[collection(name = "apps", views = [])]
pub(super) struct DbApps {
#[natural_id]
pub(super) id: AsKey<AppId>,
pub(super) last_access: DateTime<Utc>,
pub(super) app_type: String,
pub(super) name: String,
pub(super) description: String,
pub(super) default_pot: Option<PotId>,
}
impl From<DbApps> for App {
fn from(value: DbApps) -> Self {
App {
id: (*value.id).clone(),
app_type: value.app_type,
name: value.name,
description: value.description,
default_pot: value.default_pot,
last_access: value.last_access,
}
}
}
impl StateDB {
pub fn add_app(
&self,
id: AppId,
name: String,
description: String,
app_type: String,
) -> anyhow::Result<()> {
DbApps::push(
DbApps {
id: AsKey::new(id),
last_access: Utc::now(),
app_type,
name,
description,
default_pot: None,
},
&self.db,
)
.map(|_| ())
.map_err(|e| anyhow!(e))
}
pub fn set_default_pot(&self, id: AppId, pot: PotId) -> anyhow::Result<()> {
DbApps::get(&AsKey::new(id), &self.db)
.unwrap()
.unwrap()
.modify(&self.db, |app| app.contents.default_pot = Some(pot.clone()))
.map_err(|e| anyhow!(e))
}
pub fn get_default_pot(&self, id: AppId) -> anyhow::Result<Option<Pot>> {
let pot_id = DbApps::get(&AsKey::new(id), &self.db)?
.map(|app| app.contents.default_pot)
.ok_or(Error::msg("App not found"))?
.ok_or(Error::msg("Could not get default pot"))?;
self.get_pot(pot_id)
}
pub fn get_all_apps(&self) -> anyhow::Result<Vec<App>> {
Ok(DbApps::all(&self.db)
.query()?
.iter()
.map(|app| app.contents.clone().into())
.collect_vec())
}
pub fn get_all_app_ids(&self) -> anyhow::Result<Vec<AppId>> {
Ok(DbApps::all(&self.db)
.query()?
.iter()
.map(|app| (*app.contents.id).clone())
.collect_vec())
}
pub fn app_has_access(&self, app: AppId, element: ElementId) -> anyhow::Result<bool> {
if let Some(el) = self.get_element(element) {
Ok(self
.get_pot_members(
el.pot()
.clone()
.ok_or(Error::msg("Could not fetch pot members"))?,
)?
.contains(&app))
} else {
Err(Error::msg("Element not found"))
}
}
pub fn get_app(&self, id: AppId) -> anyhow::Result<Option<App>> {
DbApps::get(&AsKey::new(id), &self.db)
.map(|app_option| app_option.map(|app| app.contents.into()))
.map_err(|e| anyhow!(e))
}
}
#[cfg(test)]
mod tests {
use ubisync_lib::types::{AppId, ElementContent, ElementId, Pot, PotId};
use crate::{api::v0::app::App, state::database::StateDB};
#[test]
fn add_get() {
let db = StateDB::init(None);
let app_id = AppId::new();
db.add_app(
app_id.clone(),
"app name".to_string(),
"description".to_string(),
"app_type".to_string(),
)
.unwrap();
let retrieved_app = db.get_app(app_id.clone()).unwrap();
match retrieved_app {
Some(App {
id,
app_type,
name,
description,
default_pot,
last_access: _,
}) => {
assert_eq!(
(id, app_type, name, description, default_pot),
(
app_id,
"app_type".to_string(),
"app name".to_string(),
"description".to_string(),
None
)
)
}
None => assert!(false),
}
}
#[test]
fn get_default_pot() {
let db = StateDB::init(None);
let app_id = AppId::new();
db.add_app(
app_id.clone(),
"app name".to_string(),
"description".to_string(),
"app_type".to_string(),
)
.unwrap();
assert_eq!(db.get_default_pot(app_id).unwrap(), None)
}
#[test]
fn set_default_pot() {
let db = StateDB::init(None);
let app_id = AppId::new();
db.add_app(
app_id.clone(),
"app name".to_string(),
"description".to_string(),
"app_type".to_string(),
)
.unwrap();
let pot = Pot::new(PotId::new(), "app_type".to_string());
db.add_pot(pot.id.clone(), pot.app_type.clone()).unwrap();
db.set_default_pot(app_id.clone(), pot.id.clone()).unwrap();
assert_eq!(db.get_default_pot(app_id).unwrap(), Some(pot))
}
#[test]
fn get_apps() {
let db = StateDB::init(None);
assert_eq!(db.get_all_apps().unwrap(), vec![]);
let (app1, app2) = (AppId::new(), AppId::new());
db.add_app(
app1,
"name1".to_string(),
"desc1".to_string(),
"type1".to_string(),
)
.unwrap();
db.add_app(
app2,
"name2".to_string(),
"desc2".to_string(),
"type2".to_string(),
)
.unwrap();
assert_eq!(db.get_all_apps().unwrap().len(), 2);
}
#[test]
fn get_app_ids() {
let db = StateDB::init(None);
assert_eq!(db.get_all_app_ids().unwrap(), vec![]);
let (app1, app2) = (AppId::new(), AppId::new());
db.add_app(
app1.clone(),
"name1".to_string(),
"desc1".to_string(),
"type1".to_string(),
)
.unwrap();
db.add_app(
app2.clone(),
"name2".to_string(),
"desc2".to_string(),
"type2".to_string(),
)
.unwrap();
assert_eq!(db.get_all_app_ids().unwrap(), vec![app1, app2])
}
#[test]
fn app_access() {
let db = StateDB::init(None);
let app_id = AppId::new();
let pot_id = PotId::new();
let element_id = ElementId::new();
db.add_app(
app_id.clone(),
"name".to_string(),
"description".to_string(),
"app_type".to_string(),
)
.unwrap();
db.add_pot(pot_id.clone(), "app_type".to_string()).unwrap();
db.add_element(
element_id.clone(),
ElementContent::Text("Text".to_string()),
None,
false,
pot_id.clone(),
)
.unwrap();
assert_eq!(
db.app_has_access(app_id.clone(), element_id.clone())
.unwrap(),
false
);
db.add_pot_membership(pot_id, app_id.clone()).unwrap();
assert_eq!(db.app_has_access(app_id, element_id).unwrap(), true);
}
}

View file

@ -0,0 +1,232 @@
use anyhow::{anyhow, Error};
use bonsaidb::core::schema::{Collection, SerializedCollection};
use serde::{Deserialize, Serialize};
use ubisync_lib::types::{Element, ElementContent, ElementId, MessageId, PotId};
use crate::state::database::{as_key::AsKey, StateDB};
#[derive(Debug, Serialize, Deserialize, Collection, PartialEq, Clone)]
#[collection(name = "elements", views = [])]
pub(super) struct DbElement {
#[natural_id]
pub(super) id: AsKey<ElementId>,
pub(super) content: ElementContent,
pub(super) latest_message: Option<MessageId>,
pub(super) local_changes: bool,
pub(super) pot: PotId,
}
impl From<DbElement> for Element {
fn from(value: DbElement) -> Self {
Element::from((
(*value.id).clone(),
Some(value.pot),
value.content,
value.latest_message,
value.local_changes,
))
}
}
impl StateDB {
pub fn add_element(
&self,
id: ElementId,
content: ElementContent,
latest_message: Option<MessageId>,
local_changes: bool,
pot: PotId,
) -> anyhow::Result<()> {
DbElement::push(
DbElement {
id: AsKey::new(id),
content,
latest_message,
local_changes,
pot,
},
&self.db,
)
.map(|_| ())
.map_err(|e| anyhow!(e))
}
pub fn get_element(&self, id: ElementId) -> Option<Element> {
DbElement::get(&AsKey::new(id), &self.db)
.ok()?
.map(|el| el.contents.into())
}
pub fn set_element_content(
&self,
id: ElementId,
content: ElementContent,
) -> anyhow::Result<()> {
DbElement::get(&AsKey::new(id), &self.db)
.map_err(|e| anyhow!(e))?
.ok_or(Error::msg("Could not find element by id"))?
.modify(&self.db, |t| t.contents.content = content.clone())
.map_err(|e| anyhow!(e))
}
pub fn set_element_latest_message(
&self,
id: ElementId,
message: Option<MessageId>,
) -> anyhow::Result<()> {
DbElement::get(&AsKey::new(id), &self.db)
.map_err(|e| anyhow!(e))?
.ok_or(Error::msg("Could not find element by id"))?
.modify(&self.db, |t| t.contents.latest_message = message.clone())
.map_err(|e| anyhow!(e))
}
pub fn set_element_local_changes(
&self,
id: ElementId,
local_changes: bool,
) -> anyhow::Result<()> {
DbElement::get(&AsKey::new(id), &self.db)
.map_err(|e| anyhow!(e))?
.ok_or(Error::msg("Could not find element by id"))?
.modify(&self.db, |t| t.contents.local_changes = local_changes)
.map_err(|e| anyhow!(e))
}
pub fn remove_element(&self, id: ElementId) -> anyhow::Result<()> {
DbElement::get(&AsKey::new(id), &self.db)
.map_err(|e| anyhow!(e))?
.ok_or(Error::msg("Could not find element by id"))?
.delete(&self.db)
.map_err(|e| anyhow!(e))
}
}
#[cfg(test)]
mod tests {
use ubisync_lib::types::{ElementContent, ElementId, MessageId, PotId};
use crate::state::database::StateDB;
#[test]
fn add_get() {
let db = StateDB::init(None);
let pot_id = PotId::new();
let element_id = ElementId::new();
db.add_element(
element_id.clone(),
ElementContent::Text("Content!!!".to_string()),
None,
false,
pot_id.clone(),
)
.unwrap();
let retrieved_element = db.get_element(element_id.clone());
assert_eq!(
Some(
(
element_id,
Some(pot_id),
ElementContent::Text("Content!!!".to_string()),
None,
false
)
.into()
),
retrieved_element
)
}
#[test]
fn set_content() {
let db = StateDB::init(None);
let element_id = ElementId::new();
db.add_element(
element_id.clone(),
ElementContent::Text("Content!!!".to_string()),
None,
false,
PotId::new(),
)
.unwrap();
db.set_element_content(
element_id.clone(),
ElementContent::Text("New Content!!!".to_string()),
)
.unwrap();
assert_eq!(
db.get_element(element_id).unwrap().content().to_owned(),
ElementContent::Text("New Content!!!".to_string())
)
}
#[test]
fn set_latest_message() {
let db = StateDB::init(None);
let element_id = ElementId::new();
db.add_element(
element_id.clone(),
ElementContent::Text("Content!!!".to_string()),
None,
false,
PotId::new(),
)
.unwrap();
assert_eq!(
db.get_element(element_id.clone())
.unwrap()
.latest_message()
.to_owned(),
None
);
let msg_id = MessageId::new();
db.set_element_latest_message(element_id.clone(), Some(msg_id.clone()))
.unwrap();
assert_eq!(
db.get_element(element_id)
.unwrap()
.latest_message()
.to_owned(),
Some(msg_id)
)
}
#[test]
fn set_local_changes() {
let db = StateDB::init(None);
let element_id = ElementId::new();
db.add_element(
element_id.clone(),
ElementContent::Text("Content!!!".to_string()),
None,
false,
PotId::new(),
)
.unwrap();
assert_eq!(
db.get_element(element_id.clone())
.unwrap().local_changes(),
false
);
db.set_element_local_changes(element_id.clone(), true)
.unwrap();
assert_eq!(
db.get_element(element_id)
.unwrap()
.local_changes(),
true
)
}
}

View file

@ -0,0 +1,21 @@
use bonsaidb::core::schema::Schema;
use apps::DbApps;
use elements::DbElement;
use peers::DbPeers;
use pot_memberships::DbPotMemberships;
use pots::DbPots;
mod apps;
mod elements;
mod peers;
mod pot_memberships;
mod pots;
#[derive(Schema, Debug)]
#[schema(name = "ubisync", collections = [DbElement, DbPotMemberships, DbApps, DbPots, DbPeers])]
pub struct UbisyncSchema;
#[cfg(test)]
mod tests {}

View file

@ -0,0 +1,106 @@
use anyhow::{anyhow, Error};
use bonsaidb::core::schema::{Collection, SerializedCollection};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use ubisync_lib::{
peer::Peer,
types::{self, PeerId},
};
use crate::state::database::{as_key::AsKey, StateDB};
#[derive(Debug, Serialize, Deserialize, Collection, PartialEq, Clone)]
#[collection(name = "peers", views = [])]
pub(super) struct DbPeers {
#[natural_id]
pub(super) id: AsKey<types::PeerId>,
pub(super) name: Option<String>,
}
impl From<DbPeers> for Peer {
fn from(value: DbPeers) -> Self {
Peer::new((*value.id).clone(), value.name)
}
}
impl StateDB {
pub fn add_peer(&self, peer: Peer) -> anyhow::Result<()> {
DbPeers::push(
DbPeers {
id: AsKey::new(peer.id()),
name: peer.name(),
},
&self.db,
)
.map(|_| ())
.map_err(|e| anyhow!(e))
}
pub fn get_peer(&self, id: PeerId) -> anyhow::Result<Option<Peer>> {
DbPeers::get(&AsKey::new(id), &self.db)
.map(|doc| doc.map(|peer| Peer::new((*peer.contents.id).clone(), peer.contents.name)))
.map_err(|e| anyhow!(e))
}
pub fn get_all_peers(&self) -> anyhow::Result<Vec<Peer>> {
DbPeers::all(&self.db)
.query()
.map(|peers| peers.iter().map(|p| p.contents.clone().into()).collect_vec())
.map_err(|e| anyhow!(e))
}
pub fn set_peer_name(&self, id: PeerId, name: &Option<String>) -> anyhow::Result<()> {
DbPeers::get(&AsKey::new(id), &self.db)
.map_err(|e| anyhow!(e))?
.ok_or(Error::msg("Peer not found"))?
.modify(&self.db, |doc| doc.contents.name = name.clone())
.map_err(|e| anyhow!(e))
}
}
#[cfg(test)]
mod tests {
use ubisync_lib::{peer::Peer, types::PeerId};
use crate::state::database::StateDB;
#[test]
fn add_get() {
let db = StateDB::init(None);
let peer = Peer::new(PeerId::default(), Some("Peer name".to_string()));
db.add_peer(peer.clone()).unwrap();
let retrieved_peer = db.get_peer(peer.id()).unwrap();
assert_eq!(Some(peer), retrieved_peer)
}
#[test]
fn get_all() {
let db = StateDB::init(None);
let peer = Peer::new(PeerId::default(), Some("Peer name".to_string()));
db.add_peer(peer.clone()).unwrap();
let all_peers = db.get_all_peers().unwrap();
assert_eq!(all_peers, vec![peer]);
}
#[test]
fn set_peer_name() {
let db = StateDB::init(None);
let peer = Peer::new(PeerId::default(), Some("Peer name".to_string()));
db.add_peer(peer.clone()).unwrap();
db.set_peer_name(peer.id().clone(), &Some("New peer name".to_string()))
.unwrap();
let retrieved_peer = db.get_peer(peer.id()).unwrap();
assert_eq!(
Some(Peer::new(peer.id(), Some("New peer name".to_string()))),
retrieved_peer
)
}
}

View file

@ -0,0 +1,173 @@
use anyhow::{anyhow, Error};
use bonsaidb::core::{
connection::Connection,
document::Emit,
schema::{Collection, MapReduce, SerializedCollection, View, ViewSchema},
};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use ubisync_lib::types::{AppId, PotId};
use crate::state::database::{as_key::AsKey, StateDB};
#[derive(Debug, Serialize, Deserialize, Collection, PartialEq, Clone)]
#[collection(name = "pot_memberships", primary_key = u128, views = [DbPotMembershipsByPotId, DbPotMembershipsByAppId, DbPotMembershipsByBothIds])]
pub(super) struct DbPotMemberships {
pub(super) pot_id: AsKey<PotId>,
pub(super) app_id: AsKey<AppId>,
}
#[derive(Debug, Clone, View, ViewSchema)]
#[view(collection = DbPotMemberships, key = AsKey<PotId>, value = Vec<AppId>, name = "by-pot-id")]
pub(super) struct DbPotMembershipsByPotId;
impl MapReduce for DbPotMembershipsByPotId {
fn map<'doc>(
&self,
document: &'doc bonsaidb::core::document::BorrowedDocument<'_>,
) -> bonsaidb::core::schema::ViewMapResult<'doc, Self> {
let entry = DbPotMemberships::document_contents(document)?;
document
.header
.emit_key_and_value(entry.pot_id, vec![(*entry.app_id).clone()])
}
fn reduce(
&self,
mappings: &[bonsaidb::core::schema::MappedValue<
Self::MappedKey<'_>,
<Self::View as View>::Value,
>],
_rereduce: bool,
) -> Result<<Self::View as View>::Value, bonsaidb::core::Error> {
Ok(mappings
.iter()
.map(|mapping| mapping.value.clone())
.concat())
}
}
#[derive(Debug, Clone, View, ViewSchema)]
#[view(collection = DbPotMemberships, key = AsKey<AppId>, value = Vec<PotId>, name = "by-app-id")]
pub(super) struct DbPotMembershipsByAppId;
impl MapReduce for DbPotMembershipsByAppId {
fn map<'doc>(
&self,
document: &'doc bonsaidb::core::document::BorrowedDocument<'_>,
) -> bonsaidb::core::schema::ViewMapResult<'doc, Self> {
let entry = DbPotMemberships::document_contents(document)?;
document
.header
.emit_key_and_value(entry.app_id, vec![(*entry.pot_id).clone()])
}
fn reduce(
&self,
mappings: &[bonsaidb::core::schema::MappedValue<
Self::MappedKey<'_>,
<Self::View as View>::Value,
>],
_rereduce: bool,
) -> Result<<Self::View as View>::Value, bonsaidb::core::Error> {
Ok(mappings
.iter()
.map(|mapping| mapping.value.clone())
.concat())
}
}
#[derive(Debug, Clone, View, ViewSchema)]
#[view(collection = DbPotMemberships, key = (AsKey<PotId>, AsKey<AppId>), value = DbPotMemberships, name = "by-both")]
pub(super) struct DbPotMembershipsByBothIds;
impl MapReduce for DbPotMembershipsByBothIds {
fn map<'doc>(
&self,
document: &'doc bonsaidb::core::document::BorrowedDocument<'_>,
) -> bonsaidb::core::schema::ViewMapResult<'doc, Self> {
let content = DbPotMemberships::document_contents(document)?;
document
.header
.emit_key_and_value((content.pot_id.clone(), content.app_id.clone()), content)
}
}
impl StateDB {
pub fn add_pot_membership(&self, pot: PotId, app: AppId) -> anyhow::Result<()> {
if let Err(_) = self.get_pot(pot.clone()) {
Err(Error::msg(
"A member was meant to be added to a pot which does not exist.",
))
} else if let Err(_) = self.get_app(app.clone()) {
Err(Error::msg(
"A member app which does not exist was meant to be added to a pot",
))
} else {
DbPotMemberships::push(
DbPotMemberships {
pot_id: AsKey::new(pot),
app_id: AsKey::new(app),
},
&self.db,
)
.map(|_| ())
.map_err(|e| anyhow!(e))
}
}
pub fn get(&self, pot: PotId, app: AppId) -> anyhow::Result<Option<()>> {
self.db
.view::<DbPotMembershipsByBothIds>()
.with_key(&(AsKey::new(pot), AsKey::new(app)))
.query_with_collection_docs()
.map(|_| Some(()))
.map_err(|e| anyhow!(e))
}
pub fn get_pot_members(&self, pot: PotId) -> anyhow::Result<Vec<AppId>> {
self.db
.view::<DbPotMembershipsByPotId>()
.with_key(&AsKey::new(pot))
.reduce()
.map_err(|e| anyhow!(e))
}
pub fn get_app_pot_memberships(&self, app: AppId) -> anyhow::Result<Vec<PotId>> {
self.db
.view::<DbPotMembershipsByAppId>()
.with_key(&AsKey::new(app))
.reduce()
.map_err(|e| anyhow!(e))
}
}
#[cfg(test)]
mod tests {
use ubisync_lib::types::{AppId, PotId};
use crate::state::database::StateDB;
#[test]
fn add_get() {
let db = StateDB::init(None);
let pot_id = PotId::new();
let app_id = AppId::new();
db.add_pot(pot_id.clone(), "app_type".to_string()).unwrap();
db.add_app(
app_id.clone(),
"name".to_string(),
"description".to_string(),
"app_type".to_string(),
)
.unwrap();
db.add_pot_membership(pot_id.clone(), app_id.clone())
.unwrap();
let retrieved_members = db.get_pot_members(pot_id.clone()).unwrap();
assert_eq!(vec![app_id.clone()], retrieved_members);
let retrieved_memberships = db.get_app_pot_memberships(app_id).unwrap();
assert_eq!(vec![pot_id], retrieved_memberships)
}
}

View file

@ -0,0 +1,58 @@
use anyhow::anyhow;
use bonsaidb::core::schema::{Collection, SerializedCollection};
use serde::{Deserialize, Serialize};
use ubisync_lib::types::{Pot, PotId};
use crate::state::database::{as_key::AsKey, StateDB};
#[derive(Debug, Serialize, Deserialize, Collection, PartialEq, Clone)]
#[collection(name = "pots", views = [])]
pub(super) struct DbPots {
#[natural_id]
pub(super) id: AsKey<PotId>,
pub(super) app_type: String,
}
impl From<DbPots> for Pot {
fn from(value: DbPots) -> Self {
Pot {id: (*value.id).clone(), app_type: value.app_type}
}
}
impl StateDB {
pub fn add_pot(&self, id: PotId, app_type: String) -> anyhow::Result<()> {
DbPots::push(
DbPots {
id: AsKey::new(id),
app_type,
},
&self.db,
)
.map(|_| ())
.map_err(|e| anyhow!(e))
}
pub fn get_pot(&self, id: PotId) -> anyhow::Result<Option<Pot>> {
DbPots::get(&AsKey::new(id), &self.db)
.map(|pot_opt| pot_opt.map(|pot| pot.contents.into()))
.map_err(|e| anyhow!(e))
}
}
#[cfg(test)]
mod tests {
use ubisync_lib::types::{Pot, PotId};
use crate::state::database::StateDB;
#[test]
fn add_get() {
let db = StateDB::init(None);
let pot_id = PotId::new();
db.add_pot(pot_id.clone(), "app_type".to_string()).unwrap();
let retrieved_pot = db.get_pot(pot_id.clone()).unwrap();
assert_eq!(retrieved_pot, Some(Pot {id: pot_id, app_type: "app_type".to_string()}))
}
}

View file

@ -0,0 +1,27 @@
use bonsaidb::local::{
config::{Builder, StorageConfiguration},
Database as BonsaiDb,
};
use uuid::Uuid;
use self::collections::UbisyncSchema;
mod as_key;
mod collections;
pub struct StateDB {
db: BonsaiDb,
}
impl StateDB {
pub fn init(path: Option<String>) -> Self {
let storage_conf = match path {
Some(p) => StorageConfiguration::new(p),
None => StorageConfiguration::new(format!("/tmp/{}", Uuid::new_v4())),
// None => StorageConfiguration::default().memory_only()
};
StateDB {
db: BonsaiDb::open::<UbisyncSchema>(storage_conf).unwrap(),
}
}
}

View file

@ -1,3 +1,4 @@
use crate::{api::v0::app::App, comm::CommHandle, node_events::UbisyncNodeEvent, Ubisync};
use std::{
collections::HashMap,
sync::{
@ -5,18 +6,6 @@ use std::{
Arc, Mutex, RwLock,
},
};
use anyhow::Error;
use cozo::DbInstance;
use tracing::{debug, error, warn};
mod api_state;
mod comm_state;
mod queries;
mod schema;
pub use api_state::ApiState;
pub use comm_state::CommState;
use ubisync_lib::{
api::events::AppEvent,
messages::{Message, MessageContent},
@ -24,28 +13,31 @@ use ubisync_lib::{
types::{AppId, Element, ElementContent, ElementId, PotId, Tag},
};
use crate::{api::v0::app::App, comm::CommHandle, node_events::UbisyncNodeEvent, Ubisync};
use anyhow::Error;
use tracing::{debug, warn};
mod api_state;
mod comm_state;
mod database;
pub use api_state::ApiState;
pub use comm_state::CommState;
use self::database::StateDB;
pub struct State {
db: DbInstance,
db: StateDB,
comm_handle: RwLock<Option<Arc<CommHandle>>>,
app_event_channels: RwLock<HashMap<AppId, (Sender<AppEvent>, Arc<Mutex<Receiver<AppEvent>>>)>>,
node_event_callback: RwLock<Option<Box<dyn Fn(UbisyncNodeEvent) + Send + Sync>>>,
}
impl State {
pub async fn new(db_location: &str) -> anyhow::Result<Arc<State>> {
let db = match db_location {
"mem" => DbInstance::new("mem", "", Default::default()),
path => DbInstance::new("rocksdb", path, Default::default()),
};
match db {
Ok(d) => {
//TODO: change "add schema" to "ensure newest schema"
schema::add_schema(&d)?;
pub async fn new(db_path: Option<String>) -> anyhow::Result<Arc<State>> {
let db = StateDB::init(db_path);
let state = Arc::new(State {
db: d,
db,
comm_handle: RwLock::new(None),
app_event_channels: Default::default(),
node_event_callback: RwLock::new(None),
@ -53,9 +45,6 @@ impl State {
state.init_event_channels();
Ok(state)
}
Err(e) => Err(Error::msg(format!("{:?}", e))),
}
}
pub fn set_comm_handle(&self, handle: Arc<CommHandle>) {
*self
@ -78,76 +67,70 @@ impl State {
}
pub fn get_apps(&self) -> anyhow::Result<Vec<App>> {
queries::apps::get_all(&self.db)
self.db.get_all_apps()
}
pub fn set_element_content(
&self,
element_id: &ElementId,
content: &ElementContent,
element_id: ElementId,
content: ElementContent,
) -> anyhow::Result<()> {
let res = queries::elements::set_content(&self.db, element_id, content);
debug!(
"Set content of element with id {:?}: {:?}",
element_id,
self.get_element(element_id)
);
self.db
.set_element_content(element_id.clone(), content.clone())
.inspect(|_| {
self.send_to_peers(MessageContent::SetElement {
id: element_id.clone(),
content: content.clone(),
});
res
})
})
}
pub fn remove_element(&self, element_id: &ElementId) -> anyhow::Result<()> {
let res = queries::elements::remove(&self.db, element_id);
pub fn remove_element(&self, element_id: ElementId) -> anyhow::Result<()> {
self.db.remove_element(element_id.clone()).inspect(|_| {
self.send_to_peers(MessageContent::RemoveElement {
id: element_id.clone(),
});
res
}
pub fn get_element(&self, id: &ElementId) -> anyhow::Result<Element> {
queries::elements::get(&self.db, id)
}
pub fn get_elements_by_tag(&self, tag: &Tag) -> Vec<ElementId> {
queries::elements::get_by_tag(&self.db, tag)
.map_err(|e| {
error!("{}", e);
e
})
.unwrap_or(vec![])
})
}
pub fn set_peer(&self, peer: &Peer) -> anyhow::Result<()> {
queries::peers::put(&self.db, &peer.id(), &peer.name())
pub fn get_element(&self, id: ElementId) -> Option<Element> {
self.db.get_element(id)
}
pub fn get_elements_by_tag(&self, _tag: &Tag) -> Vec<ElementId> {
todo!()
}
pub fn set_peer(&self, peer: Peer) -> anyhow::Result<()> {
self.db.add_peer(peer)
}
pub fn get_peers(&self) -> anyhow::Result<Vec<Peer>> {
queries::peers::get(&self.db)
self.db.get_all_peers()
}
pub fn add_pot_member(&self, pot: &PotId, app: &AppId) -> anyhow::Result<()> {
debug!("Pot: {:?}", queries::pots::get(&self.db, pot));
match queries::pots::add_pot_member(&self.db, pot, app) {
Ok(_) => {
pub fn add_pot_member(&self, pot: PotId, app: AppId) -> anyhow::Result<()> {
let p = self
.db
.get_pot(pot.clone())?
.ok_or(Error::msg("Could not find pot"))?;
self.db
.add_pot_membership(pot.clone(), app.clone())
.inspect(|_| {
self.emit_app_event(
app,
&app,
AppEvent::NewPot {
id: pot.clone(),
app_type: queries::pots::get(&self.db, pot)?.app_type,
app_type: p.app_type,
},
);
Ok(())
}
Err(e) => Err(e),
}
)
})
}
pub fn get_event_receiver(&self, app: &AppId) -> anyhow::Result<Arc<Mutex<Receiver<AppEvent>>>> {
pub fn get_event_receiver(
&self,
app: &AppId,
) -> anyhow::Result<Arc<Mutex<Receiver<AppEvent>>>> {
self.create_event_channel_if_not_exists(app.clone());
match self.app_event_channels.read() {
Ok(map) => match map.get(&app) {
@ -184,8 +167,7 @@ impl State {
debug!("Emitting app event failed: {:?}", e);
}
}
}
else {
} else {
debug!("Failed to get channels");
}
}
@ -204,7 +186,7 @@ impl State {
}
fn init_event_channels(&self) {
let app_ids = queries::apps::get_all_ids(&self.db).unwrap_or(vec![]);
let app_ids = self.db.get_all_app_ids().unwrap();
for app in app_ids {
self.create_event_channel_if_not_exists(app);
}

View file

@ -1,312 +0,0 @@
use std::collections::BTreeMap;
use anyhow::{bail, Error};
use cozo::{DataValue, DbInstance, Num, ScriptMutability};
use itertools::Itertools;
use serde_with::chrono::{DateTime, Utc};
use tracing::{debug, warn};
use ubisync_lib::types::{AppId, Pot, PotId};
use crate::{api::v0::app::App, run_query};
pub fn add(
db: &DbInstance,
id: &AppId,
last_access: &DateTime<Utc>,
name: &str,
description: &str,
app_type: &str,
) -> anyhow::Result<()> {
let params = vec![
(
"id",
DataValue::Str(serde_json::to_string(&id).unwrap().into()),
),
(
"last_access",
DataValue::Num(Num::Int(last_access.timestamp())),
),
("name", DataValue::Str(name.into())),
("description", DataValue::Str(description.into())),
("app_type", DataValue::Str(app_type.into())),
("default_pot", DataValue::Null),
];
match run_query!(
&db,
":insert apps {id => last_access, app_type, name, description, default_pot}",
params,
cozo::ScriptMutability::Mutable
) {
Ok(_) => Ok(()),
Err(report) => bail!(report),
}
}
pub fn exists(db: &DbInstance, id: &AppId) -> anyhow::Result<bool> {
let mut params = BTreeMap::new();
params.insert(
"id".to_string(),
DataValue::Str(serde_json::to_string(&id)?.into()),
);
let result = db.run_script(
"?[name] := *apps[$id, last_access, app_type, name, description, default_pot]",
params,
cozo::ScriptMutability::Immutable,
);
if let Ok(rows) = result {
return Ok(rows.rows.len() == 1);
}
Err(Error::msg("Could not check whether app is registered"))
}
pub fn get_all(db: &DbInstance) -> anyhow::Result<Vec<App>> {
match db.run_script(
"?[id, last_access, app_type, name, description, default_pot] := *apps[id, last_access, app_type, name, description, default_pot]",
BTreeMap::default(),
ScriptMutability::Immutable) {
Ok(named_rows) => {
Ok(named_rows.rows.iter().filter_map(|row| {
if let [
DataValue::Str(id),
DataValue::Num(Num::Int(last_access)),
DataValue::Str(app_type),
DataValue::Str(name),
DataValue::Str(description),
default_pot,
] = row.as_slice() {
Some(App {
id: serde_json::from_str(&id).unwrap(),
app_type: app_type.to_string(),
last_access: DateTime::from_timestamp(last_access.to_owned(), 0).unwrap(),
name: name.to_string(),
description: description.to_string(),
default_pot: default_pot.get_str().map(|str| serde_json::from_str(str).unwrap())}
)
}
else {
None
}
}).collect_vec())
},
Err(e) => bail!(e),
}
}
pub fn get_all_ids(db: &DbInstance) -> anyhow::Result<Vec<AppId>> {
let result = db.run_script(
"?[id] := *apps{id}",
BTreeMap::new(),
ScriptMutability::Immutable,
);
match result {
Ok(named_rows) => Ok(named_rows
.rows
.iter()
.filter_map(|row| {
if let [DataValue::Str(app_id)] = row.as_slice() {
serde_json::from_str::<AppId>(app_id).ok()
} else {
None
}
})
.collect()),
Err(e) => bail!(e),
}
}
pub fn get(db: &DbInstance, id: &AppId) -> anyhow::Result<App> {
let mut params = BTreeMap::new();
params.insert(
"id".to_string(),
DataValue::Str(serde_json::to_string(&id)?.into()),
);
let result = db.run_script(
"?[last_access, app_type, name, description, default_pot] := *apps[$id, last_access, app_type, name, description, default_pot]",
params,
cozo::ScriptMutability::Immutable,
);
if let Ok(rows) = result {
if let Some(firstrow) = rows.rows.first() {
if let [DataValue::Num(Num::Int(ts)), DataValue::Str(app_type), DataValue::Str(name), DataValue::Str(desc), default_pot] =
firstrow.as_slice()
{
let last_access = DateTime::from_timestamp(ts.to_owned(), 0)
.ok_or(Error::msg("Failed to deserialize timestamp"))?;
let pot_id = match default_pot {
DataValue::Str(dpid) => Some(serde_json::from_str(dpid)?),
_ => None,
};
return Ok(App {
id: id.clone(),
app_type: app_type.to_string(),
last_access,
name: name.to_string(),
description: desc.to_string(),
default_pot: pot_id,
});
}
}
}
Err(Error::msg("Could not find app"))
}
pub fn set_default_pot(db: &DbInstance, id: &AppId, pot: &PotId) -> anyhow::Result<()> {
let params = vec![
("id", DataValue::Str(serde_json::to_string(id)?.into())),
(
"default_pot",
DataValue::Str(serde_json::to_string(pot)?.into()),
),
];
match run_query!(
&db,
":update apps {id => default_pot}",
params.clone(),
ScriptMutability::Mutable
) {
Ok(_) => match run_query!(
&db,
":put pot_memberships {pot_id = default_pot, app_id = id}",
params,
ScriptMutability::Mutable
) {
Ok(_) => Ok(()),
Err(report) => bail!(report),
},
Err(report) => bail!(report),
}
}
pub fn get_default_pot(db: &DbInstance, app: &AppId) -> anyhow::Result<Pot> {
let mut params = BTreeMap::new();
params.insert(
"app_id".to_string(),
DataValue::Str(serde_json::to_string(&app)?.into()),
);
let result = db.run_script(
"
default_pot[pot_id] := *apps{id: $app_id, default_pot: pot_id}
?[pot_id, app_type] := default_pot[pot_id], *pots[pot_id, app_type]
",
params.clone(),
ScriptMutability::Immutable,
);
match result {
Ok(rows) => {
if let Some(firstrow) = rows.rows.first() {
if let [DataValue::Str(pot_id_str), DataValue::Str(app_type)] = firstrow.as_slice()
{
Ok(Pot {
id: serde_json::from_str(&pot_id_str)?,
app_type: app_type.to_string(),
})
} else {
Err(Error::msg("Failed to deserialize query result"))
}
} else {
Err(Error::msg("App not found"))
}
}
Err(e) => bail!(e),
}
}
pub fn create_pot(
db: &DbInstance,
pot_id: &PotId,
app_id: &AppId,
app_type: &str,
) -> anyhow::Result<()> {
let params = vec![
(
"pot_id",
DataValue::Str(serde_json::to_string(pot_id)?.into()),
),
(
"app_id",
DataValue::Str(serde_json::to_string(app_id)?.into()),
),
("app_type", DataValue::Str(app_type.into())),
];
match run_query!(
&db,
":insert pots {id = pot_id => app_type}",
params.clone(),
ScriptMutability::Mutable
) {
Ok(_) => match run_query!(
&db,
":insert pot_memberships {pot_id => app_id}",
params,
ScriptMutability::Mutable
) {
Ok(_) => Ok(()),
Err(e) => {
warn!("{:?}", e);
bail!(e)
}
},
Err(e) => {
warn!("{:?}", e);
bail!(e)
}
}
}
#[cfg(test)]
mod tests {
use cozo::DbInstance;
use itertools::Itertools;
use serde_with::chrono::Utc;
use tracing::{debug, Level};
use ubisync_lib::types::{AppId, PotId};
use crate::state::{queries::pots, schema};
#[test]
pub fn default_pot() {
tracing_subscriber::fmt()
.pretty()
.with_max_level(Level::DEBUG)
.init();
let db = DbInstance::new("mem", "", Default::default()).unwrap();
schema::add_schema(&db).unwrap();
let app_id = AppId::new();
let pot_id = PotId::new();
super::add(&db, &app_id, &Utc::now(), "name", "description", "app_type").unwrap();
pots::add(&db, &pot_id, "app_type").unwrap();
super::set_default_pot(&db, &app_id, &pot_id).unwrap();
debug!("Result: {:?}", super::get_default_pot(&db, &app_id));
}
#[test]
pub fn add_and_get_all() {
tracing_subscriber::fmt()
.pretty()
.with_max_level(Level::DEBUG)
.init();
let db = DbInstance::new("mem", "", Default::default()).unwrap();
schema::add_schema(&db).unwrap();
super::add(&db, &AppId::new(), &Utc::now(), "app1", "description", "app_type").unwrap();
super::add(&db, &AppId::new(), &Utc::now(), "app2", "description", "app_type").unwrap();
let all_ids = super::get_all_ids(&db).unwrap();
let all_apps = super::get_all(&db).unwrap();
assert_eq!(all_ids.len(), 2);
assert_eq!(all_ids, all_apps.iter().map(|app| app.id.clone()).collect_vec());
}
}

View file

@ -1,218 +0,0 @@
use std::collections::BTreeMap;
use anyhow::{bail, Error};
use cozo::{DataValue, DbInstance, JsonData, ScriptMutability};
use serde_json::Value;
use tracing::error;
use crate::{
run_query,
state::{Element, ElementContent, ElementId},
};
use ubisync_lib::types::{AppId, MessageId, PotId, Tag};
pub fn add(
db: &DbInstance,
id: &ElementId,
content: &ElementContent,
latest_message: Option<MessageId>,
local_changes: bool,
pot: &PotId,
) -> anyhow::Result<()> {
let params = vec![
("id", DataValue::Str(serde_json::to_string(&id)?.into())),
(
"content",
DataValue::Json(JsonData(serde_json::to_value(content)?)),
),
(
"latest_message",
match latest_message {
Some(m) => DataValue::Str(serde_json::to_string(&m)?.into()),
None => DataValue::Null,
},
),
("local_changes", DataValue::Bool(local_changes)),
("pot", DataValue::Str(serde_json::to_string(pot)?.into())),
];
match run_query!(
&db,
":insert elements {id => content, latest_message, local_changes, pot}",
params,
ScriptMutability::Mutable
) {
Ok(_) => Ok(()),
Err(report) => bail!(report),
}
}
pub fn set_content(
db: &DbInstance,
id: &ElementId,
content: &ElementContent,
) -> anyhow::Result<()> {
set_property(
db,
id,
"content",
DataValue::Json(JsonData(serde_json::to_value(content)?)),
)
}
pub fn set_latest_message(
db: &DbInstance,
id: &ElementId,
latest_message: Option<MessageId>,
) -> anyhow::Result<()> {
set_property(
db,
id,
"latest_message",
match latest_message {
Some(m) => DataValue::Str(serde_json::to_string(&m)?.into()),
None => DataValue::Null,
},
)
}
pub fn set_local_changes(
db: &DbInstance,
id: &ElementId,
local_changes: bool,
) -> anyhow::Result<()> {
set_property(db, id, "local_changes", DataValue::Bool(local_changes))
}
pub fn remove(db: &DbInstance, id: &ElementId) -> anyhow::Result<()> {
match run_query!(
&db,
":delete elements {id}",
vec![("id", DataValue::Str(serde_json::to_string(&id)?.into()))],
cozo::ScriptMutability::Mutable
) {
Ok(_) => Ok(()),
Err(report) => bail!(report),
}
}
pub fn get(db: &DbInstance, id: &ElementId) -> anyhow::Result<Element> {
let mut params = BTreeMap::new();
params.insert(
"id".to_string(),
DataValue::Str(serde_json::to_string(&id)?.into()),
);
let result = db.run_script("
?[content, latest_message, local_changes, pot] := *elements[$id, content, latest_message, local_changes, pot]
", params, cozo::ScriptMutability::Immutable);
match result {
Ok(val) => {
if let Some(firstrow) = val.rows.first() {
if let [DataValue::Json(JsonData(content)), latest_message, DataValue::Bool(local_changes), DataValue::Str(pot_id)] =
firstrow.as_slice()
{
return Ok(Element::from((
id.to_owned(),
Some(serde_json::from_str(pot_id)?),
serde_json::from_value(content.to_owned())?,
match latest_message {
DataValue::Str(s) => Some(serde_json::from_str(s)?),
_ => None,
},
local_changes.to_owned(),
)));
}
return Err(Error::msg("Could not parse db result as Element"));
} else {
return Err(Error::msg("No rows returned for element query"));
}
}
Err(report) => bail!(report),
}
}
pub fn get_app_access(db: &DbInstance, id: &ElementId, app: &AppId) -> anyhow::Result<bool> {
let mut params = BTreeMap::new();
params.insert(
"id".to_string(),
DataValue::Str(serde_json::to_string(&id)?.into()),
);
params.insert(
"app_id".to_string(),
DataValue::Str(serde_json::to_string(&app)?.into()),
);
//TODO id in script is not bound to $id parameter
let result = db.run_script(
"
memberships[pot_id] := *pot_memberships{pot_id, app_id}
?[id] := memberships[pot_id], *elements{id, pot: pot_id}
",
params.clone(),
cozo::ScriptMutability::Immutable,
);
match result {
Ok(named_rows) => Ok(named_rows.rows.len() > 0),
Err(report) => bail!(report),
}
}
pub fn get_by_tag(db: &DbInstance, tag: &Tag) -> anyhow::Result<Vec<ElementId>> {
let mut params = BTreeMap::new();
params.insert(
"tag".to_string(),
DataValue::Str(serde_json::to_string(tag)?.into()),
);
let result = db.run_script(
"
?[element] := *tags[$tag, element]
",
params,
cozo::ScriptMutability::Immutable,
);
match result {
Ok(named_rows) => {
let mut element_ids = vec![];
for row in named_rows.rows {
if let [DataValue::Json(JsonData(Value::String(element_id)))] = row.as_slice() {
match serde_json::from_str(&element_id) {
Ok(id) => element_ids.push(id),
Err(e) => {
error!("Error parsing element id {}: {}", element_id, e);
continue;
}
}
}
}
Ok(element_ids)
}
Err(report) => bail!(report),
}
}
fn set_property(
db: &DbInstance,
id: &ElementId,
key: &str,
value: DataValue,
) -> anyhow::Result<()> {
let params = vec![
("id", DataValue::Str(serde_json::to_string(id)?.into())),
(key, value),
];
match run_query!(
&db,
format!(":update elements {{id => {key}}}"),
params,
ScriptMutability::Mutable
) {
Ok(_) => Ok(()),
Err(report) => bail!(report),
}
}

View file

@ -1,51 +0,0 @@
pub mod apps;
pub mod elements;
pub mod peers;
pub mod pots;
#[macro_export]
macro_rules! build_query {
($payload:expr, $params:expr) => {{
use cozo::DataValue;
use std::collections::BTreeMap;
// Build parameters map
let mut params_map: BTreeMap<String, DataValue> = Default::default();
let mut parameters_init = String::new();
if $params.len() > 0 {
for (name, value) in $params {
let _: &str = name; // only for type annotation
params_map.insert(name.to_string(), value);
}
// First line: Initialize parameters, make them available in CozoScript
use itertools::Itertools;
parameters_init += "?[";
parameters_init += &params_map
.iter()
.map(|(name, _)| name)
.format(", ")
.to_string();
parameters_init += "] <- [[";
parameters_init += &params_map
.iter()
.map(|(name, _)| format!("${}", name))
.format(", ")
.to_string();
parameters_init += "]]";
}
// Return query string and parameters map
(format!("{}\n\n{}", parameters_init, $payload), params_map)
}};
}
use build_query;
#[macro_export]
macro_rules! run_query {
($db:expr, $payload:expr, $params:expr, $mutability:expr) => {{
let (query, parameters) = crate::state::queries::build_query!($payload, $params);
$db.run_script(query.as_str(), parameters, $mutability)
}};
}

View file

@ -1,50 +0,0 @@
use anyhow::Error;
use cozo::{DataValue, DbInstance, ScriptMutability};
use ubisync_lib::{peer::Peer, types::PeerId};
use crate::run_query;
pub fn put(db: &DbInstance, id: &PeerId, name: &str) -> anyhow::Result<()> {
let params = vec![
("id", DataValue::Str(serde_json::to_string(id)?.into())),
("name", DataValue::Str(serde_json::to_string(name)?.into())),
];
match run_query!(
&db,
":put peers {id => name}",
params,
ScriptMutability::Mutable
) {
Ok(_) => Ok(()),
Err(report) => Err(Error::msg(format!("Query failed: {}", report))),
}
}
pub fn get(db: &DbInstance) -> anyhow::Result<Vec<Peer>> {
let result = db.run_script(
"
?[id, name] := *peers{id, name}
",
Default::default(),
cozo::ScriptMutability::Immutable,
);
match result {
Ok(rows) => Ok(rows
.rows
.into_iter()
.map(|row| match row.as_slice() {
[DataValue::Str(id_string), DataValue::Str(name_string)] => {
if let Ok(id) = serde_json::from_str(&id_string) {
Some(Peer::new(id, name_string.as_str().to_string()))
} else {
None
}
}
_ => None,
})
.flatten()
.collect()),
Err(report) => Err(Error::msg(format!("Query failed: {}", report))),
}
}

View file

@ -1,130 +0,0 @@
use std::collections::BTreeMap;
use anyhow::Error;
use cozo::{DataValue, DbInstance, ScriptMutability};
use tracing::debug;
use ubisync_lib::types::{AppId, Pot, PotId};
use crate::run_query;
pub fn add(db: &DbInstance, id: &PotId, app_type: &str) -> anyhow::Result<()> {
let params = vec![
("id", DataValue::Str(serde_json::to_string(id)?.into())),
(
"app_type",
DataValue::Str(serde_json::to_string(app_type)?.into()),
),
];
match run_query!(
&db,
":put pots {id => app_type}",
params,
ScriptMutability::Mutable
) {
Ok(_) => Ok(()),
Err(report) => Err(Error::msg(format!("Query failed: {}", report))),
}
}
pub fn get(db: &DbInstance, id: &PotId) -> anyhow::Result<Pot> {
let mut params = BTreeMap::new();
params.insert(
"id".to_string(),
DataValue::Str(serde_json::to_string(&id)?.into()),
);
let result = db.run_script(
"
?[app_type] := *pots[$id, app_type]
",
params,
cozo::ScriptMutability::Immutable,
);
match result {
Ok(rows) => {
if let Some(firstrow) = rows.rows.first() {
if let [DataValue::Str(app_type)] = firstrow.as_slice() {
Ok(Pot::new(
id.clone(),
serde_json::from_str(app_type)?,
))
} else {
Err(Error::msg("Could not parse result from query"))
}
} else {
Err(Error::msg("Pot not found"))
}
}
Err(report) => Err(Error::msg(format!("Query failed: {}", report))),
}
}
pub fn add_pot_member(db: &DbInstance, pot: &PotId, app: &AppId) -> anyhow::Result<()> {
let params = vec![
("pot_id", DataValue::Str(serde_json::to_string(pot)?.into())),
("app_id", DataValue::Str(serde_json::to_string(app)?.into())),
];
match run_query!(
&db,
":insert pot_memberships {pot_id, app_id}",
params,
ScriptMutability::Mutable
) {
Ok(_) => Ok(()),
Err(report) => Err(Error::msg(format!("Query failed: {}", report))),
}
}
pub fn get_pot_members(db: &DbInstance, id: &PotId) -> anyhow::Result<Vec<AppId>> {
let mut params = BTreeMap::new();
params.insert(
"id".to_string(),
DataValue::Str(serde_json::to_string(&id)?.into()),
);
let result = db.run_script(
"
?[app_id] := *pot_memberships{id: $id, app_id}
",
params,
cozo::ScriptMutability::Immutable,
);
match result {
Ok(rows) => Ok(rows
.rows
.iter()
.map(|row| {
if let [DataValue::Str(app_id_string)] = row.as_slice() {
if let Ok(app_id) = serde_json::from_str::<AppId>(&app_id_string) {
Some(app_id)
} else {
None
}
} else {
None
}
})
.flatten()
.collect()),
Err(report) => Err(Error::msg(format!("Query failed: {}", report))),
}
}
#[cfg(test)]
mod tests {
use cozo::DbInstance;
use ubisync_lib::types::PotId;
use crate::state::schema;
#[test]
pub fn add_and_get() {
let db = DbInstance::new("mem", "", Default::default()).unwrap();
schema::add_schema(&db).unwrap();
let pot_id = PotId::new();
super::add(&db, &pot_id, "app_type").unwrap();
let returned = super::get(&db, &pot_id).unwrap();
assert_eq!(pot_id, returned.id);
assert_eq!("app_type", returned.app_type);
}
}

View file

@ -1,53 +0,0 @@
use std::collections::BTreeMap;
use anyhow::Error;
use cozo::DbInstance;
pub fn add_schema(db: &DbInstance) -> anyhow::Result<()> {
let params = BTreeMap::new();
match db.run_script(
"
{:create apps {
id: String,
=>
last_access: Int,
app_type: String,
name: String,
description: String,
default_pot: String?,
}}
{:create peers {
id: String,
=>
name: String,
}}
{:create pots {
id: String,
=>
app_type: String
}}
{:create pot_memberships {
pot_id: String,
=>
app_id: String,
}}
{:create elements {
id: String,
=>
content: Json,
latest_message: String?,
local_changes: Bool,
pot: String,
}}
{:create tags {
tag: String,
element: String,
}}
",
params,
cozo::ScriptMutability::Mutable,
) {
Ok(_) => Ok(()),
Err(e) => Err(Error::msg(format!("Failed to set up schema: {}", e))),
}
}

View file

@ -43,7 +43,7 @@ async fn two_nodes_element_creation() {
move |ev, node| {
if let UbisyncNodeEvent::NewPot { id, app_type } = ev {
debug!("callback called");
node.add_pot_member(&id, &app_id2).unwrap();
node.add_pot_member(id, app_id2.clone()).unwrap();
}
},
ubi2.clone(),
@ -105,7 +105,7 @@ async fn two_nodes_api_event() {
move |ev, node| {
debug!("callback called");
if let UbisyncNodeEvent::NewPot { id, app_type } = ev {
node.add_pot_member(&id, &app_id1).unwrap();
node.add_pot_member(id, app_id1.clone()).unwrap();
}
},
ubi1.clone(),