feat: multi-server support

This is done.

Signed-off-by: Ryan Lahfa <ryan@dgnum.eu>
This commit is contained in:
Ryan Lahfa 2024-10-10 19:16:27 +02:00
parent 9123e6fbe5
commit 105f1b709f
3 changed files with 100 additions and 33 deletions

View file

@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::env; use std::env;
use std::path::Path; use std::path::Path;
@ -8,7 +9,7 @@ use toml_env::{initialize, Args, AutoMapEnvArgs, Logging};
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
pub struct BridgeConfig { pub struct BridgeConfig {
pub irc: IrcConfig, pub irc: HashMap<String, IrcConfig>,
pub signal: SignalConfig, pub signal: SignalConfig,
pub mapping: BiMap<String, String>, pub mapping: BiMap<String, String>,
} }

View file

@ -3,7 +3,9 @@ mod transform;
mod transports; mod transports;
mod config; mod config;
use futures::future::join_all;
use futures::future::try_join; use futures::future::try_join;
use futures::future::try_join_all;
use futures::prelude::*; use futures::prelude::*;
use irc::client::prelude::*; use irc::client::prelude::*;
use irc::client::ClientStream; use irc::client::ClientStream;
@ -11,11 +13,34 @@ use jsonrpsee::core::client::{Subscription, SubscriptionClientT};
use serde_json::Value; use serde_json::Value;
use tokio::time::sleep; use tokio::time::sleep;
use core::time::Duration; use core::time::Duration;
use std::collections::HashMap;
use transform::Bridge; use transform::Bridge;
use crate::jsonrpc::RpcClient; use crate::jsonrpc::RpcClient;
use crate::config::BridgeConfig; use crate::config::BridgeConfig;
async fn init_irc_client(mut client: Client) -> Result<(irc::client::Sender, ClientStream), String> {
client
.identify()
.map_err(|e| format!("Error while initializing irc client: {e}"))?;
let sender = client.sender();
let mut stream = client
.stream()
.map_err(|e| format!("Error while initializing irc stream: {e}"))?;
log::debug!("Initializing IRC");
while client.list_channels().is_some_and(|x| x.is_empty()) {
stream
.next()
.await
.transpose()
.map_err(|e| format!("error waiting for motd: {e}"))?;
}
log::debug!("Initialized IRC");
Ok::<(irc::client::Sender, ClientStream), String>((sender, stream))
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), String> { async fn main() -> Result<(), String> {
pretty_env_logger::init(); pretty_env_logger::init();
@ -24,30 +49,45 @@ async fn main() -> Result<(), String> {
// Initialisation of components. TODO separate module // Initialisation of components. TODO separate module
let init_irc = async { let init_irc = async {
let mut client = Client::from_config(config.irc.clone()).await.unwrap(); let senders_and_streams: HashMap<String, Result<(irc::client::Sender, ClientStream), String>> = join_all(
client config.irc.into_iter()
.identify() .map(|(name, config)| async {
.map_err(|e| format!("Error while initializing irc client: {e}"))?; (name, init_irc_client(Client::from_config(config)
.await
.expect(&format!("Failed to parse configuration for {name}")))
.await)
})
// Filter out failed connections
.collect()
).await.into_iter().collect();
let sender = client.sender(); let filtered_senders_and_streams: Vec<(String, irc::client::Sender, ClientStream)> = senders_and_streams
let mut stream = client .into_iter()
.stream() .filter_map(|(name, result)| {
.map_err(|e| format!("Error while initializing irc stream: {e}"))?; match result {
Ok((sender, stream)) => Some((name, sender, stream)),
Err(reason) => {
log::warn!("Failed to initialize server {name}: {reason}, disabling this server.");
None
}
}
})
.collect();
log::debug!("Initializing IRC"); let filtered_senders = HashMap::new();
while client.list_channels().is_some_and(|x| x.is_empty()) { let filtered_streams = HashMap::new();
stream
.next() for (name, sender, stream) in filtered_senders_and_streams {
.await filtered_senders.insert(name, sender);
.transpose() filtered_streams.insert(name, stream);
.map_err(|e| format!("error waiting for motd: {e}"))?;
} }
log::debug!("Initialized IRC");
Ok::<(irc::client::Sender, ClientStream), String>((sender, stream)) (filtered_senders, filtered_streams)
}; };
let init_signal = async { let init_signal = async {
log::debug!("Initializing Signal"); log::debug!("Initializing Signal");
let signal_client = (async { let signal_client = (async {
// TODO(raito): use exponential backoff here!
for i in 0..10 { for i in 0..10 {
match jsonrpc::connect_unix(&config.signal.socket).await { match jsonrpc::connect_unix(&config.signal.socket).await {
Ok(client) => { Ok(client) => {
@ -68,8 +108,10 @@ async fn main() -> Result<(), String> {
log::debug!("Initialized Signal"); log::debug!("Initialized Signal");
Ok((signal_stream, signal_client)) Ok((signal_stream, signal_client))
}; };
let ((sender, mut stream), (mut signal_stream, signal_client)) =
try_join(init_irc, init_signal).await?; let (mut signal_stream, signal_client) = init_signal.await?;
let (irc_senders, irc_streams) = init_irc.await;
let bridge = Bridge::new( let bridge = Bridge::new(
&config.signal.media_dir, &config.signal.media_dir,
&config.signal.signal_cli_dir, &config.signal.signal_cli_dir,
@ -78,11 +120,18 @@ async fn main() -> Result<(), String> {
); );
// Run !! // Run !!
log::info!("Bridge is up and running"); log::info!("Bridge is ready to run");
// TODO: if only one of the future fails, we should not crash the rest of the threads but log a
// very good error.
let irc_handlers = try_join_all(irc_streams.into_iter().map(|(name, stream)| {
log::info!("Listening for IRC messages for {name}");
handle_irc(&mut stream, signal_client, &bridge)
}).collect());
try_join( try_join(
handle_signal(sender, &mut signal_stream, &bridge), handle_signal(irc_senders, &mut signal_stream, &bridge),
handle_irc(&mut stream, signal_client, &bridge), irc_handlers
) )
.await?; .await?;
unreachable!(); unreachable!();
@ -136,7 +185,7 @@ async fn handle_irc(
} }
async fn handle_signal( async fn handle_signal(
sender: Sender, senders: HashMap<String, Sender>,
stream: &mut Subscription<Value>, stream: &mut Subscription<Value>,
bridge: &Bridge<'_>, bridge: &Bridge<'_>,
) -> Result<(), String> { ) -> Result<(), String> {
@ -147,8 +196,15 @@ async fn handle_signal(
} }
Some(Ok(val)) => { Some(Ok(val)) => {
log::trace!("[SIGNAL RCV] {:?}", val); log::trace!("[SIGNAL RCV] {:?}", val);
if let Some((channel, msg)) = bridge.signal2irc(val).await { if let Some((server, channel, msg)) = bridge.signal2irc(val).await {
sender if !senders.contains_key(server) {
log::error!("Tried to send message to server {server} but is unavailable!");
return Err(
format!("unavailable IRC server '{server}' when receiving a Signal message for it")
);
}
senders[server]
.send_privmsg(channel, msg) .send_privmsg(channel, msg)
.map_err(|e| format!("error while sending to irc: {e}"))?; .map_err(|e| format!("error while sending to irc: {e}"))?;
} }

View file

@ -11,7 +11,8 @@ struct SignalMessageBuilder<'a> {
message: Option<String>, message: Option<String>,
attachments_urls: Vec<String>, attachments_urls: Vec<String>,
mentions: Vec<(usize, usize, String)>, // TODO Add type mentions: Vec<(usize, usize, String)>, // TODO Add type
target: Option<&'a String>, target_channel: Option<&'a String>,
target_server: Option<&'a str>,
} }
impl<'b> SignalMessageBuilder<'b> { impl<'b> SignalMessageBuilder<'b> {
@ -32,12 +33,17 @@ impl<'b> SignalMessageBuilder<'b> {
} }
pub fn target(&mut self, channel: Option<&'b String>) { pub fn target(&mut self, channel: Option<&'b String>) {
self.target = channel; self.target_channel = channel;
} }
pub fn build(mut self) -> Option<(&'b String, String)> pub fn server(&mut self, server: Option<&'b str>) {
self.target_server = server;
}
pub fn build(mut self) -> Option<(&'b str, &'b String, String)>
{ {
self.mentions.sort_by(|(pos1, _, _), (pos2, _, _)| pos1.cmp(pos2)); self.mentions.sort_by(|(pos1, _, _), (pos2, _, _)| pos1.cmp(pos2));
log::trace!("building the final message: {:?} {:?}", self.mentions, self.message);
self.message.as_mut().map(|message| { self.message.as_mut().map(|message| {
let mut offset: usize = 0; let mut offset: usize = 0;
for (pos, len, name) in self.mentions.into_iter() { for (pos, len, name) in self.mentions.into_iter() {
@ -46,7 +52,7 @@ impl<'b> SignalMessageBuilder<'b> {
} }
}); });
let attachments_text = self.attachments_urls.join(", "); let attachments_text = self.attachments_urls.join(", ");
(match (&self.message, attachments_text.len()) { let message = (match (&self.message, attachments_text.len()) {
(None, 0) => None, (None, 0) => None,
(None, _) => self (None, _) => self
.from .from
@ -57,7 +63,11 @@ impl<'b> SignalMessageBuilder<'b> {
.from .from
.as_ref() .as_ref()
.map(|from| format!("<{from}>: {m} ({attachments_text})")), .map(|from| format!("<{from}>: {m} ({attachments_text})")),
}).and_then(|m| self.target.map(|e| (e, m))) })?;
let target_channel = self.target_channel?;
let target_server = self.target_server?;
Some((target_server, target_channel, message))
} }
} }
@ -117,7 +127,7 @@ impl<'a> Bridge<'a> {
if let Some(url) = self.handle_attachment(a).await { if let Some(url) = self.handle_attachment(a).await {
attachments_urls.push(url); attachments_urls.push(url);
} else { } else {
log::warn!("droped attachment: {a}"); log::warn!("dropped attachment: {a}");
} }
} }
result.attach(&attachments_urls); result.attach(&attachments_urls);
@ -132,7 +142,7 @@ impl<'a> Bridge<'a> {
} }
} }
pub async fn signal2irc(&self, signal: Value) -> Option<(&String, String)> { pub async fn signal2irc(&self, signal: Value) -> Option<(&str, &String, String)> {
let mut result: SignalMessageBuilder = SignalMessageBuilder::default(); let mut result: SignalMessageBuilder = SignalMessageBuilder::default();
if let Value::Object(map) = signal { if let Value::Object(map) = signal {
if let Some(Value::Object(envelope)) = map.get("envelope") { if let Some(Value::Object(envelope)) = map.get("envelope") {