diff --git a/src/config.rs b/src/config.rs index 3a80937..b9e0bbd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::env; use std::path::Path; @@ -8,7 +9,7 @@ use toml_env::{initialize, Args, AutoMapEnvArgs, Logging}; #[derive(Debug, Deserialize, Serialize)] pub struct BridgeConfig { - pub irc: IrcConfig, + pub irc: HashMap, pub signal: SignalConfig, pub mapping: BiMap, } diff --git a/src/main.rs b/src/main.rs index 21bcd7e..3107797 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,9 @@ mod transform; mod transports; mod config; +use futures::future::join_all; use futures::future::try_join; +use futures::future::try_join_all; use futures::prelude::*; use irc::client::prelude::*; use irc::client::ClientStream; @@ -11,11 +13,34 @@ use jsonrpsee::core::client::{Subscription, SubscriptionClientT}; use serde_json::Value; use tokio::time::sleep; use core::time::Duration; +use std::collections::HashMap; use transform::Bridge; use crate::jsonrpc::RpcClient; use crate::config::BridgeConfig; +async fn init_irc_client(mut client: Client) -> Result<(irc::client::Sender, ClientStream), String> { + client + .identify() + .map_err(|e| format!("Error while initializing irc client: {e}"))?; + + let sender = client.sender(); + let mut stream = client + .stream() + .map_err(|e| format!("Error while initializing irc stream: {e}"))?; + + log::debug!("Initializing IRC"); + while client.list_channels().is_some_and(|x| x.is_empty()) { + stream + .next() + .await + .transpose() + .map_err(|e| format!("error waiting for motd: {e}"))?; + } + log::debug!("Initialized IRC"); + Ok::<(irc::client::Sender, ClientStream), String>((sender, stream)) +} + #[tokio::main] async fn main() -> Result<(), String> { pretty_env_logger::init(); @@ -24,30 +49,45 @@ async fn main() -> Result<(), String> { // Initialisation of components. TODO separate module let init_irc = async { - let mut client = Client::from_config(config.irc.clone()).await.unwrap(); - client - .identify() - .map_err(|e| format!("Error while initializing irc client: {e}"))?; + let senders_and_streams: HashMap> = join_all( + config.irc.into_iter() + .map(|(name, config)| async { + (name, init_irc_client(Client::from_config(config) + .await + .expect(&format!("Failed to parse configuration for {name}"))) + .await) + }) + // Filter out failed connections + .collect() + ).await.into_iter().collect(); - let sender = client.sender(); - let mut stream = client - .stream() - .map_err(|e| format!("Error while initializing irc stream: {e}"))?; + let filtered_senders_and_streams: Vec<(String, irc::client::Sender, ClientStream)> = senders_and_streams + .into_iter() + .filter_map(|(name, result)| { + match result { + Ok((sender, stream)) => Some((name, sender, stream)), + Err(reason) => { + log::warn!("Failed to initialize server {name}: {reason}, disabling this server."); + None + } + } + }) + .collect(); - log::debug!("Initializing IRC"); - while client.list_channels().is_some_and(|x| x.is_empty()) { - stream - .next() - .await - .transpose() - .map_err(|e| format!("error waiting for motd: {e}"))?; + let filtered_senders = HashMap::new(); + let filtered_streams = HashMap::new(); + + for (name, sender, stream) in filtered_senders_and_streams { + filtered_senders.insert(name, sender); + filtered_streams.insert(name, stream); } - log::debug!("Initialized IRC"); - Ok::<(irc::client::Sender, ClientStream), String>((sender, stream)) + + (filtered_senders, filtered_streams) }; let init_signal = async { log::debug!("Initializing Signal"); let signal_client = (async { + // TODO(raito): use exponential backoff here! for i in 0..10 { match jsonrpc::connect_unix(&config.signal.socket).await { Ok(client) => { @@ -68,8 +108,10 @@ async fn main() -> Result<(), String> { log::debug!("Initialized Signal"); Ok((signal_stream, signal_client)) }; - let ((sender, mut stream), (mut signal_stream, signal_client)) = - try_join(init_irc, init_signal).await?; + + let (mut signal_stream, signal_client) = init_signal.await?; + let (irc_senders, irc_streams) = init_irc.await; + let bridge = Bridge::new( &config.signal.media_dir, &config.signal.signal_cli_dir, @@ -78,11 +120,18 @@ async fn main() -> Result<(), String> { ); // Run !! - log::info!("Bridge is up and running"); + log::info!("Bridge is ready to run"); + + // TODO: if only one of the future fails, we should not crash the rest of the threads but log a + // very good error. + let irc_handlers = try_join_all(irc_streams.into_iter().map(|(name, stream)| { + log::info!("Listening for IRC messages for {name}"); + handle_irc(&mut stream, signal_client, &bridge) + }).collect()); try_join( - handle_signal(sender, &mut signal_stream, &bridge), - handle_irc(&mut stream, signal_client, &bridge), + handle_signal(irc_senders, &mut signal_stream, &bridge), + irc_handlers ) .await?; unreachable!(); @@ -136,7 +185,7 @@ async fn handle_irc( } async fn handle_signal( - sender: Sender, + senders: HashMap, stream: &mut Subscription, bridge: &Bridge<'_>, ) -> Result<(), String> { @@ -147,8 +196,15 @@ async fn handle_signal( } Some(Ok(val)) => { log::trace!("[SIGNAL RCV] {:?}", val); - if let Some((channel, msg)) = bridge.signal2irc(val).await { - sender + if let Some((server, channel, msg)) = bridge.signal2irc(val).await { + if !senders.contains_key(server) { + log::error!("Tried to send message to server {server} but is unavailable!"); + return Err( + format!("unavailable IRC server '{server}' when receiving a Signal message for it") + ); + } + + senders[server] .send_privmsg(channel, msg) .map_err(|e| format!("error while sending to irc: {e}"))?; } diff --git a/src/transform.rs b/src/transform.rs index c329a2d..5ae520f 100644 --- a/src/transform.rs +++ b/src/transform.rs @@ -11,7 +11,8 @@ struct SignalMessageBuilder<'a> { message: Option, attachments_urls: Vec, mentions: Vec<(usize, usize, String)>, // TODO Add type - target: Option<&'a String>, + target_channel: Option<&'a String>, + target_server: Option<&'a str>, } impl<'b> SignalMessageBuilder<'b> { @@ -32,12 +33,17 @@ impl<'b> SignalMessageBuilder<'b> { } pub fn target(&mut self, channel: Option<&'b String>) { - self.target = channel; + self.target_channel = channel; } - pub fn build(mut self) -> Option<(&'b String, String)> + pub fn server(&mut self, server: Option<&'b str>) { + self.target_server = server; + } + + pub fn build(mut self) -> Option<(&'b str, &'b String, String)> { self.mentions.sort_by(|(pos1, _, _), (pos2, _, _)| pos1.cmp(pos2)); + log::trace!("building the final message: {:?} {:?}", self.mentions, self.message); self.message.as_mut().map(|message| { let mut offset: usize = 0; for (pos, len, name) in self.mentions.into_iter() { @@ -46,7 +52,7 @@ impl<'b> SignalMessageBuilder<'b> { } }); let attachments_text = self.attachments_urls.join(", "); - (match (&self.message, attachments_text.len()) { + let message = (match (&self.message, attachments_text.len()) { (None, 0) => None, (None, _) => self .from @@ -57,7 +63,11 @@ impl<'b> SignalMessageBuilder<'b> { .from .as_ref() .map(|from| format!("<{from}>: {m} ({attachments_text})")), - }).and_then(|m| self.target.map(|e| (e, m))) + })?; + let target_channel = self.target_channel?; + let target_server = self.target_server?; + + Some((target_server, target_channel, message)) } } @@ -117,7 +127,7 @@ impl<'a> Bridge<'a> { if let Some(url) = self.handle_attachment(a).await { attachments_urls.push(url); } else { - log::warn!("droped attachment: {a}"); + log::warn!("dropped attachment: {a}"); } } result.attach(&attachments_urls); @@ -132,7 +142,7 @@ impl<'a> Bridge<'a> { } } - pub async fn signal2irc(&self, signal: Value) -> Option<(&String, String)> { + pub async fn signal2irc(&self, signal: Value) -> Option<(&str, &String, String)> { let mut result: SignalMessageBuilder = SignalMessageBuilder::default(); if let Value::Object(map) = signal { if let Some(Value::Object(envelope)) = map.get("envelope") {