232 lines
9.3 KiB
Rust
232 lines
9.3 KiB
Rust
mod conflict_resolution;
|
|
pub mod message_processor;
|
|
|
|
use tracing::{debug, error, warn};
|
|
use ubisync_lib::messages::Message;
|
|
use ubisync_lib::types::PeerId;
|
|
|
|
use std::collections::HashMap;
|
|
use std::io::{Read, Write};
|
|
use std::ops::Deref;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use anyhow::{anyhow, bail};
|
|
use i2p::net::{I2pAddr, I2pListener, I2pListenerBuilder, I2pSocketAddr, I2pStream};
|
|
use i2p::sam_options::SAMOptions;
|
|
use tokio::sync::RwLock;
|
|
use tokio::task::JoinHandle;
|
|
|
|
use crate::state::CommState;
|
|
use crate::Config;
|
|
|
|
pub struct CommHandle {
|
|
state: Arc<CommState>,
|
|
i2p_server: Arc<I2pListener>,
|
|
peer_id: PeerId,
|
|
// Maps peer addresses to existing connections to them
|
|
clients: Arc<RwLock<HashMap<I2pSocketAddr, Arc<RwLock<I2pStream>>>>>,
|
|
thread: RwLock<Option<JoinHandle<()>>>,
|
|
}
|
|
|
|
impl CommHandle {
|
|
pub fn new(state: CommState, config: &Config) -> anyhow::Result<Self> {
|
|
let mut listener_builder =
|
|
I2pListenerBuilder::default().with_options(SAMOptions::default());
|
|
|
|
if let Some(privkey) = &config.i2p_private_key {
|
|
listener_builder = listener_builder.with_private_key(privkey.to_string());
|
|
}
|
|
|
|
let listener = listener_builder.build().unwrap();
|
|
let own_peer_id: PeerId = (&listener).local_addr().map_err(|e| anyhow!(e))?.into();
|
|
|
|
Ok(CommHandle {
|
|
state: Arc::new(state),
|
|
i2p_server: Arc::new(listener),
|
|
peer_id: own_peer_id,
|
|
clients: Default::default(),
|
|
thread: RwLock::new(None),
|
|
})
|
|
}
|
|
|
|
pub async fn run(&self) {
|
|
debug!("CommHandle is running now");
|
|
let state = self.state.clone();
|
|
let i2p_server = self.i2p_server.clone();
|
|
let mut thread_writeguard = self.thread.write().await;
|
|
*thread_writeguard = Some(tokio::spawn(async move {
|
|
for incoming in i2p_server.incoming() {
|
|
if let Ok(stream) = incoming {
|
|
let state_arc = state.clone();
|
|
// Spawn a blocking task, which (in read_connection) will spawn a non-blocking task
|
|
// The "outer" blocking task exists, because the for loop's iterator will block until
|
|
// there is another stream - thus, the existing streams will not be read.
|
|
// `spawn_blocking` moves the reading task to a special pool of tasks which are
|
|
// executed _despite_ other tasks blocking for something.
|
|
tokio::task::spawn_blocking(move || Self::read_connection(stream, state_arc));
|
|
}
|
|
}
|
|
}));
|
|
}
|
|
|
|
pub async fn stop(self) {
|
|
if let Some(t) = self.thread.read().await.as_ref() {
|
|
t.abort();
|
|
}
|
|
}
|
|
|
|
pub async fn broadcast(&self, msg: Message) -> anyhow::Result<()> {
|
|
match serde_json::to_string(&msg) {
|
|
Ok(msg_string) => {
|
|
for peer in self.state.get_peers().unwrap() {
|
|
debug!("Sending to peer '{:?}' message '{:?}'", &peer, &msg);
|
|
if let Err(e) = self.send_to_addr(&peer.addr(), msg_string.as_bytes()).await {
|
|
debug!(
|
|
"Failed to send message.\nError: {:?}\nMessage: {:?}",
|
|
e, &msg
|
|
);
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
Err(e) => bail!(e),
|
|
}
|
|
}
|
|
|
|
pub async fn send(&self, dest: &I2pSocketAddr, msg: Message, custom_retry_settings: Option<(u64, u64)>) -> anyhow::Result<()> {
|
|
match serde_json::to_string(&msg) {
|
|
Ok(msg_string) => {
|
|
// Default retry settings
|
|
let (tries, base_delay) = match custom_retry_settings {
|
|
Some((ct, d)) => (ct, d),
|
|
None => (3, 200),
|
|
};
|
|
|
|
// Send message until success or too many retries
|
|
let mut ctr = 0;
|
|
while ctr < tries {
|
|
debug!("Sending message...\nFrom '{:?}'\nTo '{dest:?}'\nAttempt loop ctr: {ctr}\nMessage: '{msg:?}", self.own_peer_id().unwrap().addr());
|
|
match self.send_to_addr(dest, msg_string.as_bytes()).await {
|
|
Ok(_) => break,
|
|
Err(e) => {
|
|
debug!("{e:?}");
|
|
// Return last error if still unsuccessful
|
|
if ctr >= tries - 1 {
|
|
bail!(e)
|
|
}
|
|
|
|
// Continue loop otherwise
|
|
ctr = ctr + 1;
|
|
tokio::time::sleep(Duration::from_millis(base_delay * ctr)).await;
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
Err(e) => bail!(e),
|
|
}
|
|
}
|
|
|
|
pub async fn send_to_addr(&self, addr: &I2pSocketAddr, msg: &[u8]) -> anyhow::Result<()> {
|
|
// Create client for this connection if necessary
|
|
if !self.clients.read().await.contains_key(addr) {
|
|
debug!("No client exists for requested connection, creating one");
|
|
match I2pStream::connect_with_session(self.i2p_server.session(), addr) {
|
|
Ok(client) => {
|
|
//client.inner.sam.conn.set_nodelay(true)?;
|
|
//client.inner.sam.conn.set_nonblocking(false)?;
|
|
self.clients
|
|
.write()
|
|
.await
|
|
.insert(addr.clone(), Arc::new(RwLock::new(client)));
|
|
}
|
|
Err(e) => bail!(e),
|
|
}
|
|
}
|
|
|
|
// Fetch current client for this connection from clients map, and send the message
|
|
if let Some(client) = self.clients.read().await.get(&addr) {
|
|
let mut writeguard = client.write().await;
|
|
match writeguard.write_all(msg) {
|
|
Ok(_) => {
|
|
writeguard.flush()?;
|
|
return Ok(());
|
|
}
|
|
Err(e) => {
|
|
warn!("Error writing to stream: {}", e)
|
|
}
|
|
}
|
|
} else {
|
|
return Err(anyhow::Error::msg(
|
|
"No client found despite trying to add one beforehand.",
|
|
));
|
|
}
|
|
self.clients.write().await.remove(&addr);
|
|
Err(anyhow::Error::msg(
|
|
"Failed to send anything, most likely the stream was broken and has been removed",
|
|
))
|
|
}
|
|
|
|
pub fn i2p_address(&self) -> I2pSocketAddr {
|
|
self.peer_id.addr()
|
|
}
|
|
|
|
pub fn i2p_b32_address(&self) -> I2pAddr {
|
|
self.peer_id.addr().dest()
|
|
}
|
|
|
|
pub fn own_peer_id(&self) -> anyhow::Result<PeerId> {
|
|
Ok(self.peer_id.to_owned())
|
|
}
|
|
|
|
fn read_connection(stream: I2pStream, state: Arc<CommState>) -> JoinHandle<()> {
|
|
let mut stream = stream;
|
|
tokio::spawn(async move {
|
|
let peer: PeerId = stream.peer_addr().expect("Failed to get peer addr").into();
|
|
|
|
// All streams start with a \n byte which does not belong to the payload, take that from the stream.
|
|
if let Err(e) = stream.read(&mut [0; 1]) {
|
|
error!("Error while reading first byte of stream: {}", e);
|
|
return;
|
|
}
|
|
|
|
let iterator =
|
|
serde_json::Deserializer::from_reader(stream).into_iter::<serde_json::Value>();
|
|
for item in iterator {
|
|
match item {
|
|
Ok(value) => match serde_json::from_value::<Message>(value) {
|
|
Ok(message) => {
|
|
message_processor::handle(state.deref(), &peer, message);
|
|
}
|
|
Err(e) => warn!("Deserialization failed: {:?}", e),
|
|
},
|
|
Err(e) => {
|
|
warn!("Deserialization failed: {:?}", e);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use i2p::net::I2pListener;
|
|
use i2p::sam::StreamForward;
|
|
use i2p::sam_options::SAMOptions;
|
|
use i2p::Session;
|
|
|
|
#[test]
|
|
pub fn from_privkey() {
|
|
let privkey = "DPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gQz1KRvzfH0pI7uNWn8ZiRKQ812KjOrOMqnq7owjAjSBDPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gQz1KRvzfH0pI7uNWn8ZiRKQ812KjOrOMqnq7owjAjSBDPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gVKUHq0huLwfh0u06PlujRXTgcUJw9pg4Vkh-e0CGQFL6yn2FxCUIvaryyFt3-8mwO1OTkQyB7u1rnO9FpLlKeT9FPSkwmaxZmwQ1kvsuTTIp5ntxQZ1XMCDm2qhRWdcEsYxTKLJIMYxN1Ujk9Y7SqNYORmxrwQWC4ENGnt~VyvbAAAAfAabqgU0GhMWN2syDQ5sYZ61WXDqC4esasxwyLvJ-ES7~k40Uq9htc8t16-RXEq0Q17C499WxW6~GQRcXbgBNd0bMdV-46RsFo1jNgfB6H4nkuTrQXMqXB6s2Fhx2gwcHRk3Lt5DE4N0mvHG8Po974tJWr1hIRiSxQUtSj5kcZOOT~EKWMoCA7qDgZORZAnJavaRr0S-PiPQwAw8HOekdw50CyOByxXEfLBAi-Kz1nhdNvMHIrtcBZ~RpsxOK63O633e0PeYwrOOG7AFVLh7SzdwVvI1-KUe7y2ADBcoHuJRMwk5MEV-BATEfhWA2SzWw1qFRzJyb-pGbgGCJQOoc1YcP8jikBJhtuRbD5K-wK5MXoHL";
|
|
let _ = I2pListener {
|
|
forward: StreamForward::with_session(
|
|
&Session::from_destination(i2p::sam::DEFAULT_API, &privkey, SAMOptions::default())
|
|
.expect("Failed to create session for listener2"),
|
|
)
|
|
.expect("Failed to create StreamForward for listener2"),
|
|
};
|
|
}
|
|
}
|