From d2580607690dc686cd57dcc39f2b00b6850092ef Mon Sep 17 00:00:00 2001 From: "Philip (a-0)" <@ph:a-0.me> Date: Wed, 24 Jan 2024 17:53:50 +0100 Subject: [PATCH] Event system - App events, can be polled by apps using the HTTP API - Node events, can be processed by a node implementation by registering a callback in the `Ubisync` object - Some further additions, like adding pot members etc. to test these new event functions --- Cargo.lock | 1 + ubisync-lib/src/api/app.rs | 3 +- ubisync-lib/src/api/events.rs | 42 +++++++++ ubisync-lib/src/api/mod.rs | 1 + ubisync-lib/src/types/app_id.rs | 11 +++ ubisync-lib/src/types/element.rs | 3 + ubisync-lib/src/types/mod.rs | 3 + ubisync-lib/src/types/pot_id.rs | 2 +- ubisync-sdk/Cargo.toml | 1 + ubisync-sdk/src/lib.rs | 47 ++++++---- ubisync/src/api/v0/app.rs | 19 ++-- ubisync/src/api/v0/element.rs | 3 +- ubisync/src/api/v0/events.rs | 32 +++++++ ubisync/src/api/v0/mod.rs | 2 + ubisync/src/comm/message_processor.rs | 2 - ubisync/src/lib.rs | 18 +++- ubisync/src/node_events/mod.rs | 10 +++ ubisync/src/state/api_state.rs | 55 +++++++++--- ubisync/src/state/comm_state.rs | 39 +++++---- ubisync/src/state/mod.rs | 119 ++++++++++++++++++++++++-- ubisync/src/state/queries/apps.rs | 14 +-- ubisync/src/state/queries/elements.rs | 7 +- ubisync/src/state/queries/pots.rs | 51 +++++++++-- ubisync/tests/api.rs | 90 ++++++++++++++++++- 24 files changed, 479 insertions(+), 96 deletions(-) create mode 100644 ubisync-lib/src/api/events.rs create mode 100644 ubisync-lib/src/types/app_id.rs create mode 100644 ubisync/src/api/v0/events.rs create mode 100644 ubisync/src/node_events/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 664c604..87cd4aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3094,6 +3094,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "tokio", "tracing", "tracing-subscriber", "ubisync-lib", diff --git a/ubisync-lib/src/api/app.rs b/ubisync-lib/src/api/app.rs index 22a16d9..8f83b79 100644 --- a/ubisync-lib/src/api/app.rs +++ b/ubisync-lib/src/api/app.rs @@ -1,7 +1,7 @@ use reqwest::Method; use serde::{Deserialize, Serialize}; -use crate::types::{PotId, Pot}; +use crate::types::{AppId, Pot, PotId}; use super::UbisyncRequest; @@ -15,6 +15,7 @@ pub struct AppRegisterRequest { #[derive(Serialize, Deserialize)] pub struct AppRegisterResponse { pub token: String, + pub app_id: AppId, } impl UbisyncRequest for AppRegisterRequest { diff --git a/ubisync-lib/src/api/events.rs b/ubisync-lib/src/api/events.rs new file mode 100644 index 0000000..50e675f --- /dev/null +++ b/ubisync-lib/src/api/events.rs @@ -0,0 +1,42 @@ +use reqwest::Method; +use serde::{Serialize, Deserialize}; + +use crate::types::{ElementId, PotId}; + +use super::UbisyncRequest; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum AppEvent { + NewPot { + id: PotId, + app_type: String, + }, + ElementUpdate { + id: ElementId, + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct PollEventsRequest { + pub timeout: u16, + pub accumulate: u16, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct PollEventsResponse { + pub events: Vec, +} + + +impl UbisyncRequest for PollEventsRequest { + type PathParameters = (); + type Response = PollEventsResponse; + + fn method(&self) -> reqwest::Method { + Method::GET + } + + fn path(&self, _: Self::PathParameters) -> String { + "/events".to_string() + } +} \ No newline at end of file diff --git a/ubisync-lib/src/api/mod.rs b/ubisync-lib/src/api/mod.rs index 13ba5a7..0b61791 100644 --- a/ubisync-lib/src/api/mod.rs +++ b/ubisync-lib/src/api/mod.rs @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize}; pub mod app; pub mod element; +pub mod events; /// Any struct defining a request body for the ubisync API must implement this trait /// It is used both by the client in the SDK and by the API logic in the ubisync node diff --git a/ubisync-lib/src/types/app_id.rs b/ubisync-lib/src/types/app_id.rs new file mode 100644 index 0000000..91208e5 --- /dev/null +++ b/ubisync-lib/src/types/app_id.rs @@ -0,0 +1,11 @@ +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)] +pub struct AppId(Uuid); + +impl AppId { + pub fn new() -> Self { + AppId { 0: Uuid::new_v4() } + } +} \ No newline at end of file diff --git a/ubisync-lib/src/types/element.rs b/ubisync-lib/src/types/element.rs index dd3a6ee..d3c5680 100644 --- a/ubisync-lib/src/types/element.rs +++ b/ubisync-lib/src/types/element.rs @@ -43,4 +43,7 @@ impl Element { pub fn content(&self) -> &ElementContent { &self.content } + pub fn pot(&self) -> &Option { + &self.pot + } } diff --git a/ubisync-lib/src/types/mod.rs b/ubisync-lib/src/types/mod.rs index 754d7f0..d1d6af7 100644 --- a/ubisync-lib/src/types/mod.rs +++ b/ubisync-lib/src/types/mod.rs @@ -1,3 +1,6 @@ +mod app_id; +pub use app_id::AppId; + mod element_content; pub use element_content::ElementContent; diff --git a/ubisync-lib/src/types/pot_id.rs b/ubisync-lib/src/types/pot_id.rs index bcad24b..b580b3c 100644 --- a/ubisync-lib/src/types/pot_id.rs +++ b/ubisync-lib/src/types/pot_id.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct PotId(Uuid); impl PotId { pub fn new() -> Self { diff --git a/ubisync-sdk/Cargo.toml b/ubisync-sdk/Cargo.toml index 1280caf..eb46550 100644 --- a/ubisync-sdk/Cargo.toml +++ b/ubisync-sdk/Cargo.toml @@ -10,6 +10,7 @@ anyhow = "1.0.79" reqwest = { version = "0.11.23", features = [ "json" ] } serde = { version = "1.0.166", features = [ "derive" ] } serde_json = "1.0.99" +tokio = "1.35.1" tracing = "0.1.37" tracing-subscriber = "0.3.17" diff --git a/ubisync-sdk/src/lib.rs b/ubisync-sdk/src/lib.rs index 0cc7f08..3c91229 100644 --- a/ubisync-sdk/src/lib.rs +++ b/ubisync-sdk/src/lib.rs @@ -2,11 +2,14 @@ use anyhow::anyhow; use error::UbisyncError; use reqwest::{Client, StatusCode}; use tracing::debug; -use ubisync_lib::api::{ - app::{AppRegisterRequest, AppRegisterResponse, AppCreatePotRequest}, - UbisyncRequest, -}; pub use ubisync_lib::*; +use ubisync_lib::{ + api::{ + app::{AppCreatePotRequest, AppRegisterRequest, AppRegisterResponse}, + UbisyncRequest, + }, + types::AppId, +}; pub mod error; @@ -15,15 +18,20 @@ pub struct UbisyncClient { port: u16, selected_api_version: String, base_url: String, - jwt_token: String, + registration: AppRegistration, reqwest_client: Client, } +pub struct AppRegistration { + pub jwt_token: String, + pub app_id: AppId, +} + impl UbisyncClient { pub async fn init( host: &str, port: u16, - jwt_token: Option<&str>, + registration: Option, application_name: &str, application_description: &str, application_type: &str, @@ -43,8 +51,8 @@ impl UbisyncClient { .get(0) .expect("No available API version returned by ubisync node"); - let token = match jwt_token { - Some(t) => t.to_string(), + let registration = match registration { + Some(t) => t, None => { let response = http_client .put(Self::build_base_url(host, port, &selected_version) + "/app/register") @@ -59,11 +67,14 @@ impl UbisyncClient { if response.status() != StatusCode::OK { return Err(UbisyncError::AppRegistrationFailed); } - response + let parsed = response .json::() .await - .expect("Failed to extract JWT from app regstration request") - .token + .expect("Failed to extract JWT from app regstration request"); + AppRegistration { + jwt_token: parsed.token, + app_id: parsed.app_id, + } } }; @@ -72,15 +83,15 @@ impl UbisyncClient { port: port, selected_api_version: selected_version.to_string(), base_url: Self::build_base_url(host, port, selected_version), - jwt_token: token.to_string(), + registration, reqwest_client: http_client, }) } pub async fn create_default_pot(self) -> anyhow::Result { - let response = self.send(AppCreatePotRequest { - app_type: None, - }, ()).await?; + let response = self + .send(AppCreatePotRequest { app_type: None }, ()) + .await?; debug!("Created new pot with ID {:?}", response.pot_id); Ok(self) } @@ -98,7 +109,7 @@ impl UbisyncClient { request.method(), &(self.base_url.to_owned() + &request.path(parameters)), ) - .bearer_auth(&self.jwt_token) + .bearer_auth(&self.registration.jwt_token) .json(&request) .send() .await @@ -123,6 +134,10 @@ impl UbisyncClient { self.base_url = Self::build_base_url(&self.host, self.port, &self.selected_api_version); } + pub fn app_id(&self) -> AppId { + self.registration.app_id.clone() + } + fn build_base_url(host: &str, port: u16, api_version: &str) -> String { format!("http://{}:{}/{}", host, port, api_version) } diff --git a/ubisync/src/api/v0/app.rs b/ubisync/src/api/v0/app.rs index 845d2c6..9ad190f 100644 --- a/ubisync/src/api/v0/app.rs +++ b/ubisync/src/api/v0/app.rs @@ -19,21 +19,11 @@ use ubisync_lib::{ AppGetDefaultPotResponse, AppRegisterRequest, AppRegisterResponse, AppSetDefaultPotRequest, AppSetDefaultPotResponse, }, - types::PotId, + types::{AppId, PotId}, }; -use uuid::Uuid; use crate::state::ApiState; -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct AppId(Uuid); - -impl AppId { - pub fn new() -> Self { - AppId { 0: Uuid::new_v4() } - } -} - #[derive(Serialize, Deserialize, Clone, Debug)] pub struct App { pub id: AppId, @@ -89,14 +79,17 @@ pub(super) async fn register( // Build JWT, respond let jwt = jsonwebtoken::encode( &Header::default(), - &JWTClaims { sub: id }, + &JWTClaims { sub: id.clone() }, &s.jwt_encoding_key(), ); match jwt { Ok(token) => ( StatusCode::OK, Json { - 0: AppRegisterResponse { token: token }, + 0: AppRegisterResponse { + token: token, + app_id: id, + }, }, ) .into_response(), diff --git a/ubisync/src/api/v0/element.rs b/ubisync/src/api/v0/element.rs index ff664b6..94210a2 100644 --- a/ubisync/src/api/v0/element.rs +++ b/ubisync/src/api/v0/element.rs @@ -14,10 +14,9 @@ use ubisync_lib::{ ElementCreateRequest, ElementCreateResponse, ElementGetResponse, ElementRemoveResponse, ElementSetRequest, ElementSetResponse, }, - types::ElementId, + types::{AppId, ElementId}, }; -use super::app::AppId; pub(super) async fn get( Path(id): Path, diff --git a/ubisync/src/api/v0/events.rs b/ubisync/src/api/v0/events.rs new file mode 100644 index 0000000..4ab66f3 --- /dev/null +++ b/ubisync/src/api/v0/events.rs @@ -0,0 +1,32 @@ +use std::sync::Arc; + +use axum::{ + http::StatusCode, + response::{IntoResponse, Response}, + Extension, Json, +}; +use ubisync_lib::{api::events::{PollEventsRequest, PollEventsResponse}, types::AppId}; + +use crate::state::ApiState; + + +pub(super) async fn poll( + app: Extension, + s: Extension>, + Json(req): Json, +) -> Response { + match s.events(app.0, req.timeout, req.accumulate).await { + Ok(events) => ( + StatusCode::OK, + Json { + 0: PollEventsResponse { events }, + }, + ) + .into_response(), + Err(_) => ( + StatusCode::CONFLICT, + "Another poll request is currently being handled", + ) + .into_response(), + } +} diff --git a/ubisync/src/api/v0/mod.rs b/ubisync/src/api/v0/mod.rs index f868897..552b9b6 100644 --- a/ubisync/src/api/v0/mod.rs +++ b/ubisync/src/api/v0/mod.rs @@ -9,6 +9,7 @@ use crate::state::ApiState; pub mod app; pub mod element; +pub mod events; pub fn get_router(state: ApiState) -> Router { Router::new() @@ -23,6 +24,7 @@ pub fn get_router(state: ApiState) -> Router { "/element/:id", get(element::get).post(element::set).delete(element::remove), ) + .route("/events", get(events::poll)) .layer(from_fn(app::auth)) // public / unauthenticated routes .route("/app/register", put(app::register)) diff --git a/ubisync/src/comm/message_processor.rs b/ubisync/src/comm/message_processor.rs index a8e7bf6..3041f48 100644 --- a/ubisync/src/comm/message_processor.rs +++ b/ubisync/src/comm/message_processor.rs @@ -27,8 +27,6 @@ pub fn handle(state: &CommState, peer: &PeerId, message: Message) { } MessageContent::AddPot { id, app_type } => { state.add_pot(id, app_type).expect("State failed"); - //TODO: remove when setting default pot properly is possible - let _ = state.set_default_pot_for_all_apps(id); } } } diff --git a/ubisync/src/lib.rs b/ubisync/src/lib.rs index ec7dc0d..e83691f 100644 --- a/ubisync/src/lib.rs +++ b/ubisync/src/lib.rs @@ -4,13 +4,18 @@ use anyhow::bail; use api::{Api, ApiBuilder}; use comm::CommHandle; use config::Config; +use node_events::UbisyncNodeEvent; use i2p::net::I2pSocketAddr; use state::{ApiState, CommState, State}; -use ubisync_lib::{peer::Peer, types::PeerId}; +use ubisync_lib::{ + peer::Peer, + types::{AppId, PeerId, PotId}, +}; pub mod api; pub mod comm; pub mod config; +pub mod node_events; pub mod state; pub struct Ubisync { @@ -39,6 +44,13 @@ impl Ubisync { }) } + pub fn set_node_event_callback(&self, cb: CallbackFunction, node: Arc) + where + CallbackFunction: Fn(UbisyncNodeEvent, Arc) + Send + Sync + 'static, + { + self.state_handle.set_node_event_callback(cb, node); + } + pub fn api(&self) -> Arc { self.api.clone() } @@ -62,4 +74,8 @@ impl Ubisync { pub fn get_destination(&self) -> anyhow::Result { self.comm_handle.i2p_address() } + + pub fn add_pot_member(&self, pot: &PotId, app: &AppId) -> anyhow::Result<()> { + self.state_handle.add_pot_member(pot, app) + } } diff --git a/ubisync/src/node_events/mod.rs b/ubisync/src/node_events/mod.rs new file mode 100644 index 0000000..756ac24 --- /dev/null +++ b/ubisync/src/node_events/mod.rs @@ -0,0 +1,10 @@ +use ubisync_lib::types::PotId; + + + +pub enum UbisyncNodeEvent { + NewPot { + id: PotId, + app_type: String, + } +} \ No newline at end of file diff --git a/ubisync/src/state/api_state.rs b/ubisync/src/state/api_state.rs index 06c65e7..b80c795 100644 --- a/ubisync/src/state/api_state.rs +++ b/ubisync/src/state/api_state.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use anyhow::Error; use cozo::DbInstance; @@ -6,14 +6,12 @@ use jsonwebtoken::{DecodingKey, EncodingKey, Validation}; use serde_with::chrono::Utc; use tracing::debug; use ubisync_lib::{ + api::events::AppEvent, messages::MessageContent, - types::{Element, ElementContent, ElementId, Pot, PotId}, + types::{AppId, Element, ElementContent, ElementId, Pot, PotId}, }; -use crate::{ - api::v0::app::{App, AppId}, - state::queries, -}; +use crate::{api::v0::app::App, state::queries}; use super::State; @@ -28,12 +26,14 @@ impl ApiState { pub fn new(state: Arc, jwt_secret: &str) -> Self { let mut validation = Validation::default(); validation.set_required_spec_claims(&vec!["sub"]); - ApiState { + let api_state = ApiState { state: state, jwt_encoding_key: EncodingKey::from_secret(jwt_secret.as_bytes()), jwt_decoding_key: DecodingKey::from_secret(jwt_secret.as_bytes()), jwt_validation: validation, - } + }; + + api_state } pub fn add_app(&self, name: &str, description: &str, app_type: &str) -> anyhow::Result { @@ -125,10 +125,6 @@ impl ApiState { } } - fn db(&self) -> &DbInstance { - &self.state.db - } - pub fn jwt_encoding_key(&self) -> &EncodingKey { &self.jwt_encoding_key } @@ -140,6 +136,41 @@ impl ApiState { pub fn jwt_validation(&self) -> &Validation { &self.jwt_validation } + + pub async fn events( + &self, + app: AppId, + timeout: u16, + accumulate: u16, + ) -> anyhow::Result> { + match self.state.get_event_receiver(&app) { + Ok(rx_mutex) => { + if let Ok(receiver) = rx_mutex.lock() { + let mut events = vec![]; + loop { + //TODO: Use recv_deadline once it's stable + match receiver.recv_timeout(Duration::from_secs(timeout.into())) { + Ok(ev) => { + debug!("ApiState received event {:?}", &ev); + events.push(ev); + if events.len() >= accumulate.into() { + return Ok(events); + } + } + Err(_) => return Ok(events), + } + } + } else { + Err(Error::msg("Failed to lock on receiver mutex")) + } + }, + Err(e) => Err(e), + } + } + + fn db(&self) -> &DbInstance { + &self.state.db + } } #[cfg(test)] diff --git a/ubisync/src/state/comm_state.rs b/ubisync/src/state/comm_state.rs index cd0b2e7..d310a1f 100644 --- a/ubisync/src/state/comm_state.rs +++ b/ubisync/src/state/comm_state.rs @@ -1,19 +1,18 @@ use std::sync::Arc; -use anyhow::Error; use cozo::DbInstance; use tracing::debug; use ubisync_lib::{ + api::events::AppEvent, peer::Peer, types::{Element, ElementContent, ElementId, MessageId, PeerId, PotId}, }; -use crate::state::queries; +use crate::{node_events::UbisyncNodeEvent, state::queries}; use super::State; -//TODO: Notify API about changes pub struct CommState { state: Arc, } @@ -54,6 +53,19 @@ impl CommState { queries::elements::set_latest_message(self.db(), id, Some(latest_message.to_owned()))?; debug!("Updated element {{{}}}", &id.to_string()); + let _ = self.state.emit_app_event( + queries::pots::get_pot_members( + self.db(), + &queries::elements::get(self.db(), &id)? + .pot() + .as_ref() + .unwrap(), + )? + .get(0) + .unwrap(), + AppEvent::ElementUpdate { id: id.clone() }, + ); + Ok(()) } @@ -80,21 +92,14 @@ impl CommState { } pub fn add_pot(&self, id: &PotId, app_type: &str) -> anyhow::Result<()> { - queries::pots::add(self.db(), id, app_type) - } + queries::pots::add(self.db(), id, app_type)?; - pub fn set_default_pot_for_all_apps(&self, id: &PotId) -> anyhow::Result<()> { - if let Ok(apps) = queries::apps::get_all_ids(self.db()) { - for app in apps { - let res = queries::apps::set_default_pot(self.db(), &app, id); - if let Ok(_) = res { - debug!("Set {:?} as default for app {:?}", id, &app); - } - } - Ok(()) - } else { - Err(Error::msg("Could not fetch list of all apps")) - } + let _ = self.state.emit_node_event(UbisyncNodeEvent::NewPot { + id: id.clone(), + app_type: app_type.to_string(), + }); + + Ok(()) } fn db(&self) -> &DbInstance { diff --git a/ubisync/src/state/mod.rs b/ubisync/src/state/mod.rs index c5fe79c..8b77bf4 100644 --- a/ubisync/src/state/mod.rs +++ b/ubisync/src/state/mod.rs @@ -1,8 +1,14 @@ -use std::sync::{Arc, RwLock}; +use std::{ + collections::HashMap, + sync::{ + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, RwLock, + }, +}; use anyhow::Error; use cozo::DbInstance; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; mod api_state; mod comm_state; @@ -12,16 +18,19 @@ mod schema; pub use api_state::ApiState; pub use comm_state::CommState; use ubisync_lib::{ + api::events::AppEvent, messages::{Message, MessageContent}, peer::Peer, - types::{Element, ElementContent, ElementId, Tag}, + types::{AppId, Element, ElementContent, ElementId, PotId, Tag}, }; -use crate::comm::CommHandle; +use crate::{comm::CommHandle, node_events::UbisyncNodeEvent, Ubisync}; pub struct State { db: DbInstance, comm_handle: RwLock>>, + app_event_channels: RwLock, Arc>>)>>, + node_event_callback: RwLock>>, } impl State { @@ -32,11 +41,17 @@ impl State { }; match db { Ok(d) => { + //TODO: change "add schema" to "ensure newest schema" schema::add_schema(&d)?; - Ok(Arc::new(State { + + let state = Arc::new(State { db: d, comm_handle: RwLock::new(None), - })) + app_event_channels: Default::default(), + node_event_callback: RwLock::new(None), + }); + state.init_event_channels(); + Ok(state) } Err(e) => Err(Error::msg(format!("{:?}", e))), } @@ -50,6 +65,18 @@ impl State { .expect("Could not set state's CommHandle") = Some(handle); } + pub fn set_node_event_callback(&self, cb: T, ubi: Arc) + where + T: Fn(UbisyncNodeEvent, Arc) + Send + Sync + 'static, + { + match self.node_event_callback.write() { + Ok(mut writeguard) => { + *writeguard = Some(Box::new(move |ev: UbisyncNodeEvent| (cb)(ev, ubi.clone()))) + } + Err(e) => warn!("Failed to set node event callback: {:?}", e), + } + } + pub fn set_element_content( &self, element_id: &ElementId, @@ -99,6 +126,86 @@ impl State { queries::peers::get(&self.db) } + pub fn add_pot_member(&self, pot: &PotId, app: &AppId) -> anyhow::Result<()> { + debug!("Pot: {:?}", queries::pots::get(&self.db, pot)); + match queries::pots::add_pot_member(&self.db, pot, app) { + Ok(_) => { + self.emit_app_event( + app, + AppEvent::NewPot { + id: pot.clone(), + app_type: queries::pots::get(&self.db, pot)?.app_type, + }, + ); + Ok(()) + } + Err(e) => Err(e), + } + } + + pub fn get_event_receiver(&self, app: &AppId) -> anyhow::Result>>> { + self.create_event_channel_if_not_exists(app.clone()); + match self.app_event_channels.read() { + Ok(map) => match map.get(&app) { + Some((_, rx)) => Ok(rx.clone()), + None => Err(Error::msg("Failed to get event receiver")), + }, + Err(_) => Err(Error::msg("Failed to lock on event_channels")), + } + } + + pub fn emit_node_event(&self, ev: UbisyncNodeEvent) -> anyhow::Result<()> { + debug!("1"); + match self.node_event_callback.read() { + Ok(readguard) => { + debug!("2"); + debug!("{:?}", readguard.is_some()); + // If a callback is set, call it. + if let Some(cb) = readguard.as_ref() { + debug!("3"); + (*cb)(ev); + } + // Whether a callback is set or not, return Ok(_) + Ok(()) + } + Err(_) => Err(Error::msg("Failed to lock on node event callback function")), + } + } + + fn emit_app_event(&self, app: &AppId, ev: AppEvent) { + if let Ok(channels) = self.app_event_channels.read() { + if let Some((sender, _)) = channels.get(app) { + debug!("Emitting app event to {:?}", &app); + if let Err(e) = sender.send(ev) { + debug!("Emitting app event failed: {:?}", e); + } + } + } + else { + debug!("Failed to get channels"); + } + } + + fn create_event_channel_if_not_exists(&self, app: AppId) { + if let Ok(mut map) = self.app_event_channels.write() { + if !map.contains_key(&app) { + debug!( + "No event channel for {:?} exists, creating a new one.", + &app + ); + let (tx, rx) = channel(); + map.insert(app, (tx, Arc::new(Mutex::new(rx)))); + } + } + } + + fn init_event_channels(&self) { + let app_ids = queries::apps::get_all_ids(&self.db).unwrap_or(vec![]); + for app in app_ids { + self.create_event_channel_if_not_exists(app); + } + } + fn send_to_peers(&self, ct: MessageContent) { match self.comm_handle.read() { Ok(opt) => { diff --git a/ubisync/src/state/queries/apps.rs b/ubisync/src/state/queries/apps.rs index 8d61197..0d72660 100644 --- a/ubisync/src/state/queries/apps.rs +++ b/ubisync/src/state/queries/apps.rs @@ -4,12 +4,9 @@ use anyhow::{bail, Error}; use cozo::{DataValue, DbInstance, Num, ScriptMutability}; use serde_with::chrono::{DateTime, Utc}; use tracing::warn; -use ubisync_lib::types::{Pot, PotId}; +use ubisync_lib::types::{AppId, Pot, PotId}; -use crate::{ - api::v0::app::{App, AppId}, - run_query, -}; +use crate::{api::v0::app::App, run_query}; pub fn add( db: &DbInstance, @@ -239,12 +236,9 @@ mod tests { use cozo::DbInstance; use serde_with::chrono::Utc; use tracing::{debug, Level}; - use ubisync_lib::types::PotId; + use ubisync_lib::types::{AppId, PotId}; - use crate::{ - api::v0::app::AppId, - state::{queries::pots, schema}, - }; + use crate::state::{queries::pots, schema}; #[test] pub fn default_pot() { diff --git a/ubisync/src/state/queries/elements.rs b/ubisync/src/state/queries/elements.rs index dcd3988..0e79d67 100644 --- a/ubisync/src/state/queries/elements.rs +++ b/ubisync/src/state/queries/elements.rs @@ -3,15 +3,14 @@ use std::collections::BTreeMap; use anyhow::{bail, Error}; use cozo::{DataValue, DbInstance, JsonData, ScriptMutability}; use serde_json::Value; -use tracing::{debug, error}; +use tracing::error; use crate::{ - api::v0::app::AppId, run_query, state::{Element, ElementContent, ElementId}, }; -use ubisync_lib::types::{MessageId, PotId, Tag}; +use ubisync_lib::types::{AppId, MessageId, PotId, Tag}; pub fn add( db: &DbInstance, @@ -111,7 +110,6 @@ pub fn get(db: &DbInstance, id: &ElementId) -> anyhow::Result { match result { Ok(val) => { if let Some(firstrow) = val.rows.first() { - debug!("db result: {:?}", &firstrow.as_slice()); if let [DataValue::Json(JsonData(content)), latest_message, DataValue::Bool(local_changes), DataValue::Str(pot_id)] = firstrow.as_slice() { @@ -146,6 +144,7 @@ pub fn get_app_access(db: &DbInstance, id: &ElementId, app: &AppId) -> anyhow::R DataValue::Str(serde_json::to_string(&app)?.into()), ); + //TODO id in script is not bound to $id parameter let result = db.run_script( " memberships[pot_id] := *pot_memberships{pot_id, app_id} diff --git a/ubisync/src/state/queries/pots.rs b/ubisync/src/state/queries/pots.rs index ff64f5a..c46b803 100644 --- a/ubisync/src/state/queries/pots.rs +++ b/ubisync/src/state/queries/pots.rs @@ -2,9 +2,10 @@ use std::collections::BTreeMap; use anyhow::Error; use cozo::{DataValue, DbInstance, ScriptMutability}; -use ubisync_lib::types::{Pot, PotId}; +use tracing::debug; +use ubisync_lib::types::{AppId, Pot, PotId}; -use crate::{api::v0::app::AppId, run_query}; +use crate::run_query; pub fn add(db: &DbInstance, id: &PotId, app_type: &str) -> anyhow::Result<()> { let params = vec![ @@ -34,7 +35,7 @@ pub fn get(db: &DbInstance, id: &PotId) -> anyhow::Result { ); let result = db.run_script( " - ?[id, app_type] := *pots{id, app_type} + ?[app_type] := *pots[$id, app_type] ", params, cozo::ScriptMutability::Immutable, @@ -42,10 +43,10 @@ pub fn get(db: &DbInstance, id: &PotId) -> anyhow::Result { match result { Ok(rows) => { if let Some(firstrow) = rows.rows.first() { - if let [DataValue::Str(pot_id), DataValue::Str(app_type)] = firstrow.as_slice() { + if let [DataValue::Str(app_type)] = firstrow.as_slice() { Ok(Pot::new( - serde_json::from_str(pot_id)?, - app_type.to_string(), + id.clone(), + serde_json::from_str(app_type)?, )) } else { Err(Error::msg("Could not parse result from query")) @@ -58,6 +59,23 @@ pub fn get(db: &DbInstance, id: &PotId) -> anyhow::Result { } } +pub fn add_pot_member(db: &DbInstance, pot: &PotId, app: &AppId) -> anyhow::Result<()> { + let params = vec![ + ("pot_id", DataValue::Str(serde_json::to_string(pot)?.into())), + ("app_id", DataValue::Str(serde_json::to_string(app)?.into())), + ]; + + match run_query!( + &db, + ":insert pot_memberships {pot_id, app_id}", + params, + ScriptMutability::Mutable + ) { + Ok(_) => Ok(()), + Err(report) => Err(Error::msg(format!("Query failed: {}", report))), + } +} + pub fn get_pot_members(db: &DbInstance, id: &PotId) -> anyhow::Result> { let mut params = BTreeMap::new(); params.insert( @@ -66,7 +84,7 @@ pub fn get_pot_members(db: &DbInstance, id: &PotId) -> anyhow::Result ); let result = db.run_script( " - ?[app_id] := *pot_memberships{id, app_id} + ?[app_id] := *pot_memberships{id: $id, app_id} ", params, cozo::ScriptMutability::Immutable, @@ -91,3 +109,22 @@ pub fn get_pot_members(db: &DbInstance, id: &PotId) -> anyhow::Result Err(report) => Err(Error::msg(format!("Query failed: {}", report))), } } + +#[cfg(test)] +mod tests { + use cozo::DbInstance; + use ubisync_lib::types::PotId; + + use crate::state::schema; + + #[test] + pub fn add_and_get() { + let db = DbInstance::new("mem", "", Default::default()).unwrap(); + schema::add_schema(&db).unwrap(); + let pot_id = PotId::new(); + super::add(&db, &pot_id, "app_type").unwrap(); + let returned = super::get(&db, &pot_id).unwrap(); + assert_eq!(pot_id, returned.id); + assert_eq!("app_type", returned.app_type); + } +} \ No newline at end of file diff --git a/ubisync/tests/api.rs b/ubisync/tests/api.rs index 6ec0206..8fb3bad 100644 --- a/ubisync/tests/api.rs +++ b/ubisync/tests/api.rs @@ -1,9 +1,12 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use tracing::{debug, warn, Level}; -use ubisync::{config::Config, Ubisync}; +use ubisync::{config::Config, node_events::UbisyncNodeEvent, Ubisync}; use ubisync_lib::{ - api::element::{ElementCreateRequest, ElementGetRequest}, + api::{ + element::{ElementCreateRequest, ElementGetRequest}, + events::PollEventsRequest, + }, types::{Element, ElementContent}, }; use ubisync_sdk::UbisyncClient; @@ -19,7 +22,7 @@ async fn two_nodes_element_creation() { let mut c2 = Config::default(); c2.api_config.port = Some(9982); let ubi1 = Ubisync::new(&Config::default()).await.unwrap(); - let ubi2 = Ubisync::new(&c2).await.unwrap(); + let ubi2 = Arc::new(Ubisync::new(&c2).await.unwrap()); ubi1.add_peer_from_id(ubi2.get_destination().unwrap().into()) .unwrap(); @@ -35,6 +38,17 @@ async fn two_nodes_element_creation() { .await .unwrap(); + let app_id2 = api_client2.app_id(); + ubi2.set_node_event_callback( + move |ev, node| { + if let UbisyncNodeEvent::NewPot { id, app_type } = ev { + debug!("callback called"); + node.add_pot_member(&id, &app_id2).unwrap(); + } + }, + ubi2.clone(), + ); + tokio::time::sleep(Duration::from_millis(5000)).await; let test_element_content = ElementContent::Text("Text".to_string()); @@ -65,3 +79,71 @@ async fn two_nodes_element_creation() { assert_eq!(&test_element_content, received_element.content()); std::process::exit(0); } + +#[tokio::test(flavor = "multi_thread")] +async fn two_nodes_api_event() { + tracing_subscriber::fmt() + .pretty() + .with_max_level(Level::DEBUG) + .init(); + + // Two nodes need to bind to different ports + let mut c2 = Config::default(); + c2.api_config.port = Some(9982); + let ubi1 = Arc::new(Ubisync::new(&Config::default()).await.unwrap()); + let ubi2 = Ubisync::new(&c2).await.unwrap(); + ubi2.add_peer_from_id(ubi1.get_destination().unwrap().into()) + .unwrap(); + + let api_client1 = + UbisyncClient::init("localhost", 9981, None, "App", "Long desc", "test-app-type") + .await + .unwrap(); + + let app_id1 = api_client1.app_id(); + ubi1.set_node_event_callback( + move |ev, node| { + debug!("callback called"); + if let UbisyncNodeEvent::NewPot { id, app_type } = ev { + node.add_pot_member(&id, &app_id1).unwrap(); + } + }, + ubi1.clone(), + ); + + tokio::spawn(async move { + loop { + if let Ok(events) = api_client1 + .send( + PollEventsRequest { + timeout: 60, + accumulate: 1, + }, + (), + ) + .await + { + if events.events.len() > 0 { + debug!( + "Received event from local ubisync node: {:?}", + events.events.get(0).unwrap() + ); + std::process::exit(0); + } + + debug!("Still waiting for an event from the local ubisync node"); + tokio::time::sleep(Duration::from_millis(1000)).await; + } + } + }); + + let _api_client2 = + UbisyncClient::init("localhost", 9982, None, "App", "Long desc", "test-app-type") + .await + .unwrap() + .create_default_pot() + .await + .unwrap(); + + tokio::time::sleep(Duration::from_millis(10000)).await; +}