From 883f09e31cca8bc2b59f6a700dd95b00bb514183 Mon Sep 17 00:00:00 2001 From: jesopo Date: Sun, 21 Jun 2020 16:46:36 +0100 Subject: [PATCH] switch _next_lines and _read_lines to generators. taskgroup wait_fors! --- ircrobots/bot.py | 3 +- ircrobots/server.py | 75 +++++++++++++++++++++++---------------------- 2 files changed, 41 insertions(+), 37 deletions(-) diff --git a/ircrobots/bot.py b/ircrobots/bot.py index a65cbd5..6ea6b97 100644 --- a/ircrobots/bot.py +++ b/ircrobots/bot.py @@ -39,7 +39,8 @@ class Bot(IBot): async with anyio.create_task_group() as tg: async def _read(): while True: - await server._read_lines() + async for line, emit in server._read_lines(): + pass await tg.spawn(_read) await tg.spawn(server._send_lines) diff --git a/ircrobots/server.py b/ircrobots/server.py index 3fc3595..3d7927f 100644 --- a/ircrobots/server.py +++ b/ircrobots/server.py @@ -1,10 +1,11 @@ import asyncio from asyncio import Future, PriorityQueue -from typing import (Awaitable, Deque, Dict, List, Optional, Set, Tuple, - Union) +from typing import (AsyncIterable, Awaitable, Deque, Dict, Iterable, List, + Optional, Set, Tuple, Union) from collections import deque from time import monotonic +import anyio from asyncio_throttle import Throttler from async_timeout import timeout from ircstates import Emit, Channel, ChannelUser @@ -64,8 +65,8 @@ class Server(IServer): self._read_queue: Deque[Tuple[Line, Optional[Emit]]] = deque() - self._wait_fors: List[Tuple[WaitFor, Optional[Awaitable]]] = [] - self._wait_for_fut: Optional[Future[WaitFor]] = None + self._wait_fors: List[WaitFor] = [] + self._wait_for_fut: Dict[str, Future[bool]] = {} self._pending_who: Deque[str] = deque() @@ -200,7 +201,7 @@ class Server(IServer): line = await self.wait_for(end) - async def _next_lines(self) -> List[Tuple[Line, Optional[Emit]]]: + async def _next_lines(self) -> AsyncIterable[Tuple[Line, Optional[Emit]]]: ping_sent = False while True: try: @@ -223,45 +224,46 @@ class Server(IServer): self.disconnected = True raise - return lines + for both in lines: + yield both async def _line_or_wait(self, - line_aw: Awaitable - ) -> Optional[Tuple[WaitFor, Awaitable]]: - wait_for_fut: Future[WaitFor] = Future() - self._wait_for_fut = wait_for_fut + line_aw: asyncio.Task + ) -> Optional[Awaitable]: + task_name = line_aw.get_name() + wait_for_fut: Future[bool] = Future() + self._wait_for_fut[task_name] = wait_for_fut done, pend = await asyncio.wait([line_aw, wait_for_fut], return_when=asyncio.FIRST_COMPLETED) - self._wait_for_fut = None + del self._wait_for_fut[task_name] if wait_for_fut.done(): new_line_aw = list(pend)[0] - return (await wait_for_fut), new_line_aw + return new_line_aw else: return None - async def _read_lines(self) -> List[Tuple[Line, Optional[Emit]]]: - lines = await self._next_lines() - for line, emit in lines: - self.line_preread(line) + async def _read_lines(self) -> AsyncIterable[Tuple[Line, Optional[Emit]]]: + async with anyio.create_task_group() as tg: + async for both in self._next_lines(): + line, emit = both + self.line_preread(line) - for i, (wait_for, aw) in enumerate(self._wait_fors): - if wait_for.match(self, line): - wait_for.resolve(line) + for i, wait_for in enumerate(self._wait_fors): + if wait_for.match(self, line): + wait_for.resolve(line) + self._wait_fors.pop(i) + break - if aw is not None: - new_wait_for = await self._line_or_wait(aw) - if new_wait_for is not None: - self._wait_fors.append(new_wait_for) - self._wait_fors.pop(i) - break + line_aw = asyncio.create_task(self._on_read(line, emit)) + new_wait = await self._line_or_wait(line_aw) + if new_wait is not None: + async def _aw(): + await new_wait + await tg.spawn(_aw) - line_aw = self._on_read(line, emit) - new_wait_for = await self._line_or_wait(line_aw) - if new_wait_for is not None: - self._wait_fors.append(new_wait_for) - return lines + yield both async def wait_for(self, response: Union[IMatchResponse, Set[IMatchResponse]], @@ -275,13 +277,14 @@ class Server(IServer): response_obj = response our_wait_for = WaitFor(response_obj) + self._wait_fors.append(our_wait_for) - wait_for_fut = self._wait_for_fut - if wait_for_fut is not None: - self._wait_for_fut = None - wait_for_fut.set_result(our_wait_for) - else: - self._wait_fors.append((our_wait_for, None)) + cur_task = asyncio.current_task() + if cur_task is not None: + cur_task_name = cur_task.get_name() + if cur_task_name in self._wait_for_fut: + wait_for_fut = self._wait_for_fut[cur_task_name] + wait_for_fut.set_result(True) if sent_aw is not None: sent_line = await sent_aw