Event system

- App events, can be polled by apps using the HTTP API
- Node events, can be processed by a node implementation by registering a callback in the `Ubisync` object
- Some further additions, like adding pot members etc. to test these new event functions
This commit is contained in:
Philip (a-0) 2024-01-24 17:53:50 +01:00
parent 76f6a6b67b
commit d258060769
24 changed files with 479 additions and 96 deletions

1
Cargo.lock generated
View file

@ -3094,6 +3094,7 @@ dependencies = [
"reqwest",
"serde",
"serde_json",
"tokio",
"tracing",
"tracing-subscriber",
"ubisync-lib",

View file

@ -1,7 +1,7 @@
use reqwest::Method;
use serde::{Deserialize, Serialize};
use crate::types::{PotId, Pot};
use crate::types::{AppId, Pot, PotId};
use super::UbisyncRequest;
@ -15,6 +15,7 @@ pub struct AppRegisterRequest {
#[derive(Serialize, Deserialize)]
pub struct AppRegisterResponse {
pub token: String,
pub app_id: AppId,
}
impl UbisyncRequest for AppRegisterRequest {

View file

@ -0,0 +1,42 @@
use reqwest::Method;
use serde::{Serialize, Deserialize};
use crate::types::{ElementId, PotId};
use super::UbisyncRequest;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum AppEvent {
NewPot {
id: PotId,
app_type: String,
},
ElementUpdate {
id: ElementId,
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PollEventsRequest {
pub timeout: u16,
pub accumulate: u16,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PollEventsResponse {
pub events: Vec<AppEvent>,
}
impl UbisyncRequest for PollEventsRequest {
type PathParameters = ();
type Response = PollEventsResponse;
fn method(&self) -> reqwest::Method {
Method::GET
}
fn path(&self, _: Self::PathParameters) -> String {
"/events".to_string()
}
}

View file

@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
pub mod app;
pub mod element;
pub mod events;
/// Any struct defining a request body for the ubisync API must implement this trait
/// It is used both by the client in the SDK and by the API logic in the ubisync node

View file

@ -0,0 +1,11 @@
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
pub struct AppId(Uuid);
impl AppId {
pub fn new() -> Self {
AppId { 0: Uuid::new_v4() }
}
}

View file

@ -43,4 +43,7 @@ impl Element {
pub fn content(&self) -> &ElementContent {
&self.content
}
pub fn pot(&self) -> &Option<PotId> {
&self.pot
}
}

View file

@ -1,3 +1,6 @@
mod app_id;
pub use app_id::AppId;
mod element_content;
pub use element_content::ElementContent;

View file

@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct PotId(Uuid);
impl PotId {
pub fn new() -> Self {

View file

@ -10,6 +10,7 @@ anyhow = "1.0.79"
reqwest = { version = "0.11.23", features = [ "json" ] }
serde = { version = "1.0.166", features = [ "derive" ] }
serde_json = "1.0.99"
tokio = "1.35.1"
tracing = "0.1.37"
tracing-subscriber = "0.3.17"

View file

@ -2,11 +2,14 @@ use anyhow::anyhow;
use error::UbisyncError;
use reqwest::{Client, StatusCode};
use tracing::debug;
use ubisync_lib::api::{
app::{AppRegisterRequest, AppRegisterResponse, AppCreatePotRequest},
UbisyncRequest,
};
pub use ubisync_lib::*;
use ubisync_lib::{
api::{
app::{AppCreatePotRequest, AppRegisterRequest, AppRegisterResponse},
UbisyncRequest,
},
types::AppId,
};
pub mod error;
@ -15,15 +18,20 @@ pub struct UbisyncClient {
port: u16,
selected_api_version: String,
base_url: String,
jwt_token: String,
registration: AppRegistration,
reqwest_client: Client,
}
pub struct AppRegistration {
pub jwt_token: String,
pub app_id: AppId,
}
impl UbisyncClient {
pub async fn init(
host: &str,
port: u16,
jwt_token: Option<&str>,
registration: Option<AppRegistration>,
application_name: &str,
application_description: &str,
application_type: &str,
@ -43,8 +51,8 @@ impl UbisyncClient {
.get(0)
.expect("No available API version returned by ubisync node");
let token = match jwt_token {
Some(t) => t.to_string(),
let registration = match registration {
Some(t) => t,
None => {
let response = http_client
.put(Self::build_base_url(host, port, &selected_version) + "/app/register")
@ -59,11 +67,14 @@ impl UbisyncClient {
if response.status() != StatusCode::OK {
return Err(UbisyncError::AppRegistrationFailed);
}
response
let parsed = response
.json::<AppRegisterResponse>()
.await
.expect("Failed to extract JWT from app regstration request")
.token
.expect("Failed to extract JWT from app regstration request");
AppRegistration {
jwt_token: parsed.token,
app_id: parsed.app_id,
}
}
};
@ -72,15 +83,15 @@ impl UbisyncClient {
port: port,
selected_api_version: selected_version.to_string(),
base_url: Self::build_base_url(host, port, selected_version),
jwt_token: token.to_string(),
registration,
reqwest_client: http_client,
})
}
pub async fn create_default_pot(self) -> anyhow::Result<UbisyncClient> {
let response = self.send(AppCreatePotRequest {
app_type: None,
}, ()).await?;
let response = self
.send(AppCreatePotRequest { app_type: None }, ())
.await?;
debug!("Created new pot with ID {:?}", response.pot_id);
Ok(self)
}
@ -98,7 +109,7 @@ impl UbisyncClient {
request.method(),
&(self.base_url.to_owned() + &request.path(parameters)),
)
.bearer_auth(&self.jwt_token)
.bearer_auth(&self.registration.jwt_token)
.json(&request)
.send()
.await
@ -123,6 +134,10 @@ impl UbisyncClient {
self.base_url = Self::build_base_url(&self.host, self.port, &self.selected_api_version);
}
pub fn app_id(&self) -> AppId {
self.registration.app_id.clone()
}
fn build_base_url(host: &str, port: u16, api_version: &str) -> String {
format!("http://{}:{}/{}", host, port, api_version)
}

View file

@ -19,21 +19,11 @@ use ubisync_lib::{
AppGetDefaultPotResponse, AppRegisterRequest, AppRegisterResponse, AppSetDefaultPotRequest,
AppSetDefaultPotResponse,
},
types::PotId,
types::{AppId, PotId},
};
use uuid::Uuid;
use crate::state::ApiState;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct AppId(Uuid);
impl AppId {
pub fn new() -> Self {
AppId { 0: Uuid::new_v4() }
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct App {
pub id: AppId,
@ -89,14 +79,17 @@ pub(super) async fn register(
// Build JWT, respond
let jwt = jsonwebtoken::encode(
&Header::default(),
&JWTClaims { sub: id },
&JWTClaims { sub: id.clone() },
&s.jwt_encoding_key(),
);
match jwt {
Ok(token) => (
StatusCode::OK,
Json {
0: AppRegisterResponse { token: token },
0: AppRegisterResponse {
token: token,
app_id: id,
},
},
)
.into_response(),

View file

@ -14,10 +14,9 @@ use ubisync_lib::{
ElementCreateRequest, ElementCreateResponse, ElementGetResponse, ElementRemoveResponse,
ElementSetRequest, ElementSetResponse,
},
types::ElementId,
types::{AppId, ElementId},
};
use super::app::AppId;
pub(super) async fn get(
Path(id): Path<ElementId>,

View file

@ -0,0 +1,32 @@
use std::sync::Arc;
use axum::{
http::StatusCode,
response::{IntoResponse, Response},
Extension, Json,
};
use ubisync_lib::{api::events::{PollEventsRequest, PollEventsResponse}, types::AppId};
use crate::state::ApiState;
pub(super) async fn poll(
app: Extension<AppId>,
s: Extension<Arc<ApiState>>,
Json(req): Json<PollEventsRequest>,
) -> Response {
match s.events(app.0, req.timeout, req.accumulate).await {
Ok(events) => (
StatusCode::OK,
Json {
0: PollEventsResponse { events },
},
)
.into_response(),
Err(_) => (
StatusCode::CONFLICT,
"Another poll request is currently being handled",
)
.into_response(),
}
}

View file

@ -9,6 +9,7 @@ use crate::state::ApiState;
pub mod app;
pub mod element;
pub mod events;
pub fn get_router(state: ApiState) -> Router {
Router::new()
@ -23,6 +24,7 @@ pub fn get_router(state: ApiState) -> Router {
"/element/:id",
get(element::get).post(element::set).delete(element::remove),
)
.route("/events", get(events::poll))
.layer(from_fn(app::auth))
// public / unauthenticated routes
.route("/app/register", put(app::register))

View file

@ -27,8 +27,6 @@ pub fn handle(state: &CommState, peer: &PeerId, message: Message) {
}
MessageContent::AddPot { id, app_type } => {
state.add_pot(id, app_type).expect("State failed");
//TODO: remove when setting default pot properly is possible
let _ = state.set_default_pot_for_all_apps(id);
}
}
}

View file

@ -4,13 +4,18 @@ use anyhow::bail;
use api::{Api, ApiBuilder};
use comm::CommHandle;
use config::Config;
use node_events::UbisyncNodeEvent;
use i2p::net::I2pSocketAddr;
use state::{ApiState, CommState, State};
use ubisync_lib::{peer::Peer, types::PeerId};
use ubisync_lib::{
peer::Peer,
types::{AppId, PeerId, PotId},
};
pub mod api;
pub mod comm;
pub mod config;
pub mod node_events;
pub mod state;
pub struct Ubisync {
@ -39,6 +44,13 @@ impl Ubisync {
})
}
pub fn set_node_event_callback<CallbackFunction>(&self, cb: CallbackFunction, node: Arc<Ubisync>)
where
CallbackFunction: Fn(UbisyncNodeEvent, Arc<Ubisync>) + Send + Sync + 'static,
{
self.state_handle.set_node_event_callback(cb, node);
}
pub fn api(&self) -> Arc<Api> {
self.api.clone()
}
@ -62,4 +74,8 @@ impl Ubisync {
pub fn get_destination(&self) -> anyhow::Result<I2pSocketAddr> {
self.comm_handle.i2p_address()
}
pub fn add_pot_member(&self, pot: &PotId, app: &AppId) -> anyhow::Result<()> {
self.state_handle.add_pot_member(pot, app)
}
}

View file

@ -0,0 +1,10 @@
use ubisync_lib::types::PotId;
pub enum UbisyncNodeEvent {
NewPot {
id: PotId,
app_type: String,
}
}

View file

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use anyhow::Error;
use cozo::DbInstance;
@ -6,14 +6,12 @@ use jsonwebtoken::{DecodingKey, EncodingKey, Validation};
use serde_with::chrono::Utc;
use tracing::debug;
use ubisync_lib::{
api::events::AppEvent,
messages::MessageContent,
types::{Element, ElementContent, ElementId, Pot, PotId},
types::{AppId, Element, ElementContent, ElementId, Pot, PotId},
};
use crate::{
api::v0::app::{App, AppId},
state::queries,
};
use crate::{api::v0::app::App, state::queries};
use super::State;
@ -28,12 +26,14 @@ impl ApiState {
pub fn new(state: Arc<State>, jwt_secret: &str) -> Self {
let mut validation = Validation::default();
validation.set_required_spec_claims(&vec!["sub"]);
ApiState {
let api_state = ApiState {
state: state,
jwt_encoding_key: EncodingKey::from_secret(jwt_secret.as_bytes()),
jwt_decoding_key: DecodingKey::from_secret(jwt_secret.as_bytes()),
jwt_validation: validation,
}
};
api_state
}
pub fn add_app(&self, name: &str, description: &str, app_type: &str) -> anyhow::Result<AppId> {
@ -125,10 +125,6 @@ impl ApiState {
}
}
fn db(&self) -> &DbInstance {
&self.state.db
}
pub fn jwt_encoding_key(&self) -> &EncodingKey {
&self.jwt_encoding_key
}
@ -140,6 +136,41 @@ impl ApiState {
pub fn jwt_validation(&self) -> &Validation {
&self.jwt_validation
}
pub async fn events(
&self,
app: AppId,
timeout: u16,
accumulate: u16,
) -> anyhow::Result<Vec<AppEvent>> {
match self.state.get_event_receiver(&app) {
Ok(rx_mutex) => {
if let Ok(receiver) = rx_mutex.lock() {
let mut events = vec![];
loop {
//TODO: Use recv_deadline once it's stable
match receiver.recv_timeout(Duration::from_secs(timeout.into())) {
Ok(ev) => {
debug!("ApiState received event {:?}", &ev);
events.push(ev);
if events.len() >= accumulate.into() {
return Ok(events);
}
}
Err(_) => return Ok(events),
}
}
} else {
Err(Error::msg("Failed to lock on receiver mutex"))
}
},
Err(e) => Err(e),
}
}
fn db(&self) -> &DbInstance {
&self.state.db
}
}
#[cfg(test)]

View file

@ -1,19 +1,18 @@
use std::sync::Arc;
use anyhow::Error;
use cozo::DbInstance;
use tracing::debug;
use ubisync_lib::{
api::events::AppEvent,
peer::Peer,
types::{Element, ElementContent, ElementId, MessageId, PeerId, PotId},
};
use crate::state::queries;
use crate::{node_events::UbisyncNodeEvent, state::queries};
use super::State;
//TODO: Notify API about changes
pub struct CommState {
state: Arc<State>,
}
@ -54,6 +53,19 @@ impl CommState {
queries::elements::set_latest_message(self.db(), id, Some(latest_message.to_owned()))?;
debug!("Updated element {{{}}}", &id.to_string());
let _ = self.state.emit_app_event(
queries::pots::get_pot_members(
self.db(),
&queries::elements::get(self.db(), &id)?
.pot()
.as_ref()
.unwrap(),
)?
.get(0)
.unwrap(),
AppEvent::ElementUpdate { id: id.clone() },
);
Ok(())
}
@ -80,21 +92,14 @@ impl CommState {
}
pub fn add_pot(&self, id: &PotId, app_type: &str) -> anyhow::Result<()> {
queries::pots::add(self.db(), id, app_type)
}
queries::pots::add(self.db(), id, app_type)?;
pub fn set_default_pot_for_all_apps(&self, id: &PotId) -> anyhow::Result<()> {
if let Ok(apps) = queries::apps::get_all_ids(self.db()) {
for app in apps {
let res = queries::apps::set_default_pot(self.db(), &app, id);
if let Ok(_) = res {
debug!("Set {:?} as default for app {:?}", id, &app);
}
}
Ok(())
} else {
Err(Error::msg("Could not fetch list of all apps"))
}
let _ = self.state.emit_node_event(UbisyncNodeEvent::NewPot {
id: id.clone(),
app_type: app_type.to_string(),
});
Ok(())
}
fn db(&self) -> &DbInstance {

View file

@ -1,8 +1,14 @@
use std::sync::{Arc, RwLock};
use std::{
collections::HashMap,
sync::{
mpsc::{channel, Receiver, Sender},
Arc, Mutex, RwLock,
},
};
use anyhow::Error;
use cozo::DbInstance;
use tracing::{debug, error};
use tracing::{debug, error, warn};
mod api_state;
mod comm_state;
@ -12,16 +18,19 @@ mod schema;
pub use api_state::ApiState;
pub use comm_state::CommState;
use ubisync_lib::{
api::events::AppEvent,
messages::{Message, MessageContent},
peer::Peer,
types::{Element, ElementContent, ElementId, Tag},
types::{AppId, Element, ElementContent, ElementId, PotId, Tag},
};
use crate::comm::CommHandle;
use crate::{comm::CommHandle, node_events::UbisyncNodeEvent, Ubisync};
pub struct State {
db: DbInstance,
comm_handle: RwLock<Option<Arc<CommHandle>>>,
app_event_channels: RwLock<HashMap<AppId, (Sender<AppEvent>, Arc<Mutex<Receiver<AppEvent>>>)>>,
node_event_callback: RwLock<Option<Box<dyn Fn(UbisyncNodeEvent) + Send + Sync>>>,
}
impl State {
@ -32,11 +41,17 @@ impl State {
};
match db {
Ok(d) => {
//TODO: change "add schema" to "ensure newest schema"
schema::add_schema(&d)?;
Ok(Arc::new(State {
let state = Arc::new(State {
db: d,
comm_handle: RwLock::new(None),
}))
app_event_channels: Default::default(),
node_event_callback: RwLock::new(None),
});
state.init_event_channels();
Ok(state)
}
Err(e) => Err(Error::msg(format!("{:?}", e))),
}
@ -50,6 +65,18 @@ impl State {
.expect("Could not set state's CommHandle") = Some(handle);
}
pub fn set_node_event_callback<T>(&self, cb: T, ubi: Arc<Ubisync>)
where
T: Fn(UbisyncNodeEvent, Arc<Ubisync>) + Send + Sync + 'static,
{
match self.node_event_callback.write() {
Ok(mut writeguard) => {
*writeguard = Some(Box::new(move |ev: UbisyncNodeEvent| (cb)(ev, ubi.clone())))
}
Err(e) => warn!("Failed to set node event callback: {:?}", e),
}
}
pub fn set_element_content(
&self,
element_id: &ElementId,
@ -99,6 +126,86 @@ impl State {
queries::peers::get(&self.db)
}
pub fn add_pot_member(&self, pot: &PotId, app: &AppId) -> anyhow::Result<()> {
debug!("Pot: {:?}", queries::pots::get(&self.db, pot));
match queries::pots::add_pot_member(&self.db, pot, app) {
Ok(_) => {
self.emit_app_event(
app,
AppEvent::NewPot {
id: pot.clone(),
app_type: queries::pots::get(&self.db, pot)?.app_type,
},
);
Ok(())
}
Err(e) => Err(e),
}
}
pub fn get_event_receiver(&self, app: &AppId) -> anyhow::Result<Arc<Mutex<Receiver<AppEvent>>>> {
self.create_event_channel_if_not_exists(app.clone());
match self.app_event_channels.read() {
Ok(map) => match map.get(&app) {
Some((_, rx)) => Ok(rx.clone()),
None => Err(Error::msg("Failed to get event receiver")),
},
Err(_) => Err(Error::msg("Failed to lock on event_channels")),
}
}
pub fn emit_node_event(&self, ev: UbisyncNodeEvent) -> anyhow::Result<()> {
debug!("1");
match self.node_event_callback.read() {
Ok(readguard) => {
debug!("2");
debug!("{:?}", readguard.is_some());
// If a callback is set, call it.
if let Some(cb) = readguard.as_ref() {
debug!("3");
(*cb)(ev);
}
// Whether a callback is set or not, return Ok(_)
Ok(())
}
Err(_) => Err(Error::msg("Failed to lock on node event callback function")),
}
}
fn emit_app_event(&self, app: &AppId, ev: AppEvent) {
if let Ok(channels) = self.app_event_channels.read() {
if let Some((sender, _)) = channels.get(app) {
debug!("Emitting app event to {:?}", &app);
if let Err(e) = sender.send(ev) {
debug!("Emitting app event failed: {:?}", e);
}
}
}
else {
debug!("Failed to get channels");
}
}
fn create_event_channel_if_not_exists(&self, app: AppId) {
if let Ok(mut map) = self.app_event_channels.write() {
if !map.contains_key(&app) {
debug!(
"No event channel for {:?} exists, creating a new one.",
&app
);
let (tx, rx) = channel();
map.insert(app, (tx, Arc::new(Mutex::new(rx))));
}
}
}
fn init_event_channels(&self) {
let app_ids = queries::apps::get_all_ids(&self.db).unwrap_or(vec![]);
for app in app_ids {
self.create_event_channel_if_not_exists(app);
}
}
fn send_to_peers(&self, ct: MessageContent) {
match self.comm_handle.read() {
Ok(opt) => {

View file

@ -4,12 +4,9 @@ use anyhow::{bail, Error};
use cozo::{DataValue, DbInstance, Num, ScriptMutability};
use serde_with::chrono::{DateTime, Utc};
use tracing::warn;
use ubisync_lib::types::{Pot, PotId};
use ubisync_lib::types::{AppId, Pot, PotId};
use crate::{
api::v0::app::{App, AppId},
run_query,
};
use crate::{api::v0::app::App, run_query};
pub fn add(
db: &DbInstance,
@ -239,12 +236,9 @@ mod tests {
use cozo::DbInstance;
use serde_with::chrono::Utc;
use tracing::{debug, Level};
use ubisync_lib::types::PotId;
use ubisync_lib::types::{AppId, PotId};
use crate::{
api::v0::app::AppId,
state::{queries::pots, schema},
};
use crate::state::{queries::pots, schema};
#[test]
pub fn default_pot() {

View file

@ -3,15 +3,14 @@ use std::collections::BTreeMap;
use anyhow::{bail, Error};
use cozo::{DataValue, DbInstance, JsonData, ScriptMutability};
use serde_json::Value;
use tracing::{debug, error};
use tracing::error;
use crate::{
api::v0::app::AppId,
run_query,
state::{Element, ElementContent, ElementId},
};
use ubisync_lib::types::{MessageId, PotId, Tag};
use ubisync_lib::types::{AppId, MessageId, PotId, Tag};
pub fn add(
db: &DbInstance,
@ -111,7 +110,6 @@ pub fn get(db: &DbInstance, id: &ElementId) -> anyhow::Result<Element> {
match result {
Ok(val) => {
if let Some(firstrow) = val.rows.first() {
debug!("db result: {:?}", &firstrow.as_slice());
if let [DataValue::Json(JsonData(content)), latest_message, DataValue::Bool(local_changes), DataValue::Str(pot_id)] =
firstrow.as_slice()
{
@ -146,6 +144,7 @@ pub fn get_app_access(db: &DbInstance, id: &ElementId, app: &AppId) -> anyhow::R
DataValue::Str(serde_json::to_string(&app)?.into()),
);
//TODO id in script is not bound to $id parameter
let result = db.run_script(
"
memberships[pot_id] := *pot_memberships{pot_id, app_id}

View file

@ -2,9 +2,10 @@ use std::collections::BTreeMap;
use anyhow::Error;
use cozo::{DataValue, DbInstance, ScriptMutability};
use ubisync_lib::types::{Pot, PotId};
use tracing::debug;
use ubisync_lib::types::{AppId, Pot, PotId};
use crate::{api::v0::app::AppId, run_query};
use crate::run_query;
pub fn add(db: &DbInstance, id: &PotId, app_type: &str) -> anyhow::Result<()> {
let params = vec![
@ -34,7 +35,7 @@ pub fn get(db: &DbInstance, id: &PotId) -> anyhow::Result<Pot> {
);
let result = db.run_script(
"
?[id, app_type] := *pots{id, app_type}
?[app_type] := *pots[$id, app_type]
",
params,
cozo::ScriptMutability::Immutable,
@ -42,10 +43,10 @@ pub fn get(db: &DbInstance, id: &PotId) -> anyhow::Result<Pot> {
match result {
Ok(rows) => {
if let Some(firstrow) = rows.rows.first() {
if let [DataValue::Str(pot_id), DataValue::Str(app_type)] = firstrow.as_slice() {
if let [DataValue::Str(app_type)] = firstrow.as_slice() {
Ok(Pot::new(
serde_json::from_str(pot_id)?,
app_type.to_string(),
id.clone(),
serde_json::from_str(app_type)?,
))
} else {
Err(Error::msg("Could not parse result from query"))
@ -58,6 +59,23 @@ pub fn get(db: &DbInstance, id: &PotId) -> anyhow::Result<Pot> {
}
}
pub fn add_pot_member(db: &DbInstance, pot: &PotId, app: &AppId) -> anyhow::Result<()> {
let params = vec![
("pot_id", DataValue::Str(serde_json::to_string(pot)?.into())),
("app_id", DataValue::Str(serde_json::to_string(app)?.into())),
];
match run_query!(
&db,
":insert pot_memberships {pot_id, app_id}",
params,
ScriptMutability::Mutable
) {
Ok(_) => Ok(()),
Err(report) => Err(Error::msg(format!("Query failed: {}", report))),
}
}
pub fn get_pot_members(db: &DbInstance, id: &PotId) -> anyhow::Result<Vec<AppId>> {
let mut params = BTreeMap::new();
params.insert(
@ -66,7 +84,7 @@ pub fn get_pot_members(db: &DbInstance, id: &PotId) -> anyhow::Result<Vec<AppId>
);
let result = db.run_script(
"
?[app_id] := *pot_memberships{id, app_id}
?[app_id] := *pot_memberships{id: $id, app_id}
",
params,
cozo::ScriptMutability::Immutable,
@ -91,3 +109,22 @@ pub fn get_pot_members(db: &DbInstance, id: &PotId) -> anyhow::Result<Vec<AppId>
Err(report) => Err(Error::msg(format!("Query failed: {}", report))),
}
}
#[cfg(test)]
mod tests {
use cozo::DbInstance;
use ubisync_lib::types::PotId;
use crate::state::schema;
#[test]
pub fn add_and_get() {
let db = DbInstance::new("mem", "", Default::default()).unwrap();
schema::add_schema(&db).unwrap();
let pot_id = PotId::new();
super::add(&db, &pot_id, "app_type").unwrap();
let returned = super::get(&db, &pot_id).unwrap();
assert_eq!(pot_id, returned.id);
assert_eq!("app_type", returned.app_type);
}
}

View file

@ -1,9 +1,12 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use tracing::{debug, warn, Level};
use ubisync::{config::Config, Ubisync};
use ubisync::{config::Config, node_events::UbisyncNodeEvent, Ubisync};
use ubisync_lib::{
api::element::{ElementCreateRequest, ElementGetRequest},
api::{
element::{ElementCreateRequest, ElementGetRequest},
events::PollEventsRequest,
},
types::{Element, ElementContent},
};
use ubisync_sdk::UbisyncClient;
@ -19,7 +22,7 @@ async fn two_nodes_element_creation() {
let mut c2 = Config::default();
c2.api_config.port = Some(9982);
let ubi1 = Ubisync::new(&Config::default()).await.unwrap();
let ubi2 = Ubisync::new(&c2).await.unwrap();
let ubi2 = Arc::new(Ubisync::new(&c2).await.unwrap());
ubi1.add_peer_from_id(ubi2.get_destination().unwrap().into())
.unwrap();
@ -35,6 +38,17 @@ async fn two_nodes_element_creation() {
.await
.unwrap();
let app_id2 = api_client2.app_id();
ubi2.set_node_event_callback(
move |ev, node| {
if let UbisyncNodeEvent::NewPot { id, app_type } = ev {
debug!("callback called");
node.add_pot_member(&id, &app_id2).unwrap();
}
},
ubi2.clone(),
);
tokio::time::sleep(Duration::from_millis(5000)).await;
let test_element_content = ElementContent::Text("Text".to_string());
@ -65,3 +79,71 @@ async fn two_nodes_element_creation() {
assert_eq!(&test_element_content, received_element.content());
std::process::exit(0);
}
#[tokio::test(flavor = "multi_thread")]
async fn two_nodes_api_event() {
tracing_subscriber::fmt()
.pretty()
.with_max_level(Level::DEBUG)
.init();
// Two nodes need to bind to different ports
let mut c2 = Config::default();
c2.api_config.port = Some(9982);
let ubi1 = Arc::new(Ubisync::new(&Config::default()).await.unwrap());
let ubi2 = Ubisync::new(&c2).await.unwrap();
ubi2.add_peer_from_id(ubi1.get_destination().unwrap().into())
.unwrap();
let api_client1 =
UbisyncClient::init("localhost", 9981, None, "App", "Long desc", "test-app-type")
.await
.unwrap();
let app_id1 = api_client1.app_id();
ubi1.set_node_event_callback(
move |ev, node| {
debug!("callback called");
if let UbisyncNodeEvent::NewPot { id, app_type } = ev {
node.add_pot_member(&id, &app_id1).unwrap();
}
},
ubi1.clone(),
);
tokio::spawn(async move {
loop {
if let Ok(events) = api_client1
.send(
PollEventsRequest {
timeout: 60,
accumulate: 1,
},
(),
)
.await
{
if events.events.len() > 0 {
debug!(
"Received event from local ubisync node: {:?}",
events.events.get(0).unwrap()
);
std::process::exit(0);
}
debug!("Still waiting for an event from the local ubisync node");
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}
});
let _api_client2 =
UbisyncClient::init("localhost", 9982, None, "App", "Long desc", "test-app-type")
.await
.unwrap()
.create_default_pot()
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(10000)).await;
}