#!/usr/bin/env python3 import asyncio from ircrobots.interface import IBot from ollama import Client as OllamaClient from loadcredential import Credentials import base64 from irctokens.line import build, Line from ircrobots.bot import Bot as BaseBot from ircrobots.server import Server as BaseServer from ircrobots.params import ConnectionParams import aiohttp BRIDGE_NICKNAME = "hermes" SERVERS = [ ("dgnum", "irc.dgnum.eu") ] TEAMS = { "fai": ("tomate", "elias", "JeMaGius", "Luj", "catvayor", "Raito"), "marketing": ("cst1", "elias"), "bureau": ("Raito", "JeMaGius", "Luj", "gdd") } # times format is 0700-29092024 TRIGGER = '!' async def create_meet(title: str, times: list[str], timezone: str = "UTC") -> str: async with aiohttp.ClientSession() as session: payload = { 'name': title, 'times': times, 'timezone': timezone } async with session.post('https://api.meet.dgnum.eu/event', json=payload) as response: response.raise_for_status() id = (await response.json()).get('id') if not id: raise RuntimeError('No ID attributed to a meet') return f'https://meet.dgnum.eu/{id}' def expand_times(times: list[str]) -> list[str]: expanded = [] # TODO: verify the date exist in the calendar # TODO: verify that we don't write any duplicates. for time in times: if '-' not in time: for i in range(7, 20): expanded.append(f'{i:02}00-{time}') else: expanded.append(time) return expanded def bridge_stripped(possible_command: str, origin_nick: str) -> str | None: if origin_nick.lower() == BRIDGE_NICKNAME: stripped_user = possible_command.split(':')[1].lstrip() return stripped_user if stripped_user.startswith(TRIGGER) else None else: return possible_command if possible_command.startswith(TRIGGER) else None class Server(BaseServer): def __init__(self, bot: IBot, name: str, llm_client: OllamaClient): super().__init__(bot, name) self.llm_client = llm_client def extract_valid_command(self, line: Line) -> str | None: me = self.nickname_lower if line.command == "PRIVMSG" and \ self.has_channel(line.params[0]) and \ line.hostmask is not None and \ self.casefold(line.hostmask.nickname) != me and \ self.has_user(line.hostmask.nickname): return bridge_stripped(line.params[1], line.hostmask.nickname) async def line_read(self, line: Line): print(f"{self.name} < {line.format()}") if line.command == "001": print(f"connected to {self.isupport.network}") await self.send(build("JOIN", ["#dgnum-bridge-test"])) # In case `!probe_meet