Proof of Concept

This commit is contained in:
Philip (a-0) 2023-12-07 19:51:19 +01:00
commit e4fe60c06e
26 changed files with 4604 additions and 0 deletions

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
/.vscode

3360
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

26
Cargo.toml Normal file
View file

@ -0,0 +1,26 @@
[package]
name = "ubisync"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.71"
async-trait = "0.1.73"
axum = { version = "0.7.2", features = [ "macros" ] }
itertools = "0.12.0"
serde = { version = "1.0.166", features = [ "derive" ] }
serde_json = "1.0.99"
serde_with = "3.3.0"
serial_test = "2.0.0"
tokio = { version = "1.32.0", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.17"
uuid = "1.4.1"
i2p = { path = "../i2p-rs" }
cozo = { version = "0.7.5", features = [ "storage-rocksdb", "requests", "graph-algo" ] }
[dev-dependencies]
reqwest = { version = "0.11.20", features = [ "json" ] }

15
README.md Normal file
View file

@ -0,0 +1,15 @@
# ubisync
This repo provides arbitrary data synchronization via I2P.
## Goals
- [ ] Core library for delta-syncing data via I2P
- [ ] Simple multi-platform GUI for setup and administration of one's ubisync node
- [ ] API for other software running on the same machine, providing sync of their data
## Vision
- Tag-based and full-text search across all data of the node
- Expose common data types found in any application's data store to the API
- recognize to-dos or dates in any data and expose them
- expose all data linked to any given time interval (e.g. files changed and messages sent on a given day)
- ...
- Allow easy-to-set-up passive nodes which clone all data of the active nodes, essentially replacing data sync servers and serving as sovereign backup solution

26
shell.nix Normal file
View file

@ -0,0 +1,26 @@
let
rust_overlay = import (builtins.fetchTarball "https://github.com/oxalica/rust-overlay/archive/master.tar.gz");
pkgs = import <nixpkgs> { overlays = [ rust_overlay ]; };
#rustVersion = "latest";
rustVersion = "1.76.0";
rust = pkgs.rust-bin.stable.${rustVersion}.default.override {
extensions = [
"rust-src" # for rust-analyzer
];
};
in
pkgs.mkShell {
buildInputs = [
rust
] ++ (with pkgs; [
rust-analyzer
pkg-config
openssl.dev
clang
sqlite
# other dependencies
# ...
]);
LIBCLANG_PATH = pkgs.libclang.lib + "/lib/";
RUST_BACKTRACE = 1;
}

66
src/api/mod.rs Normal file
View file

@ -0,0 +1,66 @@
use std::sync::Arc;
use axum::Router;
use tokio::{net::TcpListener, task::JoinHandle};
use crate::{state::State, config::ApiConfig};
mod v0;
pub struct Api {
server_thread: JoinHandle<()>,
}
impl Api {
pub fn stop(&self) {
self.server_thread.abort();
}
}
pub struct ApiBuilder {
bind_ip: Option<String>,
port: Option<u16>,
version: Option<String>,
}
impl Default for ApiBuilder {
fn default() -> Self {
ApiBuilder { bind_ip: None, port: None, version: None }
}
}
impl From<ApiConfig> for ApiBuilder {
fn from(value: ApiConfig) -> Self {
ApiBuilder {
bind_ip: value.ip,
port: value.port,
version: value.version
}
}
}
impl ApiBuilder {
pub async fn build(&self, state: Arc<State>) -> Api {
let mut app: Router = Router::new();
match &self.version {
Some(v) if v == "v0" => app = app.nest(&format!("/{}", v), v0::get_router(state.clone())),
_ => app = app.nest("/v0", v0::get_router(state.clone())),
}
let ip = match &self.bind_ip {
Some(addr) => addr,
None => "127.0.0.1",
};
let port = match &self.port {
Some(p) => p,
None => &9981,
};
let listener = TcpListener::bind(&format!("{}:{}", ip, port)).await.unwrap();
let task_handle = tokio::spawn({ async move {
axum::serve(listener, app).await.unwrap();
}});
Api{ server_thread: task_handle }
}
}

47
src/api/v0.rs Normal file
View file

@ -0,0 +1,47 @@
use std::sync::Arc;
use axum::{Router, routing::{put, get}, extract::{Path, Json}, Extension, response::{IntoResponse, Response}, http::StatusCode};
use crate::state::{State, types::{ElementId, ElementContent}};
pub fn get_router(state: Arc<State>) -> Router {
Router::new()
.route("/element", put(create_element))
.route("/element/:id", get(get_element).post(set_element).delete(remove_element))
.layer(Extension(state))
}
async fn get_element(Path(id): Path<ElementId>, s: Extension<Arc<State>>) -> Response {
let element = s.get_element(&id);
match element {
Some(el) => (StatusCode::OK, Json{0: el}).into_response(),
None => StatusCode::NOT_FOUND.into_response(),
}
}
async fn create_element(s: Extension<Arc<State>>, Json(content): Json<ElementContent>) -> Response {
let element_id = s.create_element(&content);
match element_id {
Ok(id) => (StatusCode::OK, Json{0: &Into::<String>::into(&id)}).into_response(),
Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
}
}
async fn set_element(Path(id): Path<ElementId>, s: Extension<Arc<State>>, Json(content): Json<ElementContent>) -> Response {
let res = s.set_element_content(&id, &content);
match res {
Ok(_) => StatusCode::OK.into_response(),
Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
}
}
async fn remove_element(Path(id): Path<ElementId>, s: Extension<Arc<State>>) -> Response {
let res = s.remove_element(&id);
match res {
Ok(_) => StatusCode::OK.into_response(),
Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
}
}

View file

@ -0,0 +1,27 @@
use std::sync::Arc;
use tracing::debug;
use crate::state::{State, types::PeerId};
use super::{messages::{Message, MessageContent}, Peer};
pub fn handle(state: Arc<State>, peer: &PeerId, message: Message) {
debug!("Handling message now: {:?}", message);
match message.content() {
MessageContent::Hello { peer_name } => {
state.set_peer(&Peer::new(peer.clone(), peer_name.clone())).expect("Couldn't set peer");
},
MessageContent::CreateElement { id, content } => {
state.set_element(id, content).expect("State failed");
},
MessageContent::SetElement { id, content } => {
state.set_element(id, content).expect("State failed");
},
MessageContent::RemoveElement { id } => {
state.remove_element(id).expect("State failed");
}
}
}

59
src/comm/messages/mod.rs Normal file
View file

@ -0,0 +1,59 @@
use serde::{Serialize, Deserialize};
use crate::state::types::{MessageId, ElementId, ElementContent};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Message {
id: MessageId,
signature: MessageSignature,
content: MessageContent,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum MessageContent {
Hello {
peer_name: String,
},
CreateElement {
id: ElementId,
content: ElementContent,
},
SetElement {
id: ElementId,
content: ElementContent,
},
RemoveElement {
id: ElementId,
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct MessageSignature {}
impl Message {
pub fn new(content: MessageContent) -> Self {
Message {
id: MessageId::new(),
signature: MessageSignature { },
content: content,
}
}
pub fn content(&self) -> &MessageContent {
&self.content
}
pub fn id(&self) -> &MessageId {
&self.id
}
}
impl MessageSignature {
pub fn verify(&self, _content: &MessageContent) -> bool {
true
}
}

241
src/comm/mod.rs Normal file
View file

@ -0,0 +1,241 @@
pub mod messages;
pub mod message_processor;
mod types;
use tracing::{warn, error, debug};
pub use types::*;
use std::collections::HashMap;
use std::sync::Arc;
use std::io::{Read, Write};
use anyhow::bail;
use i2p::net::{I2pListenerBuilder, I2pListener, I2pSocketAddr, I2pStream, I2pAddr};
use i2p::sam_options::SAMOptions;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use crate::Config;
use crate::state::State;
use crate::state::types::PeerId;
use self::messages::Message;
pub struct CommHandle {
state: Arc<State>,
i2p_server: Arc<I2pListener>,
// Maps peer addresses to existing connections to them
clients: Arc<RwLock<HashMap<I2pSocketAddr, Arc<RwLock<I2pStream>>>>>,
thread: RwLock<Option<JoinHandle<()>>>,
}
impl CommHandle {
pub fn new(state: Arc<State>, config: &Config) -> anyhow::Result<Self> {
let mut listener_builder = I2pListenerBuilder::default()
.with_options(SAMOptions::default());
if let Some(privkey) = &config.i2p_private_key {
listener_builder = listener_builder.with_private_key(privkey.to_string());
}
let listener = listener_builder
.build()
.unwrap();
Ok(CommHandle {
state: state,
i2p_server: Arc::new(listener),
clients: Default::default(),
thread: RwLock::new(None),
})
}
pub async fn run(&self) {
debug!("CommHandle is running now");
let state = self.state.clone();
let i2p_server = self.i2p_server.clone();
let clients = self.clients.clone();
let mut thread_writeguard = self.thread.write().await;
*thread_writeguard = Some(tokio::spawn(async move {
for incoming in i2p_server.incoming() {
if let Ok(stream) = incoming {
if let Ok(addr) = stream.peer_addr() {
// First, save a reference to the new stream in `clients` for later reuse
let wrapped_stream = Arc::new(RwLock::new(stream));
clients.write().await.insert(addr, wrapped_stream.clone());
// Reference to state to be passed to `read_connection()`
let state_arc = state.clone();
// Spawn a blocking task, which (in read_connection) will spawn a non-blocking task
// The "outer" blocking task exists, because the for loop's iterator will block until
// there is another stream - thus, the existing streams will not be read.
// `spawn_blocking` moves the reading task to a special pool of tasks which are
// executed _despite_ other tasks blocking for something.
tokio::task::spawn_blocking(move || Self::read_connection(wrapped_stream, state_arc));
}
}
}
}));
}
pub async fn stop(self) {
if let Some(t) = self.thread.read().await.as_ref() {
t.abort();
}
}
pub async fn broadcast(&self, msg: Message) -> anyhow::Result<()> {
match serde_json::to_string(&msg) {
Ok(msg_string) => {
for peer in self.state.get_peers().unwrap() {
debug!("Sending to peer '{:?}' message '{:?}'", &peer, &msg);
if let Err(e) = self.send_to_addr(&peer.addr(), msg_string.as_bytes()).await {
debug!("Failed to send message.\nError: {:?}\nMessage: {:?}", e, &msg);
}
}
Ok(())
},
Err(e) => bail!(e),
}
}
pub async fn send(&self, dest: &I2pSocketAddr, msg: Message) -> anyhow::Result<()> {
match serde_json::to_string(&msg) {
Ok(msg_string) => {
self.send_to_addr(dest, msg_string.as_bytes()).await?;
Ok(())
},
Err(e) => bail!(e),
}
}
pub async fn send_to_addr(&self, addr: &I2pSocketAddr, msg: &[u8]) -> anyhow::Result<()> {
// Create client for this connection if necessary
if !self.clients.read().await.contains_key(addr) {
match I2pStream::connect(addr) {
Ok(client) => {
//client.inner.sam.conn.set_nodelay(true)?;
//client.inner.sam.conn.set_nonblocking(false)?;
self.clients.write().await.insert(addr.clone(), Arc::new(RwLock::new(client)));
},
Err(e) => bail!(e),
}
}
// Fetch current client for this connection from clients map, and send the message
if let Some(client) = self.clients.read().await.get(&addr) {
let mut writeguard = client.write().await;
match writeguard.write_all(msg) {
Ok(_) => {
writeguard.flush()?;
return Ok(())
},
Err(e) => {
warn!("Error writing to stream: {}", e)
}
}
}
else {
return Err(anyhow::Error::msg("No client found despite trying to add one beforehand."))
}
self.clients.write().await.remove(&addr);
Err(anyhow::Error::msg("Failed to send anything, most likely the stream was broken and has been removed"))
}
pub fn i2p_address(&self) -> anyhow::Result<I2pSocketAddr> {
match self.i2p_server.local_addr() {
Ok(addr) => Ok(addr),
Err(e) => bail!(e),
}
}
pub fn i2p_b32_address(&self) -> anyhow::Result<I2pSocketAddr> {
let mut i2p_dest = self.i2p_address()?;
i2p_dest.set_dest(I2pAddr::from_b64(&i2p_dest.dest().string()).unwrap());
Ok(i2p_dest)
}
fn read_connection(wrapped_stream: Arc<RwLock<I2pStream>>, state: Arc<State>) -> JoinHandle<()> {
tokio::spawn(async move {
let mut stream = wrapped_stream.write().await;
let peer: PeerId = stream.peer_addr().expect("Failed to get peer addr").into();
// All streams start with a \n byte which does not belong to the payload, take that from the stream.
if let Err(e) = stream.read(&mut [0; 1]) {
error!("Error while reading first byte of stream: {}", e);
return;
}
let iterator = serde_json::Deserializer::from_reader(&mut *stream).into_iter::<serde_json::Value>();
for item in iterator {
match item {
Ok(value) => {
match serde_json::from_value::<Message>(value) {
Ok(message) => {
message_processor::handle(state.clone(), &peer, message);
},
Err(e) => warn!("Deserialization failed: {:?}", e),
}
},
Err(e) => {
warn!("Deserialization failed: {:?}", e);
break;
}
}
}
})
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use i2p::Session;
use i2p::net::I2pListener;
use i2p::sam::StreamForward;
use i2p::sam_options::SAMOptions;
use crate::Config;
use crate::state::State;
use crate::comm::{messages, Message};
use crate::state::types::ElementId;
use super::CommHandle;
#[tokio::test(flavor = "multi_thread")]
pub async fn msg() {
let ch = CommHandle::new(State::new().await.unwrap(), &Config::default() ).unwrap();
ch.run().await;
println!("My address: {:?}", ch.i2p_b32_address());
ch.send(
&ch.i2p_address().unwrap(),
Message::new(messages::MessageContent::Hello { peer_name: "a".to_string() })
).await.expect("Could not send hello");
for i in 0..10 {
let result = ch.send(
&ch.i2p_address().unwrap(),
Message::new(messages::MessageContent::CreateElement { id: ElementId::new(), content: crate::state::types::ElementContent::Text(format!("hello world no. {}", i)) })
).await;
tokio::time::sleep(Duration::from_millis(300)).await;
println!("Result of sending: {:?}", result);
}
ch.stop().await;
}
#[test]
pub fn from_privkey() {
let privkey = "DPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gQz1KRvzfH0pI7uNWn8ZiRKQ812KjOrOMqnq7owjAjSBDPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gQz1KRvzfH0pI7uNWn8ZiRKQ812KjOrOMqnq7owjAjSBDPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gVKUHq0huLwfh0u06PlujRXTgcUJw9pg4Vkh-e0CGQFL6yn2FxCUIvaryyFt3-8mwO1OTkQyB7u1rnO9FpLlKeT9FPSkwmaxZmwQ1kvsuTTIp5ntxQZ1XMCDm2qhRWdcEsYxTKLJIMYxN1Ujk9Y7SqNYORmxrwQWC4ENGnt~VyvbAAAAfAabqgU0GhMWN2syDQ5sYZ61WXDqC4esasxwyLvJ-ES7~k40Uq9htc8t16-RXEq0Q17C499WxW6~GQRcXbgBNd0bMdV-46RsFo1jNgfB6H4nkuTrQXMqXB6s2Fhx2gwcHRk3Lt5DE4N0mvHG8Po974tJWr1hIRiSxQUtSj5kcZOOT~EKWMoCA7qDgZORZAnJavaRr0S-PiPQwAw8HOekdw50CyOByxXEfLBAi-Kz1nhdNvMHIrtcBZ~RpsxOK63O633e0PeYwrOOG7AFVLh7SzdwVvI1-KUe7y2ADBcoHuJRMwk5MEV-BATEfhWA2SzWw1qFRzJyb-pGbgGCJQOoc1YcP8jikBJhtuRbD5K-wK5MXoHL";
let _ = I2pListener {
forward: StreamForward::with_session(&Session::from_destination(
i2p::sam::DEFAULT_API,
&privkey,
SAMOptions::default()).expect("Failed to create session for listener2")
).expect("Failed to create StreamForward for listener2")
};
}
}

51
src/comm/types.rs Normal file
View file

@ -0,0 +1,51 @@
use anyhow::bail;
use i2p::net::I2pSocketAddr;
use serde::{Serialize, Deserialize};
use crate::state::types::PeerId;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Peer {
id: PeerId,
name: String,
family: Vec<PeerId>,
}
impl Peer {
pub fn new(id: PeerId, name: String) -> Self {
Peer {id: id, name: name, family: vec![]}
}
pub fn addr(&self) -> I2pSocketAddr {
self.id.addr()
}
pub fn id(&self) -> PeerId {
self.id.clone()
}
pub fn name(&self) -> String {
self.name.clone()
}
}
impl TryFrom<&str> for Peer {
type Error = anyhow::Error;
fn try_from(value: &str) -> Result<Self, Self::Error> {
match serde_json::from_str(value) {
Ok(p) => Ok(p),
Err(e) => bail!(e),
}
}
}
impl TryFrom<String> for Peer {
type Error = anyhow::Error;
fn try_from(value: String) -> Result<Self, Self::Error> {
Self::try_from(value.as_str())
}
}

26
src/config.rs Normal file
View file

@ -0,0 +1,26 @@
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Config {
pub i2p_private_key: Option<String>,
pub api_config: ApiConfig,
}
impl Default for Config {
fn default() -> Self {
Config { i2p_private_key: None, api_config: Default::default() }
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ApiConfig {
pub ip: Option<String>,
pub port: Option<u16>,
pub version: Option<String>,
}
impl Default for ApiConfig {
fn default() -> Self {
ApiConfig { ip: None, port: None, version: None }
}
}

83
src/lib.rs Normal file
View file

@ -0,0 +1,83 @@
use std::sync::Arc;
use anyhow::bail;
use api::{Api, ApiBuilder};
use comm::{CommHandle, Peer};
use config::Config;
use i2p::net::I2pSocketAddr;
use serde::{Serialize, Deserialize};
use state::{State, types::{ElementContent, ElementId, MessageId, PeerId}};
mod api;
pub mod comm;
pub mod config;
pub mod state;
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MessageRelations {
pub parents: Vec<MessageId>,
}
pub struct Ubisync {
comm_handle: Arc<CommHandle>,
state_handle: Arc<State>,
api: Arc<Api>,
}
impl Ubisync {
pub async fn new(config: &Config) -> anyhow::Result<Self> {
let state = State::new().await?;
let comm_handle = Arc::new(CommHandle::new(state.clone(), config)?);
state.set_comm_handle(comm_handle.clone());
let api = Arc::new(ApiBuilder::from(config.api_config.clone()).build(state.clone()).await);
comm_handle.run().await;
Ok(Ubisync {
comm_handle: comm_handle,
state_handle: state,
api: api
})
}
pub fn api(&self) -> Arc<Api> {
self.api.clone()
}
pub fn add_peer(&self, p: impl TryInto<Peer, Error=anyhow::Error>) -> anyhow::Result<()> {
match p.try_into() {
Ok(peer) => self.state_handle.set_peer(&peer),
Err(e) => bail!(e),
}
}
pub fn add_peer_from_id(&self, id: PeerId) -> anyhow::Result<()> {
// TODO: resolve peer's name before setting
self.state_handle.set_peer(&Peer::new(id, "".to_string()))
}
pub fn get_peers(&self) -> Vec<Peer> {
self.state_handle.get_peers().unwrap_or(vec![])
}
pub fn get_destination(&self) -> anyhow::Result<I2pSocketAddr> {
self.comm_handle.i2p_address()
}
pub fn create_element(&self, content: &ElementContent) -> anyhow::Result<ElementId> {
self.state_handle.create_element(content)
}
}
#[cfg(test)]
mod tests {
#[test]
#[ignore]
fn full_system_test1() {
// Do a test requiring a specific system state (including homeserver state or such)
// Disabled by default, since files need to be set up explicitly for this test
todo!()
}
}

135
src/state/mod.rs Normal file
View file

@ -0,0 +1,135 @@
use std::sync::{Arc, RwLock};
use anyhow::Error;
use cozo::DbInstance;
use tracing::{error, debug};
use crate::comm::{Peer, CommHandle, messages::{Message, MessageContent}};
use self::types::{ElementContent, ElementId, Element, Tag};
pub mod types;
mod queries;
mod schema;
pub struct State {
db: DbInstance,
comm_handle: RwLock<Option<Arc<CommHandle>>>,
}
impl State {
pub async fn new() -> anyhow::Result<Arc<State>> {
let db = DbInstance::new("mem", "", Default::default());
match db {
Ok(d) => {
schema::add_schema(&d)?;
Ok(Arc::new(State {db: d, comm_handle: RwLock::new(None)}))
},
Err(e) => Err(Error::msg(format!("{:?}", e))),
}
}
pub fn set_comm_handle(&self, handle: Arc<CommHandle>) {
*self.comm_handle.write().as_deref_mut().expect("Could not set state's CommHandle") = Some(handle);
}
// Create an element and add it to the database
pub fn create_element(&self, content: &ElementContent) -> anyhow::Result<ElementId> {
let id = ElementId::new();
queries::set_element(&self.db, &id, &content)?;
debug!("Created element with id {:?}: {:?}", &id, self.get_element(&id));
self.send_to_peers(MessageContent::CreateElement { id: id.clone(), content: content.clone() });
Ok(id)
}
// Anyone updated an element, update it in the database
pub fn set_element(&self, element_id: &ElementId, content: &ElementContent) -> anyhow::Result<()> {
let res = queries::set_element(&self.db, element_id, content);
debug!("Set element with id {:?}: {:?}", element_id, self.get_element(element_id));
self.send_to_peers(MessageContent::SetElement { id: element_id.clone(), content: content.clone() });
res
}
pub fn set_element_content(&self, element_id: &ElementId, content: &ElementContent) -> anyhow::Result<()> {
let res = queries::set_element_content(&self.db, element_id, content);
debug!("Set element content with id {:?}: {:?}", element_id, self.get_element(element_id));
self.send_to_peers(MessageContent::SetElement { id: element_id.clone(), content: content.clone() });
res
}
pub fn remove_element(&self, element_id: &ElementId) -> anyhow::Result<()> {
let res = queries::remove_element(&self.db, element_id);
self.send_to_peers(MessageContent::RemoveElement { id: element_id.clone() });
res
}
pub fn get_element(&self, id: &ElementId) -> Option<Element> {
queries::get_element(&self.db, id).ok()
}
pub fn get_elements_by_tag(&self, tag: &Tag) -> Vec<ElementId> {
queries::get_elements_by_tag(&self.db, tag)
.map_err(|e| {error!("{}", e); e})
.unwrap_or(vec![])
}
pub fn set_peer(&self, peer: &Peer) -> anyhow::Result<()> {
queries::add_peer(&self.db, &peer.id(), &peer.name())
}
pub fn get_peers(&self) -> anyhow::Result<Vec<Peer>> {
queries::get_peers(&self.db)
}
fn send_to_peers(&self, ct: MessageContent) {
match self.comm_handle.read() {
Ok(opt) => {
if opt.is_some() {
let arc = opt.as_ref().unwrap().clone();
tokio::spawn(async move {
let _ = arc.broadcast(Message::new(ct)).await;
});
}
},
Err(e) => debug!("{}", e),
}
}
}
#[tokio::test]
#[serial_test::serial]
async fn test_create() {
tracing_subscriber::fmt().pretty().init();
let state = State::new().await.unwrap();
let id = state.create_element(&ElementContent::Text("Test-text".to_string())).unwrap();
let el = state.get_element(&id).unwrap();
assert_eq!(
ElementContent::Text("Test-text".to_string()),
el.content().to_owned()
)
}
#[tokio::test]
#[serial_test::serial]
async fn test_update() {
tracing_subscriber::fmt().pretty().init();
let state = State::new().await.unwrap();
let id = state.create_element(&ElementContent::Text("Test-text".to_string())).unwrap();
state.set_element(&id,&ElementContent::Text("Test-text 2".to_string())).unwrap();
let el = state.get_element(&id).unwrap();
assert_eq!(
ElementContent::Text("Test-text 2".to_string()),
el.content().to_owned()
)
}

View file

@ -0,0 +1,90 @@
use std::collections::BTreeMap;
use anyhow::{Error, bail};
use cozo::{DbInstance, DataValue, JsonData, ScriptMutability};
use serde_json::Value;
use tracing::error;
use crate::{state::{ElementContent, ElementId, Element, types::Tag}, run_query};
pub fn set_element(db: &DbInstance, id: &ElementId, content: &ElementContent) -> anyhow::Result<()> {
let params = vec![
("id", DataValue::Str(serde_json::to_string(&id)?.into())),
("content", DataValue::Str(serde_json::to_string(content)?.into()))
];
match run_query!(&db, ":put elements {id => content}", params, ScriptMutability::Mutable) {
Ok(_) => Ok(()),
Err(report) => bail!(report),
}
}
pub fn set_element_content(db: &DbInstance, id: &ElementId, content: &ElementContent) -> anyhow::Result<()> {
let params = vec![
("id", DataValue::Str(serde_json::to_string(&id)?.into())),
("content", DataValue::Str(serde_json::to_string(content)?.into()))
];
match run_query!(&db, ":put elements {id => content}", params, ScriptMutability::Mutable) {
Ok(_) => Ok(()),
Err(report) => bail!(report),
}
}
pub fn remove_element(db: &DbInstance, id: &ElementId) -> anyhow::Result<()> {
match run_query!(&db, ":delete elements {id}", vec![("id", DataValue::Str(serde_json::to_string(&id)?.into()))], cozo::ScriptMutability::Mutable) {
Ok(_) => Ok(()),
Err(report) => bail!(report),
}
}
pub fn get_element(db: &DbInstance, id: &ElementId) -> anyhow::Result<Element> {
let mut params = BTreeMap::new();
params.insert("id".to_string(), DataValue::Str(serde_json::to_string(&id)?.into()));
let result = db.run_script("
?[content] := *elements[$id, content]
", params, cozo::ScriptMutability::Immutable);
match result {
Ok(val) => {
if let Some(firstrow) = val.rows.first() {
if let [ DataValue::Json(JsonData(Value::String(content))) ] = firstrow.as_slice() {
return Ok(Element::new(
id.to_owned(),
content.as_str().try_into()?,
));
}
}
return Err(Error::msg("Could not parse db result as Element"));
},
Err(report) => bail!(report),
}
}
pub fn get_elements_by_tag(db: &DbInstance, tag: &Tag) -> anyhow::Result<Vec<ElementId>> {
let mut params = BTreeMap::new();
params.insert("tag".to_string(), DataValue::Str(serde_json::to_string(tag)?.into()));
let result = db.run_script("
?[element] := *tags[$tag, element]
", params, cozo::ScriptMutability::Immutable);
match result {
Ok(named_rows) => {
let mut element_ids = vec![];
for row in named_rows.rows {
if let [ DataValue::Json(JsonData(Value::String(element_id)))] = row.as_slice() {
match serde_json::from_str(&element_id) {
Ok(id) => element_ids.push(id),
Err(e) => {
error!("Error parsing element id {}: {}", element_id, e);
continue
}
}
}
}
Ok(element_ids)
},
Err(report) => bail!(report),
}
}

52
src/state/queries/mod.rs Normal file
View file

@ -0,0 +1,52 @@
mod elements;
pub use elements::*;
mod peers;
pub use peers::*;
#[macro_export]
macro_rules! build_query {
($payload:expr, $params:expr) => {
{
// Build parameters map
let mut params_map: BTreeMap<String, DataValue> = Default::default();
let mut parameters_init = String::new();
if $params.len() > 0 {
for (name, value) in $params {
let _: &str = name; // only for type annotation
params_map.insert(name.to_string(), value);
}
// First line: Initialize parameters, make them available in CozoScript
use itertools::Itertools;
parameters_init += "?[";
parameters_init += &params_map.iter().map(|(name, _)| name).format(", ").to_string();
parameters_init += "] <- [[";
parameters_init += &params_map.iter().map(|(name, _)| format!("${}", name)).format(", ").to_string();
parameters_init += "]]";
}
// Return query string and parameters map
(
format!("{}\n\n{}", parameters_init, $payload),
params_map
)
}
};
}
use build_query;
#[macro_export]
macro_rules! run_query {
($db:expr, $payload:expr, $params:expr, $mutability:expr) => {
{
let (query, parameters) = crate::state::queries::build_query!($payload, $params);
$db.run_script(query.as_str(), parameters, $mutability)
}
};
}

View file

@ -0,0 +1,49 @@
use std::collections::BTreeMap;
use anyhow::Error;
use cozo::{DbInstance, DataValue, ScriptMutability};
use crate::{state::types::PeerId, comm::Peer, run_query};
pub fn add_peer(db: &DbInstance, id: &PeerId, name: &str) -> anyhow::Result<()> {
let params = vec![
("id", DataValue::Str(serde_json::to_string(id)?.into())),
("name", DataValue::Str(serde_json::to_string(name)?.into()))
];
match run_query!(&db, ":put peers {id => name}", params, ScriptMutability::Mutable) {
Ok(_) => Ok(()),
Err(report) => Err(Error::msg(format!("Query failed: {}", report))),
}
}
pub fn get_peers(db: &DbInstance) -> anyhow::Result<Vec<Peer>> {
let result = db.run_script("
?[id, name] := *peers{id, name}
", Default::default(), cozo::ScriptMutability::Immutable);
match result {
Ok(rows) => {
Ok(
rows.rows.into_iter().map(
|row| {
match row.as_slice() {
[DataValue::Str(id_string), DataValue::Str(name_string)] => {
if let Ok(id) = serde_json::from_str(&id_string) {
Some(Peer::new(id, name_string.as_str().to_string()))
}
else {
None
}},
_ => None
}
}
)
.flatten()
.collect()
)
},
Err(report) => Err(Error::msg(format!("Query failed: {}", report))),
}
}

29
src/state/schema.rs Normal file
View file

@ -0,0 +1,29 @@
use std::collections::BTreeMap;
use anyhow::Error;
use cozo::DbInstance;
pub fn add_schema(db: &DbInstance) -> anyhow::Result<()> {
let params = BTreeMap::new();
match db.run_script("
{:create peers {
id: String,
=>
name: String,
}}
{:create elements {
id: String,
=>
content: Json,
}}
{:create tags {
tag: String,
element: String
}}
", params, cozo::ScriptMutability::Mutable) {
Ok(_) => Ok(()),
Err(e) => Err(Error::msg(format!("Failed to set up schema: {}", e))),
}
}

View file

@ -0,0 +1,25 @@
use serde::{Serialize, Deserialize};
use super::{ElementId, ElementContent};
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Element {
// Uuid identifying the element itself
id: ElementId,
content: ElementContent
}
impl Element {
pub fn new(id: ElementId, content: ElementContent) -> Self {
Element { id: id, content: content }
}
pub fn id(&self) -> &ElementId {
&self.id
}
pub fn content(&self) -> &ElementContent {
&self.content
}
}

View file

@ -0,0 +1,34 @@
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum ElementContent {
Text(String),
}
impl TryFrom<ElementContent> for String {
type Error = serde_json::Error;
fn try_from(value: ElementContent) -> Result<Self, Self::Error> {
match serde_json::to_string(&value) {
Ok(s) => Ok(s),
Err(e) => Err(e)
}
}
}
impl TryFrom<&str> for ElementContent {
type Error = serde_json::Error;
fn try_from(value: &str) -> Result<Self, Self::Error> {
match serde_json::from_str(value) {
Ok(ec) => Ok(ec),
Err(e) => Err(e)
}
}
}
#[test]
pub fn test() {
}

View file

@ -0,0 +1,23 @@
use serde::{Serialize, Deserialize};
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ElementId(Uuid);
impl ElementId {
pub fn new() -> Self {
ElementId(Uuid::new_v4())
}
}
impl From<&ElementId> for String {
fn from(value: &ElementId) -> Self {
value.0.to_string()
}
}
impl TryFrom<&str> for ElementId {
type Error = serde_json::Error;
fn try_from(value: &str) -> Result<Self, Self::Error> {
serde_json::from_str(value)
}
}

View file

@ -0,0 +1,31 @@
use serde::{Serialize, Deserialize};
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct MessageId(Uuid);
impl MessageId {
pub fn new() -> Self {
MessageId(Uuid::new_v4())
}
}
impl ToString for MessageId {
fn to_string(&self) -> String {
self.0.to_string()
}
}
impl From<MessageId> for String {
fn from(value: MessageId) -> Self {
value.to_string()
}
}
impl TryFrom<&str> for MessageId {
type Error = serde_json::Error;
fn try_from(value: &str) -> Result<Self, Self::Error> {
serde_json::from_str(value)
}
}

17
src/state/types/mod.rs Normal file
View file

@ -0,0 +1,17 @@
mod element_content;
pub use element_content::ElementContent;
mod element_id;
pub use element_id::ElementId;
mod element;
pub use element::Element;
mod message_id;
pub use message_id::MessageId;
mod peer_id;
pub use peer_id::PeerId;
mod tag;
pub use tag::Tag;

View file

@ -0,0 +1,51 @@
use anyhow::bail;
use i2p::net::{I2pSocketAddr, ToI2pSocketAddrs};
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PeerId {
i2p_addr: I2pSocketAddr,
}
impl PeerId {
pub fn addr(&self) -> I2pSocketAddr {
self.i2p_addr.to_owned()
}
}
impl TryFrom<&str> for PeerId {
type Error = anyhow::Error;
fn try_from(value: &str) -> Result<Self, anyhow::Error> {
match ToI2pSocketAddrs::to_socket_addrs(&value) {
Ok(addr_iter) => {
for addr in addr_iter {
return Ok(PeerId { i2p_addr: addr });
}
return Err(anyhow::Error::msg("No valid I2P address found"));
},
Err(e) => bail!(e),
}
}
}
impl TryFrom<String> for PeerId {
type Error = anyhow::Error;
fn try_from(value: String) -> Result<Self, Self::Error> {
Self::try_from(value.as_str())
}
}
impl From<I2pSocketAddr> for PeerId {
fn from(value: I2pSocketAddr) -> Self {
PeerId { i2p_addr: value }
}
}
impl From<PeerId> for I2pSocketAddr {
fn from(value: PeerId) -> Self {
value.i2p_addr
}
}

7
src/state/types/tag.rs Normal file
View file

@ -0,0 +1,7 @@
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Tag {
tag: String,
}

32
tests/api.rs Normal file
View file

@ -0,0 +1,32 @@
use std::time::Duration;
use tracing::{Level, debug};
use ubisync::{Ubisync, config::Config, state::types::{ElementContent, Element, ElementId}};
#[tokio::test(flavor = "multi_thread")]
async fn two_nodes_element_creation() {
tracing_subscriber::fmt().pretty().with_max_level(Level::DEBUG).init();
// Two nodes need to bind to different ports
let mut c2 = Config::default();
c2.api_config.port = Some(9982);
let ubi1 = Ubisync::new(&Config::default()).await.unwrap();
let ubi2 = Ubisync::new(&c2).await.unwrap();
ubi1.add_peer_from_id(ubi2.get_destination().unwrap().into()).unwrap();
let http_client = reqwest::Client::new();
let test_element_content = ElementContent::Text("Text".to_string());
let put_resp = http_client.put(&format!("http://localhost:9981/v0/element")).json(&test_element_content).send().await.unwrap();
debug!("{:?}", &put_resp);
let id = serde_json::from_str::<ElementId>(&put_resp.text().await.expect("No put response body")).expect("Could not deserialize ElementId");
tokio::time::sleep(Duration::from_millis(3000)).await;
let get_resp = http_client.get(&format!("http://localhost:9982/v0/element/{}", Into::<String>::into(&id))).send().await.expect("Get request failed");
let received_element = serde_json::from_str::<Element>(&get_resp.text().await.expect("No get request body")).expect("Could not deserialize Element");
debug!("Other node received this element: {:?}", received_element);
assert_eq!(&test_element_content, received_element.content());
std::process::exit(0);
}