ubisync/src/comm/mod.rs
Philip (a-0) 98393b9bf6 Split State in separate views for Api and CommHandle
- State changes can now be handled differently, depending on whether they were caused locally (API) or by a remote peer (Comm)
- State functions have more readable names (`write...` and `update...` have similar meanings, but using different names helps readability in the respective (API/Comm) context)
2023-12-08 22:31:47 +01:00

242 lines
9.6 KiB
Rust

pub mod messages;
pub mod message_processor;
mod types;
use tracing::{warn, error, debug};
pub use types::*;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use std::io::{Read, Write};
use anyhow::bail;
use i2p::net::{I2pListenerBuilder, I2pListener, I2pSocketAddr, I2pStream, I2pAddr};
use i2p::sam_options::SAMOptions;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use crate::Config;
use crate::state::CommState;
use crate::state::types::PeerId;
use self::messages::Message;
/// Handle for the I2P-based communication layer.
///
/// Owns the listener for incoming streams, a cache of streams to known peers
/// and the handle of the background task started by `run`.
pub struct CommHandle {
    /// Comm-side view of the application state, shared with the reader tasks.
    state: Arc<CommState>,
    /// Listener from which incoming peer streams are accepted.
    i2p_server: Arc<I2pListener>,
    /// Maps peer addresses to existing connections to them.
    clients: Arc<RwLock<HashMap<I2pSocketAddr, Arc<RwLock<I2pStream>>>>>,
    /// Handle of the accept-loop task; `None` until `run` has been called.
    thread: RwLock<Option<JoinHandle<()>>>,
}
impl CommHandle {
pub fn new(state: CommState, config: &Config) -> anyhow::Result<Self> {
let mut listener_builder = I2pListenerBuilder::default()
.with_options(SAMOptions::default());
if let Some(privkey) = &config.i2p_private_key {
listener_builder = listener_builder.with_private_key(privkey.to_string());
}
let listener = listener_builder
.build()
.unwrap();
Ok(CommHandle {
state: Arc::new(state),
i2p_server: Arc::new(listener),
clients: Default::default(),
thread: RwLock::new(None),
})
}
pub async fn run(&self) {
debug!("CommHandle is running now");
let state = self.state.clone();
let i2p_server = self.i2p_server.clone();
let clients = self.clients.clone();
let mut thread_writeguard = self.thread.write().await;
*thread_writeguard = Some(tokio::spawn(async move {
for incoming in i2p_server.incoming() {
if let Ok(stream) = incoming {
if let Ok(addr) = stream.peer_addr() {
// First, save a reference to the new stream in `clients` for later reuse
let wrapped_stream = Arc::new(RwLock::new(stream));
clients.write().await.insert(addr, wrapped_stream.clone());
// Reference to state to be passed to `read_connection()`
let state_arc = state.clone();
// Spawn a blocking task, which (in read_connection) will spawn a non-blocking task
// The "outer" blocking task exists, because the for loop's iterator will block until
// there is another stream - thus, the existing streams will not be read.
// `spawn_blocking` moves the reading task to a special pool of tasks which are
// executed _despite_ other tasks blocking for something.
tokio::task::spawn_blocking(move || Self::read_connection(wrapped_stream, state_arc));
}
}
}
}));
}
/// Consumes the handle and aborts the accept task started by `run`, if any.
pub async fn stop(self) {
    let thread_guard = self.thread.read().await;
    match thread_guard.as_ref() {
        Some(handle) => handle.abort(),
        // `run` was never called, nothing to abort.
        None => {}
    }
}
/// Sends `msg` to every peer returned by the comm state.
///
/// The message is serialized to JSON once and then sent to each peer in turn.
/// Sending is best-effort: a failure for an individual peer is logged and does
/// not abort the broadcast.
///
/// # Errors
///
/// Returns an error if the message cannot be serialized or if the list of
/// peers cannot be read from the state. (The latter previously panicked via
/// `unwrap()`.)
pub async fn broadcast(&self, msg: Message) -> anyhow::Result<()> {
    let msg_string = serde_json::to_string(&msg)?;

    // Propagate state errors to the caller instead of panicking.
    let peers = match self.state.get_peers() {
        Ok(peers) => peers,
        Err(e) => bail!(e),
    };

    for peer in peers {
        debug!("Sending to peer '{:?}' message '{:?}'", &peer, &msg);
        if let Err(e) = self.send_to_addr(&peer.addr(), msg_string.as_bytes()).await {
            debug!("Failed to send message.\nError: {:?}\nMessage: {:?}", e, &msg);
        }
    }
    Ok(())
}
/// Serializes `msg` to JSON and sends it to the peer at `dest`.
///
/// # Errors
///
/// Returns an error if serialization fails or if `send_to_addr` fails.
pub async fn send(&self, dest: &I2pSocketAddr, msg: Message) -> anyhow::Result<()> {
    let payload = serde_json::to_string(&msg)?;
    self.send_to_addr(dest, payload.as_bytes()).await?;
    Ok(())
}
pub async fn send_to_addr(&self, addr: &I2pSocketAddr, msg: &[u8]) -> anyhow::Result<()> {
// Create client for this connection if necessary
if !self.clients.read().await.contains_key(addr) {
match I2pStream::connect(addr) {
Ok(client) => {
//client.inner.sam.conn.set_nodelay(true)?;
//client.inner.sam.conn.set_nonblocking(false)?;
self.clients.write().await.insert(addr.clone(), Arc::new(RwLock::new(client)));
},
Err(e) => bail!(e),
}
}
// Fetch current client for this connection from clients map, and send the message
if let Some(client) = self.clients.read().await.get(&addr) {
let mut writeguard = client.write().await;
match writeguard.write_all(msg) {
Ok(_) => {
writeguard.flush()?;
return Ok(())
},
Err(e) => {
warn!("Error writing to stream: {}", e)
}
}
}
else {
return Err(anyhow::Error::msg("No client found despite trying to add one beforehand."))
}
self.clients.write().await.remove(&addr);
Err(anyhow::Error::msg("Failed to send anything, most likely the stream was broken and has been removed"))
}
/// Returns the local address of the I2P listener.
///
/// # Errors
///
/// Returns an error if the listener cannot report its local address.
pub fn i2p_address(&self) -> anyhow::Result<I2pSocketAddr> {
    let local_addr = match self.i2p_server.local_addr() {
        Err(e) => bail!(e),
        Ok(addr) => addr,
    };
    Ok(local_addr)
}
pub fn i2p_b32_address(&self) -> anyhow::Result<I2pSocketAddr> {
let mut i2p_dest = self.i2p_address()?;
i2p_dest.set_dest(I2pAddr::from_b64(&i2p_dest.dest().string()).unwrap());
Ok(i2p_dest)
}
fn read_connection(wrapped_stream: Arc<RwLock<I2pStream>>, state: Arc<CommState>) -> JoinHandle<()> {
tokio::spawn(async move {
let mut stream = wrapped_stream.write().await;
let peer: PeerId = stream.peer_addr().expect("Failed to get peer addr").into();
// All streams start with a \n byte which does not belong to the payload, take that from the stream.
if let Err(e) = stream.read(&mut [0; 1]) {
error!("Error while reading first byte of stream: {}", e);
return;
}
let iterator = serde_json::Deserializer::from_reader(&mut *stream).into_iter::<serde_json::Value>();
for item in iterator {
match item {
Ok(value) => {
match serde_json::from_value::<Message>(value) {
Ok(message) => {
message_processor::handle(state.deref(), &peer, message);
},
Err(e) => warn!("Deserialization failed: {:?}", e),
}
},
Err(e) => {
warn!("Deserialization failed: {:?}", e);
break;
}
}
}
})
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use i2p::Session;
use i2p::net::I2pListener;
use i2p::sam::StreamForward;
use i2p::sam_options::SAMOptions;
use crate::Config;
use crate::state::{State, CommState};
use crate::comm::{messages, Message};
use crate::state::types::ElementId;
use super::CommHandle;
/// Starts a `CommHandle`, sends a hello message and ten `CreateElement`
/// messages to its own address, then stops the handle.
#[tokio::test(flavor = "multi_thread")]
pub async fn msg() {
    let comm_state = CommState::new(State::new().await.unwrap());
    let ch = CommHandle::new(comm_state, &Config::default()).unwrap();
    ch.run().await;
    println!("My address: {:?}", ch.i2p_b32_address());

    let hello_dest = ch.i2p_address().unwrap();
    let hello = Message::new(messages::MessageContent::Hello { peer_name: "a".to_string() });
    ch.send(&hello_dest, hello).await.expect("Could not send hello");

    for i in 0..10 {
        let dest = ch.i2p_address().unwrap();
        let create_element = Message::new(messages::MessageContent::CreateElement {
            id: ElementId::new(),
            content: crate::state::types::ElementContent::Text(format!("hello world no. {}", i)),
        });
        let result = ch.send(&dest, create_element).await;
        tokio::time::sleep(Duration::from_millis(300)).await;
        println!("Result of sending: {:?}", result);
    }

    ch.stop().await;
}
/// Checks that an `I2pListener` can be constructed from a session which was
/// created from a fixed private key.
#[test]
pub fn from_privkey() {
    let privkey = "DPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gQz1KRvzfH0pI7uNWn8ZiRKQ812KjOrOMqnq7owjAjSBDPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gQz1KRvzfH0pI7uNWn8ZiRKQ812KjOrOMqnq7owjAjSBDPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gVKUHq0huLwfh0u06PlujRXTgcUJw9pg4Vkh-e0CGQFL6yn2FxCUIvaryyFt3-8mwO1OTkQyB7u1rnO9FpLlKeT9FPSkwmaxZmwQ1kvsuTTIp5ntxQZ1XMCDm2qhRWdcEsYxTKLJIMYxN1Ujk9Y7SqNYORmxrwQWC4ENGnt~VyvbAAAAfAabqgU0GhMWN2syDQ5sYZ61WXDqC4esasxwyLvJ-ES7~k40Uq9htc8t16-RXEq0Q17C499WxW6~GQRcXbgBNd0bMdV-46RsFo1jNgfB6H4nkuTrQXMqXB6s2Fhx2gwcHRk3Lt5DE4N0mvHG8Po974tJWr1hIRiSxQUtSj5kcZOOT~EKWMoCA7qDgZORZAnJavaRr0S-PiPQwAw8HOekdw50CyOByxXEfLBAi-Kz1nhdNvMHIrtcBZ~RpsxOK63O633e0PeYwrOOG7AFVLh7SzdwVvI1-KUe7y2ADBcoHuJRMwk5MEV-BATEfhWA2SzWw1qFRzJyb-pGbgGCJQOoc1YcP8jikBJhtuRbD5K-wK5MXoHL";

    // Session created from the fixed destination key above.
    let session = Session::from_destination(i2p::sam::DEFAULT_API, &privkey, SAMOptions::default())
        .expect("Failed to create session for listener2");
    let forward = StreamForward::with_session(&session)
        .expect("Failed to create StreamForward for listener2");

    let _ = I2pListener { forward };
}
}