Split repository into 3 crates
This commit is contained in:
parent
d8f1733eb3
commit
31dc4fd4a3
35 changed files with 4011 additions and 498 deletions
3427
ubisync-lib/Cargo.lock
generated
Normal file
3427
ubisync-lib/Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
28
ubisync-lib/Cargo.toml
Normal file
28
ubisync-lib/Cargo.toml
Normal file
|
@ -0,0 +1,28 @@
|
|||
[package]
|
||||
name = "ubisync-lib"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.71"
|
||||
async-trait = "0.1.73"
|
||||
axum = { version = "0.7.2", features = [ "macros" ] }
|
||||
chrono = "0.4.31"
|
||||
itertools = "0.12.0"
|
||||
jsonwebtoken = "9.2.0"
|
||||
serde = { version = "1.0.166", features = [ "derive" ] }
|
||||
serde_json = "1.0.99"
|
||||
serde_with = "3.3.0"
|
||||
serial_test = "2.0.0"
|
||||
tokio = { version = "1.32.0", features = ["full"] }
|
||||
tracing = "0.1.37"
|
||||
tracing-subscriber = "0.3.17"
|
||||
uuid = "1.4.1"
|
||||
|
||||
i2p = { path = "../../i2p-rs" }
|
||||
|
||||
cozo = { version = "0.7.5", features = [ "storage-rocksdb", "requests", "graph-algo" ] }
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
reqwest = { version = "0.11.20", features = [ "json" ] }
|
74
ubisync-lib/src/api/mod.rs
Normal file
74
ubisync-lib/src/api/mod.rs
Normal file
|
@ -0,0 +1,74 @@
|
|||
use axum::Router;
|
||||
use tokio::{net::TcpListener, task::JoinHandle};
|
||||
|
||||
use crate::{config::ApiConfig, state::ApiState};
|
||||
|
||||
pub mod v0;
|
||||
|
||||
pub struct Api {
|
||||
server_thread: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl Api {
|
||||
pub fn stop(&self) {
|
||||
self.server_thread.abort();
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ApiBuilder {
|
||||
bind_ip: Option<String>,
|
||||
port: Option<u16>,
|
||||
version: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for ApiBuilder {
|
||||
fn default() -> Self {
|
||||
ApiBuilder {
|
||||
bind_ip: None,
|
||||
port: None,
|
||||
version: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ApiConfig> for ApiBuilder {
|
||||
fn from(value: ApiConfig) -> Self {
|
||||
ApiBuilder {
|
||||
bind_ip: value.ip,
|
||||
port: value.port,
|
||||
version: value.version,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ApiBuilder {
|
||||
pub async fn build(&self, state: ApiState) -> Api {
|
||||
let mut app: Router = Router::new();
|
||||
match &self.version {
|
||||
Some(v) if v == "v0" => app = app.nest(&format!("/{}", v), v0::get_router(state)),
|
||||
_ => app = app.nest("/v0", v0::get_router(state)),
|
||||
}
|
||||
|
||||
let ip = match &self.bind_ip {
|
||||
Some(addr) => addr,
|
||||
None => "127.0.0.1",
|
||||
};
|
||||
let port = match &self.port {
|
||||
Some(p) => p,
|
||||
None => &9981,
|
||||
};
|
||||
let listener = TcpListener::bind(&format!("{}:{}", ip, port))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let task_handle = tokio::spawn({
|
||||
async move {
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
}
|
||||
});
|
||||
|
||||
Api {
|
||||
server_thread: task_handle,
|
||||
}
|
||||
}
|
||||
}
|
98
ubisync-lib/src/api/v0/app.rs
Normal file
98
ubisync-lib/src/api/v0/app.rs
Normal file
|
@ -0,0 +1,98 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use axum::{
|
||||
body::Body,
|
||||
extract::Request,
|
||||
http::{header::AUTHORIZATION, StatusCode},
|
||||
middleware::Next,
|
||||
response::{IntoResponse, Response},
|
||||
Extension, Json,
|
||||
};
|
||||
use jsonwebtoken::{decode, Header};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::{debug, error, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::state::ApiState;
|
||||
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct AppId(Uuid);
|
||||
|
||||
impl AppId {
|
||||
pub fn new() -> Self {
|
||||
AppId { 0: Uuid::new_v4() }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct JWTClaims {
|
||||
sub: AppId,
|
||||
}
|
||||
|
||||
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct AppDescription {
|
||||
pub name: String,
|
||||
pub desc_text: String,
|
||||
}
|
||||
|
||||
|
||||
pub(super) async fn auth(
|
||||
s: Extension<Arc<ApiState>>,
|
||||
mut request: Request<Body>,
|
||||
next: Next,
|
||||
) -> Response {
|
||||
if let Some(auth_header) = request.headers().get(AUTHORIZATION) {
|
||||
if let Ok(header_string) = auth_header.to_str() {
|
||||
if header_string.starts_with("Bearer") {
|
||||
if let Ok(token) = decode::<JWTClaims>(
|
||||
&header_string[7..],
|
||||
s.jwt_decoding_key(),
|
||||
s.jwt_validation(),
|
||||
) {
|
||||
if let Ok(true) = s.app_exists(&token.claims.sub) {
|
||||
debug!("Authentication for {:?} succeeded.", &token.claims.sub);
|
||||
request.extensions_mut().insert(token.claims.sub);
|
||||
return next.run(request).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
debug!("Authentication failed for request '{:?}'.", &request);
|
||||
StatusCode::UNAUTHORIZED.into_response()
|
||||
}
|
||||
|
||||
pub(super) async fn register(
|
||||
s: Extension<Arc<ApiState>>,
|
||||
Json(data): Json<AppDescription>,
|
||||
) -> Response {
|
||||
// Maybe ask for consent by user
|
||||
|
||||
// If user wants registration, proceed
|
||||
let result = s.add_app(&data);
|
||||
|
||||
match result {
|
||||
Ok(id) => {
|
||||
// Build JWT, respond
|
||||
let jwt = jsonwebtoken::encode(
|
||||
&Header::default(),
|
||||
&JWTClaims { sub: id },
|
||||
&s.jwt_encoding_key(),
|
||||
);
|
||||
match jwt {
|
||||
Ok(token) => (StatusCode::OK, token).into_response(),
|
||||
Err(e) => {
|
||||
warn!("Failed to encode token: {:?}", e);
|
||||
StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to register new application! {:?}", e);
|
||||
StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
||||
}
|
||||
}
|
||||
}
|
63
ubisync-lib/src/api/v0/element.rs
Normal file
63
ubisync-lib/src/api/v0/element.rs
Normal file
|
@ -0,0 +1,63 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use axum::{
|
||||
extract::{Json, Path},
|
||||
http::StatusCode,
|
||||
response::{IntoResponse, Response},
|
||||
Extension,
|
||||
};
|
||||
use tracing::{debug, warn};
|
||||
|
||||
use crate::state::{
|
||||
types::{ElementContent, ElementId},
|
||||
ApiState,
|
||||
};
|
||||
|
||||
pub(super) async fn get(Path(id): Path<ElementId>, s: Extension<Arc<ApiState>>) -> Response {
|
||||
let element = s.get_element(&id);
|
||||
match element {
|
||||
Ok(el) => (StatusCode::OK, Json { 0: el }).into_response(),
|
||||
Err(e) => {
|
||||
warn!("Element not found:\n{:?}", e);
|
||||
StatusCode::NOT_FOUND.into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn create(
|
||||
s: Extension<Arc<ApiState>>,
|
||||
Json(content): Json<ElementContent>,
|
||||
) -> Response {
|
||||
let element_id = s.create_element(&content);
|
||||
debug!("{:?}", element_id);
|
||||
match element_id {
|
||||
Ok(id) => (
|
||||
StatusCode::OK,
|
||||
Json {
|
||||
0: &Into::<String>::into(&id),
|
||||
},
|
||||
)
|
||||
.into_response(),
|
||||
Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn set(
|
||||
Path(id): Path<ElementId>,
|
||||
s: Extension<Arc<ApiState>>,
|
||||
Json(content): Json<ElementContent>,
|
||||
) -> Response {
|
||||
let res = s.write_element_content(&id, &content);
|
||||
match res {
|
||||
Ok(_) => StatusCode::OK.into_response(),
|
||||
Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn remove(Path(id): Path<ElementId>, s: Extension<Arc<ApiState>>) -> Response {
|
||||
let res = s.remove_element(&id);
|
||||
match res {
|
||||
Ok(_) => StatusCode::OK.into_response(),
|
||||
Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
|
||||
}
|
||||
}
|
25
ubisync-lib/src/api/v0/mod.rs
Normal file
25
ubisync-lib/src/api/v0/mod.rs
Normal file
|
@ -0,0 +1,25 @@
|
|||
use axum::{
|
||||
middleware::from_fn,
|
||||
routing::{get, put},
|
||||
Extension, Router,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::state::ApiState;
|
||||
|
||||
pub mod app;
|
||||
pub mod element;
|
||||
|
||||
pub fn get_router(state: ApiState) -> Router {
|
||||
Router::new()
|
||||
// authenticated routes
|
||||
.route("/element", put(element::create))
|
||||
.route(
|
||||
"/element/:id",
|
||||
get(element::get).post(element::set).delete(element::remove),
|
||||
)
|
||||
.layer(from_fn(app::auth))
|
||||
// public / unauthenticated routes
|
||||
.route("/app/register", put(app::register))
|
||||
.layer(Extension(Arc::new(state)))
|
||||
}
|
27
ubisync-lib/src/comm/message_processor.rs
Normal file
27
ubisync-lib/src/comm/message_processor.rs
Normal file
|
@ -0,0 +1,27 @@
|
|||
use tracing::debug;
|
||||
|
||||
use crate::state::{types::PeerId, CommState};
|
||||
|
||||
use super::messages::{Message, MessageContent};
|
||||
|
||||
pub fn handle(state: &CommState, peer: &PeerId, message: Message) {
|
||||
debug!("Handling message now: {:?}", message);
|
||||
match message.content() {
|
||||
MessageContent::Hello { peer_name } => {
|
||||
state.set_peer(peer, peer_name).expect("State failed");
|
||||
}
|
||||
MessageContent::CreateElement { id, content } => {
|
||||
state
|
||||
.add_received_element(id, content, message.id())
|
||||
.expect("State failed");
|
||||
}
|
||||
MessageContent::SetElement { id, content } => {
|
||||
state
|
||||
.update_element_content(id, content, message.id())
|
||||
.expect("State failed");
|
||||
}
|
||||
MessageContent::RemoveElement { id } => {
|
||||
state.remove_element(id).expect("State failed");
|
||||
}
|
||||
}
|
||||
}
|
55
ubisync-lib/src/comm/messages/mod.rs
Normal file
55
ubisync-lib/src/comm/messages/mod.rs
Normal file
|
@ -0,0 +1,55 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::state::types::{ElementContent, ElementId, MessageId};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct Message {
|
||||
id: MessageId,
|
||||
signature: MessageSignature,
|
||||
content: MessageContent,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub enum MessageContent {
|
||||
Hello {
|
||||
peer_name: String,
|
||||
},
|
||||
CreateElement {
|
||||
id: ElementId,
|
||||
content: ElementContent,
|
||||
},
|
||||
SetElement {
|
||||
id: ElementId,
|
||||
content: ElementContent,
|
||||
},
|
||||
RemoveElement {
|
||||
id: ElementId,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct MessageSignature {}
|
||||
|
||||
impl Message {
|
||||
pub fn new(content: MessageContent) -> Self {
|
||||
Message {
|
||||
id: MessageId::new(),
|
||||
signature: MessageSignature {},
|
||||
content: content,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn content(&self) -> &MessageContent {
|
||||
&self.content
|
||||
}
|
||||
|
||||
pub fn id(&self) -> &MessageId {
|
||||
&self.id
|
||||
}
|
||||
}
|
||||
|
||||
impl MessageSignature {
|
||||
pub fn verify(&self, _content: &MessageContent) -> bool {
|
||||
true
|
||||
}
|
||||
}
|
265
ubisync-lib/src/comm/mod.rs
Normal file
265
ubisync-lib/src/comm/mod.rs
Normal file
|
@ -0,0 +1,265 @@
|
|||
pub mod message_processor;
|
||||
pub mod messages;
|
||||
mod types;
|
||||
use tracing::{debug, error, warn};
|
||||
pub use types::*;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::io::{Read, Write};
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::bail;
|
||||
use i2p::net::{I2pAddr, I2pListener, I2pListenerBuilder, I2pSocketAddr, I2pStream};
|
||||
use i2p::sam_options::SAMOptions;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::state::types::PeerId;
|
||||
use crate::state::CommState;
|
||||
use crate::Config;
|
||||
|
||||
use self::messages::Message;
|
||||
|
||||
pub struct CommHandle {
|
||||
state: Arc<CommState>,
|
||||
i2p_server: Arc<I2pListener>,
|
||||
// Maps peer addresses to existing connections to them
|
||||
clients: Arc<RwLock<HashMap<I2pSocketAddr, Arc<RwLock<I2pStream>>>>>,
|
||||
thread: RwLock<Option<JoinHandle<()>>>,
|
||||
}
|
||||
|
||||
impl CommHandle {
|
||||
pub fn new(state: CommState, config: &Config) -> anyhow::Result<Self> {
|
||||
let mut listener_builder =
|
||||
I2pListenerBuilder::default().with_options(SAMOptions::default());
|
||||
|
||||
if let Some(privkey) = &config.i2p_private_key {
|
||||
listener_builder = listener_builder.with_private_key(privkey.to_string());
|
||||
}
|
||||
|
||||
let listener = listener_builder.build().unwrap();
|
||||
|
||||
Ok(CommHandle {
|
||||
state: Arc::new(state),
|
||||
i2p_server: Arc::new(listener),
|
||||
clients: Default::default(),
|
||||
thread: RwLock::new(None),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn run(&self) {
|
||||
debug!("CommHandle is running now");
|
||||
let state = self.state.clone();
|
||||
let i2p_server = self.i2p_server.clone();
|
||||
let clients = self.clients.clone();
|
||||
let mut thread_writeguard = self.thread.write().await;
|
||||
*thread_writeguard = Some(tokio::spawn(async move {
|
||||
for incoming in i2p_server.incoming() {
|
||||
if let Ok(stream) = incoming {
|
||||
if let Ok(addr) = stream.peer_addr() {
|
||||
// First, save a reference to the new stream in `clients` for later reuse
|
||||
let wrapped_stream = Arc::new(RwLock::new(stream));
|
||||
clients.write().await.insert(addr, wrapped_stream.clone());
|
||||
// Reference to state to be passed to `read_connection()`
|
||||
let state_arc = state.clone();
|
||||
|
||||
// Spawn a blocking task, which (in read_connection) will spawn a non-blocking task
|
||||
// The "outer" blocking task exists, because the for loop's iterator will block until
|
||||
// there is another stream - thus, the existing streams will not be read.
|
||||
// `spawn_blocking` moves the reading task to a special pool of tasks which are
|
||||
// executed _despite_ other tasks blocking for something.
|
||||
tokio::task::spawn_blocking(move || {
|
||||
Self::read_connection(wrapped_stream, state_arc)
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
pub async fn stop(self) {
|
||||
if let Some(t) = self.thread.read().await.as_ref() {
|
||||
t.abort();
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn broadcast(&self, msg: Message) -> anyhow::Result<()> {
|
||||
match serde_json::to_string(&msg) {
|
||||
Ok(msg_string) => {
|
||||
for peer in self.state.get_peers().unwrap() {
|
||||
debug!("Sending to peer '{:?}' message '{:?}'", &peer, &msg);
|
||||
if let Err(e) = self.send_to_addr(&peer.addr(), msg_string.as_bytes()).await {
|
||||
debug!(
|
||||
"Failed to send message.\nError: {:?}\nMessage: {:?}",
|
||||
e, &msg
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => bail!(e),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send(&self, dest: &I2pSocketAddr, msg: Message) -> anyhow::Result<()> {
|
||||
match serde_json::to_string(&msg) {
|
||||
Ok(msg_string) => {
|
||||
self.send_to_addr(dest, msg_string.as_bytes()).await?;
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => bail!(e),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send_to_addr(&self, addr: &I2pSocketAddr, msg: &[u8]) -> anyhow::Result<()> {
|
||||
// Create client for this connection if necessary
|
||||
if !self.clients.read().await.contains_key(addr) {
|
||||
match I2pStream::connect(addr) {
|
||||
Ok(client) => {
|
||||
//client.inner.sam.conn.set_nodelay(true)?;
|
||||
//client.inner.sam.conn.set_nonblocking(false)?;
|
||||
self.clients
|
||||
.write()
|
||||
.await
|
||||
.insert(addr.clone(), Arc::new(RwLock::new(client)));
|
||||
}
|
||||
Err(e) => bail!(e),
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch current client for this connection from clients map, and send the message
|
||||
if let Some(client) = self.clients.read().await.get(&addr) {
|
||||
let mut writeguard = client.write().await;
|
||||
match writeguard.write_all(msg) {
|
||||
Ok(_) => {
|
||||
writeguard.flush()?;
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Error writing to stream: {}", e)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(anyhow::Error::msg(
|
||||
"No client found despite trying to add one beforehand.",
|
||||
));
|
||||
}
|
||||
self.clients.write().await.remove(&addr);
|
||||
Err(anyhow::Error::msg(
|
||||
"Failed to send anything, most likely the stream was broken and has been removed",
|
||||
))
|
||||
}
|
||||
|
||||
pub fn i2p_address(&self) -> anyhow::Result<I2pSocketAddr> {
|
||||
match self.i2p_server.local_addr() {
|
||||
Ok(addr) => Ok(addr),
|
||||
Err(e) => bail!(e),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn i2p_b32_address(&self) -> anyhow::Result<I2pSocketAddr> {
|
||||
let mut i2p_dest = self.i2p_address()?;
|
||||
i2p_dest.set_dest(I2pAddr::from_b64(&i2p_dest.dest().string()).unwrap());
|
||||
Ok(i2p_dest)
|
||||
}
|
||||
|
||||
fn read_connection(
|
||||
wrapped_stream: Arc<RwLock<I2pStream>>,
|
||||
state: Arc<CommState>,
|
||||
) -> JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
let mut stream = wrapped_stream.write().await;
|
||||
let peer: PeerId = stream.peer_addr().expect("Failed to get peer addr").into();
|
||||
|
||||
// All streams start with a \n byte which does not belong to the payload, take that from the stream.
|
||||
if let Err(e) = stream.read(&mut [0; 1]) {
|
||||
error!("Error while reading first byte of stream: {}", e);
|
||||
return;
|
||||
}
|
||||
|
||||
let iterator = serde_json::Deserializer::from_reader(&mut *stream)
|
||||
.into_iter::<serde_json::Value>();
|
||||
for item in iterator {
|
||||
match item {
|
||||
Ok(value) => match serde_json::from_value::<Message>(value) {
|
||||
Ok(message) => {
|
||||
message_processor::handle(state.deref(), &peer, message);
|
||||
}
|
||||
Err(e) => warn!("Deserialization failed: {:?}", e),
|
||||
},
|
||||
Err(e) => {
|
||||
warn!("Deserialization failed: {:?}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use i2p::net::I2pListener;
|
||||
use i2p::sam::StreamForward;
|
||||
use i2p::sam_options::SAMOptions;
|
||||
use i2p::Session;
|
||||
|
||||
use crate::comm::{messages, Message};
|
||||
use crate::state::types::ElementId;
|
||||
use crate::state::{CommState, State};
|
||||
use crate::Config;
|
||||
|
||||
use super::CommHandle;
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
pub async fn msg() {
|
||||
let ch = CommHandle::new(
|
||||
CommState::new(State::new().await.unwrap()),
|
||||
&Config::default(),
|
||||
)
|
||||
.unwrap();
|
||||
ch.run().await;
|
||||
println!("My address: {:?}", ch.i2p_b32_address());
|
||||
|
||||
ch.send(
|
||||
&ch.i2p_address().unwrap(),
|
||||
Message::new(messages::MessageContent::Hello {
|
||||
peer_name: "a".to_string(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.expect("Could not send hello");
|
||||
for i in 0..10 {
|
||||
let result = ch
|
||||
.send(
|
||||
&ch.i2p_address().unwrap(),
|
||||
Message::new(messages::MessageContent::CreateElement {
|
||||
id: ElementId::new(),
|
||||
content: crate::state::types::ElementContent::Text(format!(
|
||||
"hello world no. {}",
|
||||
i
|
||||
)),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
tokio::time::sleep(Duration::from_millis(300)).await;
|
||||
println!("Result of sending: {:?}", result);
|
||||
}
|
||||
|
||||
ch.stop().await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn from_privkey() {
|
||||
let privkey = "DPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gQz1KRvzfH0pI7uNWn8ZiRKQ812KjOrOMqnq7owjAjSBDPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gQz1KRvzfH0pI7uNWn8ZiRKQ812KjOrOMqnq7owjAjSBDPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gVKUHq0huLwfh0u06PlujRXTgcUJw9pg4Vkh-e0CGQFL6yn2FxCUIvaryyFt3-8mwO1OTkQyB7u1rnO9FpLlKeT9FPSkwmaxZmwQ1kvsuTTIp5ntxQZ1XMCDm2qhRWdcEsYxTKLJIMYxN1Ujk9Y7SqNYORmxrwQWC4ENGnt~VyvbAAAAfAabqgU0GhMWN2syDQ5sYZ61WXDqC4esasxwyLvJ-ES7~k40Uq9htc8t16-RXEq0Q17C499WxW6~GQRcXbgBNd0bMdV-46RsFo1jNgfB6H4nkuTrQXMqXB6s2Fhx2gwcHRk3Lt5DE4N0mvHG8Po974tJWr1hIRiSxQUtSj5kcZOOT~EKWMoCA7qDgZORZAnJavaRr0S-PiPQwAw8HOekdw50CyOByxXEfLBAi-Kz1nhdNvMHIrtcBZ~RpsxOK63O633e0PeYwrOOG7AFVLh7SzdwVvI1-KUe7y2ADBcoHuJRMwk5MEV-BATEfhWA2SzWw1qFRzJyb-pGbgGCJQOoc1YcP8jikBJhtuRbD5K-wK5MXoHL";
|
||||
let _ = I2pListener {
|
||||
forward: StreamForward::with_session(
|
||||
&Session::from_destination(i2p::sam::DEFAULT_API, &privkey, SAMOptions::default())
|
||||
.expect("Failed to create session for listener2"),
|
||||
)
|
||||
.expect("Failed to create StreamForward for listener2"),
|
||||
};
|
||||
}
|
||||
}
|
54
ubisync-lib/src/comm/types.rs
Normal file
54
ubisync-lib/src/comm/types.rs
Normal file
|
@ -0,0 +1,54 @@
|
|||
use anyhow::bail;
|
||||
use i2p::net::I2pSocketAddr;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::state::types::PeerId;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct Peer {
|
||||
id: PeerId,
|
||||
name: String,
|
||||
family: Vec<PeerId>,
|
||||
}
|
||||
|
||||
impl Peer {
|
||||
pub fn new(id: PeerId, name: String) -> Self {
|
||||
Peer {
|
||||
id: id,
|
||||
name: name,
|
||||
family: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
pub fn addr(&self) -> I2pSocketAddr {
|
||||
self.id.addr()
|
||||
}
|
||||
|
||||
pub fn id(&self) -> PeerId {
|
||||
self.id.clone()
|
||||
}
|
||||
|
||||
pub fn name(&self) -> String {
|
||||
self.name.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for Peer {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: &str) -> Result<Self, Self::Error> {
|
||||
match serde_json::from_str(value) {
|
||||
Ok(p) => Ok(p),
|
||||
Err(e) => bail!(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<String> for Peer {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: String) -> Result<Self, Self::Error> {
|
||||
Self::try_from(value.as_str())
|
||||
}
|
||||
}
|
35
ubisync-lib/src/config.rs
Normal file
35
ubisync-lib/src/config.rs
Normal file
|
@ -0,0 +1,35 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct Config {
|
||||
pub i2p_private_key: Option<String>,
|
||||
pub api_config: ApiConfig,
|
||||
pub jwt_secret: String,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Config {
|
||||
i2p_private_key: None,
|
||||
api_config: Default::default(),
|
||||
jwt_secret: "insecuresecret".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct ApiConfig {
|
||||
pub ip: Option<String>,
|
||||
pub port: Option<u16>,
|
||||
pub version: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for ApiConfig {
|
||||
fn default() -> Self {
|
||||
ApiConfig {
|
||||
ip: None,
|
||||
port: None,
|
||||
version: None,
|
||||
}
|
||||
}
|
||||
}
|
84
ubisync-lib/src/lib.rs
Normal file
84
ubisync-lib/src/lib.rs
Normal file
|
@ -0,0 +1,84 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use anyhow::bail;
|
||||
use api::{Api, ApiBuilder};
|
||||
use comm::{CommHandle, Peer};
|
||||
use config::Config;
|
||||
use i2p::net::I2pSocketAddr;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use state::{
|
||||
types::{MessageId, PeerId},
|
||||
ApiState, CommState, State,
|
||||
};
|
||||
|
||||
pub mod api;
|
||||
pub mod comm;
|
||||
pub mod config;
|
||||
pub mod state;
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub struct MessageRelations {
|
||||
pub parents: Vec<MessageId>,
|
||||
}
|
||||
|
||||
pub struct Ubisync {
|
||||
comm_handle: Arc<CommHandle>,
|
||||
state_handle: Arc<State>,
|
||||
api: Arc<Api>,
|
||||
}
|
||||
|
||||
impl Ubisync {
|
||||
pub async fn new(config: &Config) -> anyhow::Result<Self> {
|
||||
let state = State::new().await?;
|
||||
let comm_handle = Arc::new(CommHandle::new(CommState::new(state.clone()), config)?);
|
||||
state.set_comm_handle(comm_handle.clone());
|
||||
|
||||
let api = Arc::new(
|
||||
ApiBuilder::from(config.api_config.clone())
|
||||
.build(ApiState::new(state.clone(), &config.jwt_secret))
|
||||
.await,
|
||||
);
|
||||
|
||||
comm_handle.run().await;
|
||||
Ok(Ubisync {
|
||||
comm_handle: comm_handle,
|
||||
state_handle: state,
|
||||
api: api,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn api(&self) -> Arc<Api> {
|
||||
self.api.clone()
|
||||
}
|
||||
|
||||
pub fn add_peer(&self, p: impl TryInto<Peer, Error = anyhow::Error>) -> anyhow::Result<()> {
|
||||
match p.try_into() {
|
||||
Ok(peer) => self.state_handle.set_peer(&peer),
|
||||
Err(e) => bail!(e),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_peer_from_id(&self, id: PeerId) -> anyhow::Result<()> {
|
||||
// TODO: resolve peer's name before setting
|
||||
self.state_handle.set_peer(&Peer::new(id, "".to_string()))
|
||||
}
|
||||
|
||||
pub fn get_peers(&self) -> Vec<Peer> {
|
||||
self.state_handle.get_peers().unwrap_or(vec![])
|
||||
}
|
||||
|
||||
pub fn get_destination(&self) -> anyhow::Result<I2pSocketAddr> {
|
||||
self.comm_handle.i2p_address()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn full_system_test1() {
|
||||
// Do a test requiring a specific system state (including homeserver state or such)
|
||||
// Disabled by default, since files need to be set up explicitly for this test
|
||||
todo!()
|
||||
}
|
||||
}
|
152
ubisync-lib/src/state/api_state.rs
Normal file
152
ubisync-lib/src/state/api_state.rs
Normal file
|
@ -0,0 +1,152 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use chrono::Utc;
|
||||
use cozo::DbInstance;
|
||||
use jsonwebtoken::{DecodingKey, EncodingKey, Validation};
|
||||
use tracing::debug;
|
||||
|
||||
use crate::{
|
||||
api::v0::app::{AppDescription, AppId},
|
||||
comm::messages::MessageContent,
|
||||
state::{queries, types::ElementId},
|
||||
};
|
||||
|
||||
use super::{
|
||||
types::{Element, ElementContent},
|
||||
State,
|
||||
};
|
||||
|
||||
pub struct ApiState {
|
||||
state: Arc<State>,
|
||||
jwt_encoding_key: EncodingKey,
|
||||
jwt_decoding_key: DecodingKey,
|
||||
jwt_validation: Validation,
|
||||
}
|
||||
|
||||
impl ApiState {
|
||||
pub fn new(state: Arc<State>, jwt_secret: &str) -> Self {
|
||||
let mut validation = Validation::default();
|
||||
validation.set_required_spec_claims(&vec!["sub"]);
|
||||
ApiState {
|
||||
state: state,
|
||||
jwt_encoding_key: EncodingKey::from_secret(jwt_secret.as_bytes()),
|
||||
jwt_decoding_key: DecodingKey::from_secret(jwt_secret.as_bytes()),
|
||||
jwt_validation: validation,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_app(&self, description: &AppDescription) -> anyhow::Result<AppId> {
|
||||
let id = AppId::new();
|
||||
let last_access = Utc::now();
|
||||
queries::apps::add(
|
||||
self.db(),
|
||||
&id,
|
||||
&last_access,
|
||||
&description.name,
|
||||
&description.desc_text,
|
||||
)?;
|
||||
debug!("Successfully added app");
|
||||
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
pub fn app_exists(&self, id: &AppId) -> anyhow::Result<bool> {
|
||||
queries::apps::exists(self.db(), id)
|
||||
}
|
||||
|
||||
pub fn create_element(&self, content: &ElementContent) -> anyhow::Result<ElementId> {
|
||||
let id = ElementId::new();
|
||||
queries::elements::add(self.db(), &id, &content, None, true)?;
|
||||
debug!("Added element {{{}}}", &id.to_string());
|
||||
|
||||
self.state.send_to_peers(MessageContent::CreateElement {
|
||||
id: id.clone(),
|
||||
content: content.clone(),
|
||||
});
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
pub fn write_element_content(
|
||||
&self,
|
||||
id: &ElementId,
|
||||
content: &ElementContent,
|
||||
) -> anyhow::Result<()> {
|
||||
queries::elements::set_content(self.db(), id, content)?;
|
||||
queries::elements::set_local_changes(self.db(), id, true)?;
|
||||
debug!("Wrote element content {{{}}}", &id.to_string());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn remove_element(&self, id: &ElementId) -> anyhow::Result<()> {
|
||||
let res = self.state.remove_element(id);
|
||||
debug!("Removed element {{{}}}", &id.to_string());
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
pub fn get_element(&self, id: &ElementId) -> anyhow::Result<Element> {
|
||||
self.state.get_element(id)
|
||||
}
|
||||
|
||||
fn db(&self) -> &DbInstance {
|
||||
&self.state.db
|
||||
}
|
||||
|
||||
pub fn jwt_encoding_key(&self) -> &EncodingKey {
|
||||
&self.jwt_encoding_key
|
||||
}
|
||||
|
||||
pub fn jwt_decoding_key(&self) -> &DecodingKey {
|
||||
&self.jwt_decoding_key
|
||||
}
|
||||
|
||||
pub fn jwt_validation(&self) -> &Validation {
|
||||
&self.jwt_validation
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::ApiState;
|
||||
use crate::state::{types::ElementContent, State};
|
||||
|
||||
#[tokio::test]
|
||||
#[serial_test::serial]
|
||||
async fn test_element_create() {
|
||||
tracing_subscriber::fmt().pretty().init();
|
||||
let state = ApiState::new(
|
||||
State::new().await.unwrap(),
|
||||
"abcdabcdabcdabcdabcdabcdabcdabcd",
|
||||
);
|
||||
let id = state
|
||||
.create_element(&ElementContent::Text("Test-text".to_string()))
|
||||
.unwrap();
|
||||
let el = state.get_element(&id).unwrap();
|
||||
assert_eq!(
|
||||
ElementContent::Text("Test-text".to_string()),
|
||||
el.content().to_owned()
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial_test::serial]
|
||||
async fn test_element_write() {
|
||||
tracing_subscriber::fmt().pretty().init();
|
||||
let state = ApiState::new(
|
||||
State::new().await.unwrap(),
|
||||
"abcdabcdabcdabcdabcdabcdabcdabcd",
|
||||
);
|
||||
let id = state
|
||||
.create_element(&ElementContent::Text("Test-text".to_string()))
|
||||
.unwrap();
|
||||
state
|
||||
.write_element_content(&id, &ElementContent::Text("Test-text 2".to_string()))
|
||||
.unwrap();
|
||||
let el = state.get_element(&id).unwrap();
|
||||
assert_eq!(
|
||||
ElementContent::Text("Test-text 2".to_string()),
|
||||
el.content().to_owned()
|
||||
)
|
||||
}
|
||||
}
|
136
ubisync-lib/src/state/comm_state.rs
Normal file
136
ubisync-lib/src/state/comm_state.rs
Normal file
|
@ -0,0 +1,136 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use cozo::DbInstance;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::{comm::Peer, state::queries};
|
||||
|
||||
use super::{
|
||||
types::{Element, ElementContent, ElementId, MessageId, PeerId},
|
||||
State,
|
||||
};
|
||||
|
||||
//TODO: Notify API about changes
|
||||
pub struct CommState {
|
||||
state: Arc<State>,
|
||||
}
|
||||
|
||||
impl CommState {
|
||||
pub fn new(state: Arc<State>) -> Self {
|
||||
CommState { state: state }
|
||||
}
|
||||
|
||||
pub fn add_received_element(
|
||||
&self,
|
||||
id: &ElementId,
|
||||
content: &ElementContent,
|
||||
latest_message: &MessageId,
|
||||
) -> anyhow::Result<()> {
|
||||
queries::elements::add(
|
||||
self.db(),
|
||||
&id,
|
||||
&content,
|
||||
Some(latest_message.to_owned()),
|
||||
false,
|
||||
)?;
|
||||
debug!("Added element {{{}}}", &id.to_string());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn update_element_content(
|
||||
&self,
|
||||
id: &ElementId,
|
||||
content: &ElementContent,
|
||||
latest_message: &MessageId,
|
||||
) -> anyhow::Result<()> {
|
||||
//TODO: resolve potential conflicts with local changes
|
||||
queries::elements::set_content(self.db(), id, content)?;
|
||||
queries::elements::set_latest_message(self.db(), id, Some(latest_message.to_owned()))?;
|
||||
debug!("Updated element {{{}}}", &id.to_string());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn remove_element(&self, id: &ElementId) -> anyhow::Result<()> {
|
||||
let res = self.state.remove_element(id);
|
||||
debug!("Removed element {{{}}}", &id.to_string());
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
pub fn get_element(&self, id: &ElementId) -> anyhow::Result<Element> {
|
||||
self.state.get_element(id)
|
||||
}
|
||||
|
||||
pub fn set_peer(&self, id: &PeerId, name: &str) -> anyhow::Result<()> {
|
||||
queries::peers::put(self.db(), id, name)?;
|
||||
debug!("Set peer {{{}}}", id.to_string());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_peers(&self) -> anyhow::Result<Vec<Peer>> {
|
||||
self.state.get_peers()
|
||||
}
|
||||
|
||||
fn db(&self) -> &DbInstance {
|
||||
&self.state.db
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::CommState;
|
||||
use crate::state::{
|
||||
types::{ElementContent, ElementId, MessageId},
|
||||
State,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
#[serial_test::serial]
|
||||
async fn test_element_add() {
|
||||
tracing_subscriber::fmt().pretty().init();
|
||||
let state = CommState::new(State::new().await.unwrap());
|
||||
let id = ElementId::new();
|
||||
state
|
||||
.add_received_element(
|
||||
&id,
|
||||
&ElementContent::Text("Test-text".to_string()),
|
||||
&MessageId::new(),
|
||||
)
|
||||
.unwrap();
|
||||
let el = state.get_element(&id).unwrap();
|
||||
assert_eq!(
|
||||
ElementContent::Text("Test-text".to_string()),
|
||||
el.content().to_owned()
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial_test::serial]
|
||||
async fn test_element_update() {
|
||||
tracing_subscriber::fmt().pretty().init();
|
||||
let state = CommState::new(State::new().await.unwrap());
|
||||
let id = ElementId::new();
|
||||
state
|
||||
.add_received_element(
|
||||
&id,
|
||||
&ElementContent::Text("Test-text".to_string()),
|
||||
&MessageId::new(),
|
||||
)
|
||||
.unwrap();
|
||||
state
|
||||
.update_element_content(
|
||||
&id,
|
||||
&ElementContent::Text("Test-text 2".to_string()),
|
||||
&MessageId::new(),
|
||||
)
|
||||
.unwrap();
|
||||
let el = state.get_element(&id).unwrap();
|
||||
assert_eq!(
|
||||
ElementContent::Text("Test-text 2".to_string()),
|
||||
el.content().to_owned()
|
||||
)
|
||||
}
|
||||
}
|
114
ubisync-lib/src/state/mod.rs
Normal file
114
ubisync-lib/src/state/mod.rs
Normal file
|
@ -0,0 +1,114 @@
|
|||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use anyhow::Error;
|
||||
use cozo::DbInstance;
|
||||
use tracing::{debug, error};
|
||||
|
||||
use crate::comm::{
|
||||
messages::{Message, MessageContent},
|
||||
CommHandle, Peer,
|
||||
};
|
||||
|
||||
use self::types::{Element, ElementContent, ElementId, Tag};
|
||||
|
||||
pub mod types;
|
||||
|
||||
mod api_state;
|
||||
mod comm_state;
|
||||
mod queries;
|
||||
mod schema;
|
||||
|
||||
pub use api_state::ApiState;
|
||||
pub use comm_state::CommState;
|
||||
|
||||
pub struct State {
|
||||
db: DbInstance,
|
||||
comm_handle: RwLock<Option<Arc<CommHandle>>>,
|
||||
}
|
||||
|
||||
impl State {
|
||||
pub async fn new() -> anyhow::Result<Arc<State>> {
|
||||
let db = DbInstance::new("mem", "", Default::default());
|
||||
match db {
|
||||
Ok(d) => {
|
||||
schema::add_schema(&d)?;
|
||||
Ok(Arc::new(State {
|
||||
db: d,
|
||||
comm_handle: RwLock::new(None),
|
||||
}))
|
||||
}
|
||||
Err(e) => Err(Error::msg(format!("{:?}", e))),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_comm_handle(&self, handle: Arc<CommHandle>) {
|
||||
*self
|
||||
.comm_handle
|
||||
.write()
|
||||
.as_deref_mut()
|
||||
.expect("Could not set state's CommHandle") = Some(handle);
|
||||
}
|
||||
|
||||
pub fn set_element_content(
|
||||
&self,
|
||||
element_id: &ElementId,
|
||||
content: &ElementContent,
|
||||
) -> anyhow::Result<()> {
|
||||
let res = queries::elements::set_content(&self.db, element_id, content);
|
||||
debug!(
|
||||
"Set content of element with id {:?}: {:?}",
|
||||
element_id,
|
||||
self.get_element(element_id)
|
||||
);
|
||||
|
||||
self.send_to_peers(MessageContent::SetElement {
|
||||
id: element_id.clone(),
|
||||
content: content.clone(),
|
||||
});
|
||||
res
|
||||
}
|
||||
|
||||
pub fn remove_element(&self, element_id: &ElementId) -> anyhow::Result<()> {
|
||||
let res = queries::elements::remove(&self.db, element_id);
|
||||
|
||||
self.send_to_peers(MessageContent::RemoveElement {
|
||||
id: element_id.clone(),
|
||||
});
|
||||
res
|
||||
}
|
||||
|
||||
pub fn get_element(&self, id: &ElementId) -> anyhow::Result<Element> {
|
||||
queries::elements::get(&self.db, id)
|
||||
}
|
||||
|
||||
pub fn get_elements_by_tag(&self, tag: &Tag) -> Vec<ElementId> {
|
||||
queries::elements::get_by_tag(&self.db, tag)
|
||||
.map_err(|e| {
|
||||
error!("{}", e);
|
||||
e
|
||||
})
|
||||
.unwrap_or(vec![])
|
||||
}
|
||||
|
||||
pub fn set_peer(&self, peer: &Peer) -> anyhow::Result<()> {
|
||||
queries::peers::put(&self.db, &peer.id(), &peer.name())
|
||||
}
|
||||
|
||||
pub fn get_peers(&self) -> anyhow::Result<Vec<Peer>> {
|
||||
queries::peers::get(&self.db)
|
||||
}
|
||||
|
||||
fn send_to_peers(&self, ct: MessageContent) {
|
||||
match self.comm_handle.read() {
|
||||
Ok(opt) => {
|
||||
if opt.is_some() {
|
||||
let arc = opt.as_ref().unwrap().clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = arc.broadcast(Message::new(ct)).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
Err(e) => debug!("{}", e),
|
||||
}
|
||||
}
|
||||
}
|
58
ubisync-lib/src/state/queries/apps.rs
Normal file
58
ubisync-lib/src/state/queries/apps.rs
Normal file
|
@ -0,0 +1,58 @@
|
|||
use std::collections::BTreeMap;
|
||||
|
||||
use anyhow::{bail, Error};
|
||||
use chrono::{DateTime, Utc};
|
||||
use cozo::{DataValue, DbInstance, Num};
|
||||
|
||||
use crate::{api::v0::app::AppId, run_query};
|
||||
|
||||
pub fn add(
|
||||
db: &DbInstance,
|
||||
id: &AppId,
|
||||
last_access: &DateTime<Utc>,
|
||||
name: &str,
|
||||
description: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let params = vec![
|
||||
(
|
||||
"id",
|
||||
DataValue::Str(serde_json::to_string(&id).unwrap().into()),
|
||||
),
|
||||
(
|
||||
"last_access",
|
||||
DataValue::Num(Num::Int(last_access.timestamp())),
|
||||
),
|
||||
("name", DataValue::Str(name.into())),
|
||||
("description", DataValue::Str(description.into())),
|
||||
];
|
||||
|
||||
match run_query!(
|
||||
&db,
|
||||
":insert apps {id => last_access, name, description}",
|
||||
params,
|
||||
cozo::ScriptMutability::Mutable
|
||||
) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(report) => bail!(report),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn exists(db: &DbInstance, id: &AppId) -> anyhow::Result<bool> {
|
||||
let mut params = BTreeMap::new();
|
||||
params.insert(
|
||||
"id".to_string(),
|
||||
DataValue::Str(serde_json::to_string(&id)?.into()),
|
||||
);
|
||||
|
||||
let result = db.run_script(
|
||||
"?[name] := *apps[$id, last_access, name, description]",
|
||||
params,
|
||||
cozo::ScriptMutability::Immutable,
|
||||
);
|
||||
|
||||
if let Ok(rows) = result {
|
||||
return Ok(rows.rows.len() == 1);
|
||||
}
|
||||
|
||||
Err(Error::msg("Could not check whether app is registered"))
|
||||
}
|
190
ubisync-lib/src/state/queries/elements.rs
Normal file
190
ubisync-lib/src/state/queries/elements.rs
Normal file
|
@ -0,0 +1,190 @@
|
|||
use std::collections::BTreeMap;
|
||||
|
||||
use anyhow::{bail, Error};
|
||||
use cozo::{DataValue, DbInstance, JsonData, ScriptMutability};
|
||||
use serde_json::Value;
|
||||
use tracing::{debug, error};
|
||||
|
||||
use crate::{
|
||||
run_query,
|
||||
state::{
|
||||
types::{MessageId, Tag},
|
||||
Element, ElementContent, ElementId,
|
||||
},
|
||||
};
|
||||
|
||||
pub fn add(
|
||||
db: &DbInstance,
|
||||
id: &ElementId,
|
||||
content: &ElementContent,
|
||||
latest_message: Option<MessageId>,
|
||||
local_changes: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let params = vec![
|
||||
("id", DataValue::Str(serde_json::to_string(&id)?.into())),
|
||||
(
|
||||
"content",
|
||||
DataValue::Json(JsonData(serde_json::to_value(content)?)),
|
||||
),
|
||||
(
|
||||
"latest_message",
|
||||
match latest_message {
|
||||
Some(m) => DataValue::Str(serde_json::to_string(&m)?.into()),
|
||||
None => DataValue::Null,
|
||||
},
|
||||
),
|
||||
("local_changes", DataValue::Bool(local_changes)),
|
||||
];
|
||||
|
||||
match run_query!(
|
||||
&db,
|
||||
":insert elements {id => content, latest_message, local_changes}",
|
||||
params,
|
||||
ScriptMutability::Mutable
|
||||
) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(report) => bail!(report),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_content(
|
||||
db: &DbInstance,
|
||||
id: &ElementId,
|
||||
content: &ElementContent,
|
||||
) -> anyhow::Result<()> {
|
||||
set_property(
|
||||
db,
|
||||
id,
|
||||
"content",
|
||||
DataValue::Json(JsonData(serde_json::to_value(content)?)),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn set_latest_message(
|
||||
db: &DbInstance,
|
||||
id: &ElementId,
|
||||
latest_message: Option<MessageId>,
|
||||
) -> anyhow::Result<()> {
|
||||
set_property(
|
||||
db,
|
||||
id,
|
||||
"latest_message",
|
||||
match latest_message {
|
||||
Some(m) => DataValue::Str(serde_json::to_string(&m)?.into()),
|
||||
None => DataValue::Null,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub fn set_local_changes(
|
||||
db: &DbInstance,
|
||||
id: &ElementId,
|
||||
local_changes: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
set_property(db, id, "local_changes", DataValue::Bool(local_changes))
|
||||
}
|
||||
|
||||
pub fn remove(db: &DbInstance, id: &ElementId) -> anyhow::Result<()> {
|
||||
match run_query!(
|
||||
&db,
|
||||
":delete elements {id}",
|
||||
vec![("id", DataValue::Str(serde_json::to_string(&id)?.into()))],
|
||||
cozo::ScriptMutability::Mutable
|
||||
) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(report) => bail!(report),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(db: &DbInstance, id: &ElementId) -> anyhow::Result<Element> {
|
||||
let mut params = BTreeMap::new();
|
||||
params.insert(
|
||||
"id".to_string(),
|
||||
DataValue::Str(serde_json::to_string(&id)?.into()),
|
||||
);
|
||||
|
||||
let result = db.run_script("
|
||||
?[content, latest_message, local_changes] := *elements[$id, content, latest_message, local_changes]
|
||||
", params, cozo::ScriptMutability::Immutable);
|
||||
match result {
|
||||
Ok(val) => {
|
||||
if let Some(firstrow) = val.rows.first() {
|
||||
debug!("db result: {:?}", &firstrow.as_slice());
|
||||
if let [DataValue::Json(JsonData(content)), latest_message, DataValue::Bool(local_changes)] =
|
||||
firstrow.as_slice()
|
||||
{
|
||||
return Ok(Element::from((
|
||||
id.to_owned(),
|
||||
serde_json::from_value(content.to_owned())?,
|
||||
match latest_message {
|
||||
DataValue::Str(s) => Some(serde_json::from_str(s)?),
|
||||
_ => None,
|
||||
},
|
||||
local_changes.to_owned(),
|
||||
)));
|
||||
}
|
||||
return Err(Error::msg("Could not parse db result as Element"));
|
||||
} else {
|
||||
return Err(Error::msg("No rows returned for element query"));
|
||||
}
|
||||
}
|
||||
Err(report) => bail!(report),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_by_tag(db: &DbInstance, tag: &Tag) -> anyhow::Result<Vec<ElementId>> {
|
||||
let mut params = BTreeMap::new();
|
||||
params.insert(
|
||||
"tag".to_string(),
|
||||
DataValue::Str(serde_json::to_string(tag)?.into()),
|
||||
);
|
||||
|
||||
let result = db.run_script(
|
||||
"
|
||||
?[element] := *tags[$tag, element]
|
||||
",
|
||||
params,
|
||||
cozo::ScriptMutability::Immutable,
|
||||
);
|
||||
|
||||
match result {
|
||||
Ok(named_rows) => {
|
||||
let mut element_ids = vec![];
|
||||
for row in named_rows.rows {
|
||||
if let [DataValue::Json(JsonData(Value::String(element_id)))] = row.as_slice() {
|
||||
match serde_json::from_str(&element_id) {
|
||||
Ok(id) => element_ids.push(id),
|
||||
Err(e) => {
|
||||
error!("Error parsing element id {}: {}", element_id, e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(element_ids)
|
||||
}
|
||||
Err(report) => bail!(report),
|
||||
}
|
||||
}
|
||||
|
||||
fn set_property(
|
||||
db: &DbInstance,
|
||||
id: &ElementId,
|
||||
key: &str,
|
||||
value: DataValue,
|
||||
) -> anyhow::Result<()> {
|
||||
let params = vec![
|
||||
("id", DataValue::Str(serde_json::to_string(id)?.into())),
|
||||
(key, value),
|
||||
];
|
||||
|
||||
match run_query!(
|
||||
&db,
|
||||
format!(":update elements {{id => {key}}}"),
|
||||
params,
|
||||
ScriptMutability::Mutable
|
||||
) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(report) => bail!(report),
|
||||
}
|
||||
}
|
50
ubisync-lib/src/state/queries/mod.rs
Normal file
50
ubisync-lib/src/state/queries/mod.rs
Normal file
|
@ -0,0 +1,50 @@
|
|||
pub mod apps;
|
||||
pub mod elements;
|
||||
pub mod peers;
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! build_query {
|
||||
($payload:expr, $params:expr) => {{
|
||||
use cozo::DataValue;
|
||||
use std::collections::BTreeMap;
|
||||
// Build parameters map
|
||||
let mut params_map: BTreeMap<String, DataValue> = Default::default();
|
||||
let mut parameters_init = String::new();
|
||||
|
||||
if $params.len() > 0 {
|
||||
for (name, value) in $params {
|
||||
let _: &str = name; // only for type annotation
|
||||
params_map.insert(name.to_string(), value);
|
||||
}
|
||||
|
||||
// First line: Initialize parameters, make them available in CozoScript
|
||||
use itertools::Itertools;
|
||||
parameters_init += "?[";
|
||||
parameters_init += ¶ms_map
|
||||
.iter()
|
||||
.map(|(name, _)| name)
|
||||
.format(", ")
|
||||
.to_string();
|
||||
parameters_init += "] <- [[";
|
||||
parameters_init += ¶ms_map
|
||||
.iter()
|
||||
.map(|(name, _)| format!("${}", name))
|
||||
.format(", ")
|
||||
.to_string();
|
||||
parameters_init += "]]";
|
||||
}
|
||||
|
||||
// Return query string and parameters map
|
||||
(format!("{}\n\n{}", parameters_init, $payload), params_map)
|
||||
}};
|
||||
}
|
||||
|
||||
use build_query;
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! run_query {
|
||||
($db:expr, $payload:expr, $params:expr, $mutability:expr) => {{
|
||||
let (query, parameters) = crate::state::queries::build_query!($payload, $params);
|
||||
$db.run_script(query.as_str(), parameters, $mutability)
|
||||
}};
|
||||
}
|
49
ubisync-lib/src/state/queries/peers.rs
Normal file
49
ubisync-lib/src/state/queries/peers.rs
Normal file
|
@ -0,0 +1,49 @@
|
|||
use anyhow::Error;
|
||||
use cozo::{DataValue, DbInstance, ScriptMutability};
|
||||
|
||||
use crate::{comm::Peer, run_query, state::types::PeerId};
|
||||
|
||||
pub fn put(db: &DbInstance, id: &PeerId, name: &str) -> anyhow::Result<()> {
|
||||
let params = vec![
|
||||
("id", DataValue::Str(serde_json::to_string(id)?.into())),
|
||||
("name", DataValue::Str(serde_json::to_string(name)?.into())),
|
||||
];
|
||||
|
||||
match run_query!(
|
||||
&db,
|
||||
":put peers {id => name}",
|
||||
params,
|
||||
ScriptMutability::Mutable
|
||||
) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(report) => Err(Error::msg(format!("Query failed: {}", report))),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(db: &DbInstance) -> anyhow::Result<Vec<Peer>> {
|
||||
let result = db.run_script(
|
||||
"
|
||||
?[id, name] := *peers{id, name}
|
||||
",
|
||||
Default::default(),
|
||||
cozo::ScriptMutability::Immutable,
|
||||
);
|
||||
match result {
|
||||
Ok(rows) => Ok(rows
|
||||
.rows
|
||||
.into_iter()
|
||||
.map(|row| match row.as_slice() {
|
||||
[DataValue::Str(id_string), DataValue::Str(name_string)] => {
|
||||
if let Ok(id) = serde_json::from_str(&id_string) {
|
||||
Some(Peer::new(id, name_string.as_str().to_string()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
_ => None,
|
||||
})
|
||||
.flatten()
|
||||
.collect()),
|
||||
Err(report) => Err(Error::msg(format!("Query failed: {}", report))),
|
||||
}
|
||||
}
|
40
ubisync-lib/src/state/schema.rs
Normal file
40
ubisync-lib/src/state/schema.rs
Normal file
|
@ -0,0 +1,40 @@
|
|||
use std::collections::BTreeMap;
|
||||
|
||||
use anyhow::Error;
|
||||
use cozo::DbInstance;
|
||||
|
||||
pub fn add_schema(db: &DbInstance) -> anyhow::Result<()> {
|
||||
let params = BTreeMap::new();
|
||||
match db.run_script(
|
||||
"
|
||||
{:create apps {
|
||||
id: String,
|
||||
=>
|
||||
last_access: Int,
|
||||
name: String,
|
||||
description: String,
|
||||
}}
|
||||
{:create peers {
|
||||
id: String,
|
||||
=>
|
||||
name: String,
|
||||
}}
|
||||
{:create elements {
|
||||
id: String,
|
||||
=>
|
||||
content: Json,
|
||||
latest_message: String?,
|
||||
local_changes: Bool,
|
||||
}}
|
||||
{:create tags {
|
||||
tag: String,
|
||||
element: String,
|
||||
}}
|
||||
",
|
||||
params,
|
||||
cozo::ScriptMutability::Mutable,
|
||||
) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => Err(Error::msg(format!("Failed to set up schema: {}", e))),
|
||||
}
|
||||
}
|
42
ubisync-lib/src/state/types/element.rs
Normal file
42
ubisync-lib/src/state/types/element.rs
Normal file
|
@ -0,0 +1,42 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::{ElementContent, ElementId, MessageId};
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct Element {
|
||||
// Uuid identifying the element itself
|
||||
id: ElementId,
|
||||
content: ElementContent,
|
||||
latest_message: Option<MessageId>,
|
||||
local_changes: bool,
|
||||
}
|
||||
|
||||
impl From<(ElementId, ElementContent, Option<MessageId>, bool)> for Element {
|
||||
fn from(value: (ElementId, ElementContent, Option<MessageId>, bool)) -> Self {
|
||||
Element {
|
||||
id: value.0,
|
||||
content: value.1,
|
||||
latest_message: value.2,
|
||||
local_changes: value.3,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Element {
|
||||
pub fn new(id: ElementId, content: ElementContent) -> Self {
|
||||
// A new element with no latest message must have local changes
|
||||
Element {
|
||||
id: id,
|
||||
content: content,
|
||||
latest_message: None,
|
||||
local_changes: true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn id(&self) -> &ElementId {
|
||||
&self.id
|
||||
}
|
||||
pub fn content(&self) -> &ElementContent {
|
||||
&self.content
|
||||
}
|
||||
}
|
26
ubisync-lib/src/state/types/element_content.rs
Normal file
26
ubisync-lib/src/state/types/element_content.rs
Normal file
|
@ -0,0 +1,26 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub enum ElementContent {
|
||||
Text(String),
|
||||
}
|
||||
|
||||
impl TryFrom<ElementContent> for String {
|
||||
type Error = serde_json::Error;
|
||||
fn try_from(value: ElementContent) -> Result<Self, Self::Error> {
|
||||
match serde_json::to_string(&value) {
|
||||
Ok(s) => Ok(s),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for ElementContent {
|
||||
type Error = serde_json::Error;
|
||||
fn try_from(value: &str) -> Result<Self, Self::Error> {
|
||||
match serde_json::from_str(value) {
|
||||
Ok(ec) => Ok(ec),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
}
|
29
ubisync-lib/src/state/types/element_id.rs
Normal file
29
ubisync-lib/src/state/types/element_id.rs
Normal file
|
@ -0,0 +1,29 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct ElementId(Uuid);
|
||||
impl ElementId {
|
||||
pub fn new() -> Self {
|
||||
ElementId(Uuid::new_v4())
|
||||
}
|
||||
}
|
||||
|
||||
impl ToString for ElementId {
|
||||
fn to_string(&self) -> String {
|
||||
self.0.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&ElementId> for String {
|
||||
fn from(value: &ElementId) -> Self {
|
||||
value.0.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for ElementId {
|
||||
type Error = serde_json::Error;
|
||||
fn try_from(value: &str) -> Result<Self, Self::Error> {
|
||||
serde_json::from_str(value)
|
||||
}
|
||||
}
|
30
ubisync-lib/src/state/types/message_id.rs
Normal file
30
ubisync-lib/src/state/types/message_id.rs
Normal file
|
@ -0,0 +1,30 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct MessageId(Uuid);
|
||||
|
||||
impl MessageId {
|
||||
pub fn new() -> Self {
|
||||
MessageId(Uuid::new_v4())
|
||||
}
|
||||
}
|
||||
|
||||
impl ToString for MessageId {
|
||||
fn to_string(&self) -> String {
|
||||
self.0.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<MessageId> for String {
|
||||
fn from(value: MessageId) -> Self {
|
||||
value.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for MessageId {
|
||||
type Error = serde_json::Error;
|
||||
fn try_from(value: &str) -> Result<Self, Self::Error> {
|
||||
serde_json::from_str(value)
|
||||
}
|
||||
}
|
17
ubisync-lib/src/state/types/mod.rs
Normal file
17
ubisync-lib/src/state/types/mod.rs
Normal file
|
@ -0,0 +1,17 @@
|
|||
mod element_content;
|
||||
pub use element_content::ElementContent;
|
||||
|
||||
mod element_id;
|
||||
pub use element_id::ElementId;
|
||||
|
||||
mod element;
|
||||
pub use element::Element;
|
||||
|
||||
mod message_id;
|
||||
pub use message_id::MessageId;
|
||||
|
||||
mod peer_id;
|
||||
pub use peer_id::PeerId;
|
||||
|
||||
mod tag;
|
||||
pub use tag::Tag;
|
56
ubisync-lib/src/state/types/peer_id.rs
Normal file
56
ubisync-lib/src/state/types/peer_id.rs
Normal file
|
@ -0,0 +1,56 @@
|
|||
use anyhow::bail;
|
||||
use i2p::net::{I2pSocketAddr, ToI2pSocketAddrs};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct PeerId {
|
||||
i2p_addr: I2pSocketAddr,
|
||||
}
|
||||
|
||||
impl PeerId {
|
||||
pub fn addr(&self) -> I2pSocketAddr {
|
||||
self.i2p_addr.to_owned()
|
||||
}
|
||||
}
|
||||
|
||||
impl ToString for PeerId {
|
||||
fn to_string(&self) -> String {
|
||||
self.i2p_addr.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for PeerId {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: &str) -> Result<Self, anyhow::Error> {
|
||||
match ToI2pSocketAddrs::to_socket_addrs(&value) {
|
||||
Ok(addr_iter) => {
|
||||
for addr in addr_iter {
|
||||
return Ok(PeerId { i2p_addr: addr });
|
||||
}
|
||||
return Err(anyhow::Error::msg("No valid I2P address found"));
|
||||
}
|
||||
Err(e) => bail!(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<String> for PeerId {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: String) -> Result<Self, Self::Error> {
|
||||
Self::try_from(value.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<I2pSocketAddr> for PeerId {
|
||||
fn from(value: I2pSocketAddr) -> Self {
|
||||
PeerId { i2p_addr: value }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PeerId> for I2pSocketAddr {
|
||||
fn from(value: PeerId) -> Self {
|
||||
value.i2p_addr
|
||||
}
|
||||
}
|
6
ubisync-lib/src/state/types/tag.rs
Normal file
6
ubisync-lib/src/state/types/tag.rs
Normal file
|
@ -0,0 +1,6 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct Tag {
|
||||
tag: String,
|
||||
}
|
86
ubisync-lib/tests/api.rs
Normal file
86
ubisync-lib/tests/api.rs
Normal file
|
@ -0,0 +1,86 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use tracing::{debug, Level};
|
||||
use ubisync_lib::{
|
||||
api::v0::app::AppDescription,
|
||||
config::Config,
|
||||
state::types::{Element, ElementContent, ElementId},
|
||||
Ubisync,
|
||||
};
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn two_nodes_element_creation() {
|
||||
tracing_subscriber::fmt()
|
||||
.pretty()
|
||||
.with_max_level(Level::DEBUG)
|
||||
.init();
|
||||
// Two nodes need to bind to different ports
|
||||
let mut c2 = Config::default();
|
||||
c2.api_config.port = Some(9982);
|
||||
let ubi1 = Ubisync::new(&Config::default()).await.unwrap();
|
||||
let ubi2 = Ubisync::new(&c2).await.unwrap();
|
||||
ubi1.add_peer_from_id(ubi2.get_destination().unwrap().into())
|
||||
.unwrap();
|
||||
|
||||
let http_client = reqwest::Client::new();
|
||||
let register_response = http_client
|
||||
.put("http://localhost:9981/v0/app/register")
|
||||
.json(&AppDescription {
|
||||
name: "Test".to_string(),
|
||||
desc_text: "desc".to_string(),
|
||||
})
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
let jwt1 = register_response
|
||||
.text()
|
||||
.await
|
||||
.expect("Couldn't fetch token from response");
|
||||
let register_response = http_client
|
||||
.put("http://localhost:9982/v0/app/register")
|
||||
.json(&AppDescription {
|
||||
name: "Test".to_string(),
|
||||
desc_text: "desc".to_string(),
|
||||
})
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
let jwt2 = register_response
|
||||
.text()
|
||||
.await
|
||||
.expect("Couldn't fetch token from response");
|
||||
|
||||
let test_element_content = ElementContent::Text("Text".to_string());
|
||||
let put_resp = http_client
|
||||
.put(&format!("http://localhost:9981/v0/element"))
|
||||
.json(&test_element_content)
|
||||
.header("Authorization", &format!("Bearer {}", &jwt1))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
debug!("{:?}", &put_resp);
|
||||
let put_resp_text = put_resp.text().await.expect("No put response body");
|
||||
debug!("{}", put_resp_text);
|
||||
let id =
|
||||
serde_json::from_str::<ElementId>(&put_resp_text).expect("Could not deserialize ElementId");
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(3000)).await;
|
||||
|
||||
let get_resp = http_client
|
||||
.get(&format!(
|
||||
"http://localhost:9982/v0/element/{}",
|
||||
Into::<String>::into(&id)
|
||||
))
|
||||
.header("Authorization", &format!("Bearer {}", &jwt2))
|
||||
.send()
|
||||
.await
|
||||
.expect("Get request failed");
|
||||
let get_resp_text = get_resp.text().await.expect("No get request body");
|
||||
debug!("{}", get_resp_text);
|
||||
let received_element =
|
||||
serde_json::from_str::<Element>(&get_resp_text).expect("Could not deserialize Element");
|
||||
debug!("Other node received this element: {:?}", received_element);
|
||||
|
||||
assert_eq!(&test_element_content, received_element.content());
|
||||
std::process::exit(0);
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue