feat: multi-server support #8

Open
rlahfa wants to merge 2 commits from multi-server into master
3 changed files with 45 additions and 22 deletions
Showing only changes of commit 870e9bf9ad - Show all commits

View file

@ -11,7 +11,7 @@ use toml_env::{initialize, Args, AutoMapEnvArgs, Logging};
pub struct BridgeConfig { pub struct BridgeConfig {
pub irc: HashMap<String, IrcConfig>, pub irc: HashMap<String, IrcConfig>,
pub signal: SignalConfig, pub signal: SignalConfig,
pub mapping: BiMap<String, String>, pub mapping: BiMap<String, IrcTarget>,
} }
impl BridgeConfig { impl BridgeConfig {
@ -31,6 +31,12 @@ impl BridgeConfig {
} }
} }
#[derive(Debug, Hash, PartialEq, Eq, Deserialize, Serialize)]
pub struct IrcTarget {
pub server: String, // TODO: Add a more meaningful type to enforce url format
pub channel: String, // Same (leading # ?)
}
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
pub struct SignalConfig { pub struct SignalConfig {
pub media_dir: Box<Path>, pub media_dir: Box<Path>,

View file

@ -52,13 +52,13 @@ async fn main() -> Result<(), String> {
let senders_and_streams: HashMap<String, Result<(irc::client::Sender, ClientStream), String>> = join_all( let senders_and_streams: HashMap<String, Result<(irc::client::Sender, ClientStream), String>> = join_all(
config.irc.into_iter() config.irc.into_iter()
.map(|(name, config)| async { .map(|(name, config)| async {
(name, init_irc_client(Client::from_config(config) let irc_client = init_irc_client(Client::from_config(config)
.await .await
.expect(&format!("Failed to parse configuration for {name}"))) .expect(&format!("Failed to parse configuration for {name}"))).await;
.await) (name, irc_client)
}) })
// Filter out failed connections // Filter out failed connections
.collect() //.collect()
).await.into_iter().collect(); ).await.into_iter().collect();
let filtered_senders_and_streams: Vec<(String, irc::client::Sender, ClientStream)> = senders_and_streams let filtered_senders_and_streams: Vec<(String, irc::client::Sender, ClientStream)> = senders_and_streams
@ -74,11 +74,11 @@ async fn main() -> Result<(), String> {
}) })
.collect(); .collect();
let filtered_senders = HashMap::new(); let mut filtered_senders = HashMap::new();
let filtered_streams = HashMap::new(); let mut filtered_streams = HashMap::new();
for (name, sender, stream) in filtered_senders_and_streams { for (name, sender, stream) in filtered_senders_and_streams {
filtered_senders.insert(name, sender); filtered_senders.insert(name.clone(), sender);
filtered_streams.insert(name, stream); filtered_streams.insert(name, stream);
} }
@ -106,7 +106,7 @@ async fn main() -> Result<(), String> {
.await .await
.map_err(|e| format!("failed to subscribe to signal msg: {e}"))?; .map_err(|e| format!("failed to subscribe to signal msg: {e}"))?;
log::debug!("Initialized Signal"); log::debug!("Initialized Signal");
Ok((signal_stream, signal_client)) Ok::<_, String>((signal_stream, signal_client))
}; };
let (mut signal_stream, signal_client) = init_signal.await?; let (mut signal_stream, signal_client) = init_signal.await?;
@ -124,10 +124,10 @@ async fn main() -> Result<(), String> {
// TODO: if only one of the future fails, we should not crash the rest of the threads but log a // TODO: if only one of the future fails, we should not crash the rest of the threads but log a
// very good error. // very good error.
let irc_handlers = try_join_all(irc_streams.into_iter().map(|(name, stream)| { let irc_handlers = try_join_all(irc_streams.into_iter().map(|(name, mut stream)| {
log::info!("Listening for IRC messages for {name}"); log::info!("Listening for IRC messages for {name}");
handle_irc(&mut stream, signal_client, &bridge) handle_irc(name, &mut stream, signal_client, &bridge)
}).collect()); }));
try_join( try_join(
handle_signal(irc_senders, &mut signal_stream, &bridge), handle_signal(irc_senders, &mut signal_stream, &bridge),
@ -138,6 +138,7 @@ async fn main() -> Result<(), String> {
} }
async fn handle_irc( async fn handle_irc(
server_id: String,
stream: &mut ClientStream, stream: &mut ClientStream,
signal_client: impl SubscriptionClientT + std::marker::Sync, signal_client: impl SubscriptionClientT + std::marker::Sync,
bridge: &Bridge<'_>, bridge: &Bridge<'_>,
@ -149,7 +150,7 @@ async fn handle_irc(
.transpose() .transpose()
.map_err(|e| format!("error while retrieving messages from irc: {e}"))? .map_err(|e| format!("error while retrieving messages from irc: {e}"))?
{ {
if let Some((groupid, msg)) = bridge.irc2signal(message) { if let Some((groupid, msg)) = bridge.irc2signal(&server_id, message) {
log::trace!("[SIGNAL SEND] {msg}"); log::trace!("[SIGNAL SEND] {msg}");
signal_client signal_client
.send( .send(

View file

@ -1,6 +1,8 @@
use std::path::Path; use std::path::Path;
use bimap::BiMap; use bimap::BiMap;
use crate::config::IrcTarget;
use irc::client::prelude::*; use irc::client::prelude::*;
use serde_json::{Map, Value}; use serde_json::{Map, Value};
use tokio::fs::copy; use tokio::fs::copy;
@ -11,7 +13,7 @@ struct SignalMessageBuilder<'a> {
message: Option<String>, message: Option<String>,
attachments_urls: Vec<String>, attachments_urls: Vec<String>,
mentions: Vec<(usize, usize, String)>, // TODO Add type mentions: Vec<(usize, usize, String)>, // TODO Add type
target_channel: Option<&'a String>, target_channel: Option<&'a str>,
target_server: Option<&'a str>, target_server: Option<&'a str>,
} }
@ -32,15 +34,29 @@ impl<'b> SignalMessageBuilder<'b> {
self.mentions.extend_from_slice(mentions); self.mentions.extend_from_slice(mentions);
} }
pub fn target(&mut self, channel: Option<&'b String>) { pub fn target(&mut self, target: Option<&'b IrcTarget>) {
match target {
Some(target) => {
self.channel(Some(&target.channel));
self.server(Some(&target.server));
},
None => {
self.channel(None);
self.server(None);
},
};
}
pub fn channel(&mut self, channel: Option<&'b str>) {
self.target_channel = channel; self.target_channel = channel;
} }
pub fn server(&mut self, server: Option<&'b str>) { pub fn server(&mut self, server: Option<&'b str>) {
self.target_server = server; self.target_server = server;
} }
pub fn build(mut self) -> Option<(&'b str, &'b String, String)> pub fn build(mut self) -> Option<(&'b str, &'b str, String)>
{ {
self.mentions.sort_by(|(pos1, _, _), (pos2, _, _)| pos1.cmp(pos2)); self.mentions.sort_by(|(pos1, _, _), (pos2, _, _)| pos1.cmp(pos2));
log::trace!("building the final message: {:?} {:?}", self.mentions, self.message); log::trace!("building the final message: {:?} {:?}", self.mentions, self.message);
@ -75,7 +91,7 @@ pub struct Bridge<'a> {
media_dir: &'a Path, media_dir: &'a Path,
signalcli_dir: &'a Path, signalcli_dir: &'a Path,
url_root: &'a str, url_root: &'a str,
mapping: &'a BiMap<String, String>, mapping: &'a BiMap<String, IrcTarget>,
} }
impl<'a> Bridge<'a> { impl<'a> Bridge<'a> {
@ -83,7 +99,7 @@ impl<'a> Bridge<'a> {
media_dir: &'a Path, media_dir: &'a Path,
signalcli_dir: &'a Path, signalcli_dir: &'a Path,
url_root: &'a str, url_root: &'a str,
mapping: &'a BiMap<String, String>, mapping: &'a BiMap<String, IrcTarget>,
) -> Self { ) -> Self {
Self { Self {
media_dir, media_dir,
@ -137,12 +153,12 @@ impl<'a> Bridge<'a> {
} }
if let Some(Value::Object(group_info)) = message.get("groupInfo") { if let Some(Value::Object(group_info)) = message.get("groupInfo") {
if let Some(Value::String(groupid)) = group_info.get("groupId") { if let Some(Value::String(groupid)) = group_info.get("groupId") {
result.target(self.mapping.get_by_right(groupid)); result.target(self.mapping.get_by_left(groupid));
} }
} }
} }
pub async fn signal2irc(&self, signal: Value) -> Option<(&str, &String, String)> { pub async fn signal2irc(&self, signal: Value) -> Option<(&str, &str, String)> {
let mut result: SignalMessageBuilder = SignalMessageBuilder::default(); let mut result: SignalMessageBuilder = SignalMessageBuilder::default();
if let Value::Object(map) = signal { if let Value::Object(map) = signal {
if let Some(Value::Object(envelope)) = map.get("envelope") { if let Some(Value::Object(envelope)) = map.get("envelope") {
@ -162,7 +178,7 @@ impl<'a> Bridge<'a> {
result.build() result.build()
} }
pub fn irc2signal(&self, message: Message) -> Option<(&String, String)> { pub fn irc2signal(&self, server: &String, message: Message) -> Option<(&String, String)> {
match (message.prefix, message.command) { match (message.prefix, message.command) {
(Some(Prefix::Nickname(from, _, _)), Command::PRIVMSG(channel, message)) => { (Some(Prefix::Nickname(from, _, _)), Command::PRIVMSG(channel, message)) => {
Some((channel, format!("<{from}>: {message}"))) Some((channel, format!("<{from}>: {message}")))
@ -172,6 +188,6 @@ impl<'a> Bridge<'a> {
} }
_ => None, _ => None,
} }
.and_then(|(channel, e)| self.mapping.get_by_left(&channel).map(|c| (c, e))) .and_then(|(channel, e)| self.mapping.get_by_right(&IrcTarget { server: *server, channel}).map(|c| (c, e)))
} }
} }