Compare commits
1 commit
multi-serv
...
master
Author | SHA1 | Date | |
---|---|---|---|
2a761dcac5 |
4 changed files with 45 additions and 133 deletions
|
@ -5,7 +5,7 @@
|
||||||
rustPlatform,
|
rustPlatform,
|
||||||
}:
|
}:
|
||||||
|
|
||||||
rustPlatform.buildRustPackage rec {
|
rustPlatform.buildRustPackage {
|
||||||
pname = "signal-irc-bridge";
|
pname = "signal-irc-bridge";
|
||||||
version = "0.1";
|
version = "0.1";
|
||||||
|
|
||||||
|
@ -24,4 +24,6 @@ rustPlatform.buildRustPackage rec {
|
||||||
buildInputs = [ openssl ];
|
buildInputs = [ openssl ];
|
||||||
|
|
||||||
cargoHash = "sha256-Q0j3FWaHDVpI1gHJpUuRzAKx1f0rZ2LcYrFD90DiNj0=";
|
cargoHash = "sha256-Q0j3FWaHDVpI1gHJpUuRzAKx1f0rZ2LcYrFD90DiNj0=";
|
||||||
|
|
||||||
|
meta.mainProgram = "signal-irc-bridge";
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
|
||||||
|
@ -9,9 +8,9 @@ use toml_env::{initialize, Args, AutoMapEnvArgs, Logging};
|
||||||
|
|
||||||
#[derive(Debug, Deserialize, Serialize)]
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
pub struct BridgeConfig {
|
pub struct BridgeConfig {
|
||||||
pub irc: HashMap<String, IrcConfig>,
|
pub irc: IrcConfig,
|
||||||
pub signal: SignalConfig,
|
pub signal: SignalConfig,
|
||||||
pub mapping: BiMap<String, IrcTarget>,
|
pub mapping: BiMap<String, String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BridgeConfig {
|
impl BridgeConfig {
|
||||||
|
@ -31,12 +30,6 @@ impl BridgeConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Hash, PartialEq, Eq, Deserialize, Serialize)]
|
|
||||||
pub struct IrcTarget {
|
|
||||||
pub server: String, // TODO: Add a more meaningful type to enforce url format
|
|
||||||
pub channel: String, // Same (leading # ?)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Deserialize, Serialize)]
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
pub struct SignalConfig {
|
pub struct SignalConfig {
|
||||||
pub media_dir: Box<Path>,
|
pub media_dir: Box<Path>,
|
||||||
|
|
111
src/main.rs
111
src/main.rs
|
@ -3,9 +3,7 @@ mod transform;
|
||||||
mod transports;
|
mod transports;
|
||||||
mod config;
|
mod config;
|
||||||
|
|
||||||
use futures::future::join_all;
|
|
||||||
use futures::future::try_join;
|
use futures::future::try_join;
|
||||||
use futures::future::try_join_all;
|
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use irc::client::prelude::*;
|
use irc::client::prelude::*;
|
||||||
use irc::client::ClientStream;
|
use irc::client::ClientStream;
|
||||||
|
@ -13,34 +11,11 @@ use jsonrpsee::core::client::{Subscription, SubscriptionClientT};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
use core::time::Duration;
|
use core::time::Duration;
|
||||||
use std::collections::HashMap;
|
|
||||||
use transform::Bridge;
|
use transform::Bridge;
|
||||||
|
|
||||||
use crate::jsonrpc::RpcClient;
|
use crate::jsonrpc::RpcClient;
|
||||||
use crate::config::BridgeConfig;
|
use crate::config::BridgeConfig;
|
||||||
|
|
||||||
async fn init_irc_client(mut client: Client) -> Result<(irc::client::Sender, ClientStream), String> {
|
|
||||||
client
|
|
||||||
.identify()
|
|
||||||
.map_err(|e| format!("Error while initializing irc client: {e}"))?;
|
|
||||||
|
|
||||||
let sender = client.sender();
|
|
||||||
let mut stream = client
|
|
||||||
.stream()
|
|
||||||
.map_err(|e| format!("Error while initializing irc stream: {e}"))?;
|
|
||||||
|
|
||||||
log::debug!("Initializing IRC");
|
|
||||||
while client.list_channels().is_some_and(|x| x.is_empty()) {
|
|
||||||
stream
|
|
||||||
.next()
|
|
||||||
.await
|
|
||||||
.transpose()
|
|
||||||
.map_err(|e| format!("error waiting for motd: {e}"))?;
|
|
||||||
}
|
|
||||||
log::debug!("Initialized IRC");
|
|
||||||
Ok::<(irc::client::Sender, ClientStream), String>((sender, stream))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), String> {
|
async fn main() -> Result<(), String> {
|
||||||
pretty_env_logger::init();
|
pretty_env_logger::init();
|
||||||
|
@ -49,45 +24,30 @@ async fn main() -> Result<(), String> {
|
||||||
|
|
||||||
// Initialisation of components. TODO separate module
|
// Initialisation of components. TODO separate module
|
||||||
let init_irc = async {
|
let init_irc = async {
|
||||||
let senders_and_streams: HashMap<String, Result<(irc::client::Sender, ClientStream), String>> = join_all(
|
let mut client = Client::from_config(config.irc.clone()).await.unwrap();
|
||||||
config.irc.into_iter()
|
client
|
||||||
.map(|(name, config)| async {
|
.identify()
|
||||||
let irc_client = init_irc_client(Client::from_config(config)
|
.map_err(|e| format!("Error while initializing irc client: {e}"))?;
|
||||||
.await
|
|
||||||
.expect(&format!("Failed to parse configuration for {name}"))).await;
|
|
||||||
(name, irc_client)
|
|
||||||
})
|
|
||||||
// Filter out failed connections
|
|
||||||
//.collect()
|
|
||||||
).await.into_iter().collect();
|
|
||||||
|
|
||||||
let filtered_senders_and_streams: Vec<(String, irc::client::Sender, ClientStream)> = senders_and_streams
|
let sender = client.sender();
|
||||||
.into_iter()
|
let mut stream = client
|
||||||
.filter_map(|(name, result)| {
|
.stream()
|
||||||
match result {
|
.map_err(|e| format!("Error while initializing irc stream: {e}"))?;
|
||||||
Ok((sender, stream)) => Some((name, sender, stream)),
|
|
||||||
Err(reason) => {
|
|
||||||
log::warn!("Failed to initialize server {name}: {reason}, disabling this server.");
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let mut filtered_senders = HashMap::new();
|
log::debug!("Initializing IRC");
|
||||||
let mut filtered_streams = HashMap::new();
|
while client.list_channels().is_some_and(|x| x.is_empty()) {
|
||||||
|
stream
|
||||||
for (name, sender, stream) in filtered_senders_and_streams {
|
.next()
|
||||||
filtered_senders.insert(name.clone(), sender);
|
.await
|
||||||
filtered_streams.insert(name, stream);
|
.transpose()
|
||||||
|
.map_err(|e| format!("error waiting for motd: {e}"))?;
|
||||||
}
|
}
|
||||||
|
log::debug!("Initialized IRC");
|
||||||
(filtered_senders, filtered_streams)
|
Ok::<(irc::client::Sender, ClientStream), String>((sender, stream))
|
||||||
};
|
};
|
||||||
let init_signal = async {
|
let init_signal = async {
|
||||||
log::debug!("Initializing Signal");
|
log::debug!("Initializing Signal");
|
||||||
let signal_client = (async {
|
let signal_client = (async {
|
||||||
// TODO(raito): use exponential backoff here!
|
|
||||||
for i in 0..10 {
|
for i in 0..10 {
|
||||||
match jsonrpc::connect_unix(&config.signal.socket).await {
|
match jsonrpc::connect_unix(&config.signal.socket).await {
|
||||||
Ok(client) => {
|
Ok(client) => {
|
||||||
|
@ -106,12 +66,10 @@ async fn main() -> Result<(), String> {
|
||||||
.await
|
.await
|
||||||
.map_err(|e| format!("failed to subscribe to signal msg: {e}"))?;
|
.map_err(|e| format!("failed to subscribe to signal msg: {e}"))?;
|
||||||
log::debug!("Initialized Signal");
|
log::debug!("Initialized Signal");
|
||||||
Ok::<_, String>((signal_stream, signal_client))
|
Ok((signal_stream, signal_client))
|
||||||
};
|
};
|
||||||
|
let ((sender, mut stream), (mut signal_stream, signal_client)) =
|
||||||
let (mut signal_stream, signal_client) = init_signal.await?;
|
try_join(init_irc, init_signal).await?;
|
||||||
let (irc_senders, irc_streams) = init_irc.await;
|
|
||||||
|
|
||||||
let bridge = Bridge::new(
|
let bridge = Bridge::new(
|
||||||
&config.signal.media_dir,
|
&config.signal.media_dir,
|
||||||
&config.signal.signal_cli_dir,
|
&config.signal.signal_cli_dir,
|
||||||
|
@ -120,25 +78,17 @@ async fn main() -> Result<(), String> {
|
||||||
);
|
);
|
||||||
|
|
||||||
// Run !!
|
// Run !!
|
||||||
log::info!("Bridge is ready to run");
|
log::info!("Bridge is up and running");
|
||||||
|
|
||||||
// TODO: if only one of the future fails, we should not crash the rest of the threads but log a
|
|
||||||
// very good error.
|
|
||||||
let irc_handlers = try_join_all(irc_streams.into_iter().map(|(name, mut stream)| {
|
|
||||||
log::info!("Listening for IRC messages for {name}");
|
|
||||||
handle_irc(name, &mut stream, signal_client, &bridge)
|
|
||||||
}));
|
|
||||||
|
|
||||||
try_join(
|
try_join(
|
||||||
handle_signal(irc_senders, &mut signal_stream, &bridge),
|
handle_signal(sender, &mut signal_stream, &bridge),
|
||||||
irc_handlers
|
handle_irc(&mut stream, signal_client, &bridge),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
unreachable!();
|
unreachable!();
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_irc(
|
async fn handle_irc(
|
||||||
server_id: String,
|
|
||||||
stream: &mut ClientStream,
|
stream: &mut ClientStream,
|
||||||
signal_client: impl SubscriptionClientT + std::marker::Sync,
|
signal_client: impl SubscriptionClientT + std::marker::Sync,
|
||||||
bridge: &Bridge<'_>,
|
bridge: &Bridge<'_>,
|
||||||
|
@ -150,7 +100,7 @@ async fn handle_irc(
|
||||||
.transpose()
|
.transpose()
|
||||||
.map_err(|e| format!("error while retrieving messages from irc: {e}"))?
|
.map_err(|e| format!("error while retrieving messages from irc: {e}"))?
|
||||||
{
|
{
|
||||||
if let Some((groupid, msg)) = bridge.irc2signal(&server_id, message) {
|
if let Some((groupid, msg)) = bridge.irc2signal(message) {
|
||||||
log::trace!("[SIGNAL SEND] {msg}");
|
log::trace!("[SIGNAL SEND] {msg}");
|
||||||
signal_client
|
signal_client
|
||||||
.send(
|
.send(
|
||||||
|
@ -186,7 +136,7 @@ async fn handle_irc(
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_signal(
|
async fn handle_signal(
|
||||||
senders: HashMap<String, Sender>,
|
sender: Sender,
|
||||||
stream: &mut Subscription<Value>,
|
stream: &mut Subscription<Value>,
|
||||||
bridge: &Bridge<'_>,
|
bridge: &Bridge<'_>,
|
||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
|
@ -197,15 +147,8 @@ async fn handle_signal(
|
||||||
}
|
}
|
||||||
Some(Ok(val)) => {
|
Some(Ok(val)) => {
|
||||||
log::trace!("[SIGNAL RCV] {:?}", val);
|
log::trace!("[SIGNAL RCV] {:?}", val);
|
||||||
if let Some((server, channel, msg)) = bridge.signal2irc(val).await {
|
if let Some((channel, msg)) = bridge.signal2irc(val).await {
|
||||||
if !senders.contains_key(server) {
|
sender
|
||||||
log::error!("Tried to send message to server {server} but is unavailable!");
|
|
||||||
return Err(
|
|
||||||
format!("unavailable IRC server '{server}' when receiving a Signal message for it")
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
senders[server]
|
|
||||||
.send_privmsg(channel, msg)
|
.send_privmsg(channel, msg)
|
||||||
.map_err(|e| format!("error while sending to irc: {e}"))?;
|
.map_err(|e| format!("error while sending to irc: {e}"))?;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use bimap::BiMap;
|
use bimap::BiMap;
|
||||||
|
|
||||||
use crate::config::IrcTarget;
|
|
||||||
|
|
||||||
use irc::client::prelude::*;
|
use irc::client::prelude::*;
|
||||||
use serde_json::{Map, Value};
|
use serde_json::{Map, Value};
|
||||||
use tokio::fs::copy;
|
use tokio::fs::copy;
|
||||||
|
@ -13,8 +11,7 @@ struct SignalMessageBuilder<'a> {
|
||||||
message: Option<String>,
|
message: Option<String>,
|
||||||
attachments_urls: Vec<String>,
|
attachments_urls: Vec<String>,
|
||||||
mentions: Vec<(usize, usize, String)>, // TODO Add type
|
mentions: Vec<(usize, usize, String)>, // TODO Add type
|
||||||
target_channel: Option<&'a str>,
|
target: Option<&'a String>,
|
||||||
target_server: Option<&'a str>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'b> SignalMessageBuilder<'b> {
|
impl<'b> SignalMessageBuilder<'b> {
|
||||||
|
@ -34,32 +31,13 @@ impl<'b> SignalMessageBuilder<'b> {
|
||||||
self.mentions.extend_from_slice(mentions);
|
self.mentions.extend_from_slice(mentions);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn target(&mut self, target: Option<&'b IrcTarget>) {
|
pub fn target(&mut self, channel: Option<&'b String>) {
|
||||||
match target {
|
self.target = channel;
|
||||||
Some(target) => {
|
|
||||||
self.channel(Some(&target.channel));
|
|
||||||
self.server(Some(&target.server));
|
|
||||||
},
|
|
||||||
None => {
|
|
||||||
self.channel(None);
|
|
||||||
self.server(None);
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn channel(&mut self, channel: Option<&'b str>) {
|
pub fn build(mut self) -> Option<(&'b String, String)>
|
||||||
self.target_channel = channel;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
pub fn server(&mut self, server: Option<&'b str>) {
|
|
||||||
self.target_server = server;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn build(mut self) -> Option<(&'b str, &'b str, String)>
|
|
||||||
{
|
{
|
||||||
self.mentions.sort_by(|(pos1, _, _), (pos2, _, _)| pos1.cmp(pos2));
|
self.mentions.sort_by(|(pos1, _, _), (pos2, _, _)| pos1.cmp(pos2));
|
||||||
log::trace!("building the final message: {:?} {:?}", self.mentions, self.message);
|
|
||||||
self.message.as_mut().map(|message| {
|
self.message.as_mut().map(|message| {
|
||||||
let mut offset: usize = 0;
|
let mut offset: usize = 0;
|
||||||
for (pos, len, name) in self.mentions.into_iter() {
|
for (pos, len, name) in self.mentions.into_iter() {
|
||||||
|
@ -68,7 +46,7 @@ impl<'b> SignalMessageBuilder<'b> {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
let attachments_text = self.attachments_urls.join(", ");
|
let attachments_text = self.attachments_urls.join(", ");
|
||||||
let message = (match (&self.message, attachments_text.len()) {
|
(match (&self.message, attachments_text.len()) {
|
||||||
(None, 0) => None,
|
(None, 0) => None,
|
||||||
(None, _) => self
|
(None, _) => self
|
||||||
.from
|
.from
|
||||||
|
@ -79,11 +57,7 @@ impl<'b> SignalMessageBuilder<'b> {
|
||||||
.from
|
.from
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(|from| format!("<{from}>: {m} ({attachments_text})")),
|
.map(|from| format!("<{from}>: {m} ({attachments_text})")),
|
||||||
})?;
|
}).and_then(|m| self.target.map(|e| (e, m)))
|
||||||
let target_channel = self.target_channel?;
|
|
||||||
let target_server = self.target_server?;
|
|
||||||
|
|
||||||
Some((target_server, target_channel, message))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,7 +65,7 @@ pub struct Bridge<'a> {
|
||||||
media_dir: &'a Path,
|
media_dir: &'a Path,
|
||||||
signalcli_dir: &'a Path,
|
signalcli_dir: &'a Path,
|
||||||
url_root: &'a str,
|
url_root: &'a str,
|
||||||
mapping: &'a BiMap<String, IrcTarget>,
|
mapping: &'a BiMap<String, String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Bridge<'a> {
|
impl<'a> Bridge<'a> {
|
||||||
|
@ -99,7 +73,7 @@ impl<'a> Bridge<'a> {
|
||||||
media_dir: &'a Path,
|
media_dir: &'a Path,
|
||||||
signalcli_dir: &'a Path,
|
signalcli_dir: &'a Path,
|
||||||
url_root: &'a str,
|
url_root: &'a str,
|
||||||
mapping: &'a BiMap<String, IrcTarget>,
|
mapping: &'a BiMap<String, String>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
media_dir,
|
media_dir,
|
||||||
|
@ -143,7 +117,7 @@ impl<'a> Bridge<'a> {
|
||||||
if let Some(url) = self.handle_attachment(a).await {
|
if let Some(url) = self.handle_attachment(a).await {
|
||||||
attachments_urls.push(url);
|
attachments_urls.push(url);
|
||||||
} else {
|
} else {
|
||||||
log::warn!("dropped attachment: {a}");
|
log::warn!("droped attachment: {a}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
result.attach(&attachments_urls);
|
result.attach(&attachments_urls);
|
||||||
|
@ -153,12 +127,12 @@ impl<'a> Bridge<'a> {
|
||||||
}
|
}
|
||||||
if let Some(Value::Object(group_info)) = message.get("groupInfo") {
|
if let Some(Value::Object(group_info)) = message.get("groupInfo") {
|
||||||
if let Some(Value::String(groupid)) = group_info.get("groupId") {
|
if let Some(Value::String(groupid)) = group_info.get("groupId") {
|
||||||
result.target(self.mapping.get_by_left(groupid));
|
result.target(self.mapping.get_by_right(groupid));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn signal2irc(&self, signal: Value) -> Option<(&str, &str, String)> {
|
pub async fn signal2irc(&self, signal: Value) -> Option<(&String, String)> {
|
||||||
let mut result: SignalMessageBuilder = SignalMessageBuilder::default();
|
let mut result: SignalMessageBuilder = SignalMessageBuilder::default();
|
||||||
if let Value::Object(map) = signal {
|
if let Value::Object(map) = signal {
|
||||||
if let Some(Value::Object(envelope)) = map.get("envelope") {
|
if let Some(Value::Object(envelope)) = map.get("envelope") {
|
||||||
|
@ -178,7 +152,7 @@ impl<'a> Bridge<'a> {
|
||||||
result.build()
|
result.build()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn irc2signal(&self, server: &String, message: Message) -> Option<(&String, String)> {
|
pub fn irc2signal(&self, message: Message) -> Option<(&String, String)> {
|
||||||
match (message.prefix, message.command) {
|
match (message.prefix, message.command) {
|
||||||
(Some(Prefix::Nickname(from, _, _)), Command::PRIVMSG(channel, message)) => {
|
(Some(Prefix::Nickname(from, _, _)), Command::PRIVMSG(channel, message)) => {
|
||||||
Some((channel, format!("<{from}>: {message}")))
|
Some((channel, format!("<{from}>: {message}")))
|
||||||
|
@ -188,6 +162,6 @@ impl<'a> Bridge<'a> {
|
||||||
}
|
}
|
||||||
_ => None,
|
_ => None,
|
||||||
}
|
}
|
||||||
.and_then(|(channel, e)| self.mapping.get_by_right(&IrcTarget { server: *server, channel}).map(|c| (c, e)))
|
.and_then(|(channel, e)| self.mapping.get_by_left(&channel).map(|c| (c, e)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue