import contextlib import curses import enum import re from typing import List from typing import Tuple from hecate import Runner class Token(enum.Enum): FG_ESC = re.compile(r'\x1b\[38;5;(\d+)m') BG_ESC = re.compile(r'\x1b\[48;5;(\d+)m') RESET = re.compile(r'\x1b\[0?m') ESC = re.compile(r'\x1b\[(\d+)m') NL = re.compile(r'\n') CHAR = re.compile('.') def tokenize_colors(s): i = 0 while i < len(s): for tp in Token: match = tp.value.match(s, i) if match is not None: yield tp, match i = match.end() break else: raise AssertionError(f'unreachable: not matched at {i}?') def to_attrs(screen, width): fg = bg = -1 attr = 0 idx = 0 ret: List[List[Tuple[int, int, int]]] ret = [[] for _ in range(len(screen.splitlines()))] for tp, match in tokenize_colors(screen): if tp is Token.FG_ESC: fg = int(match[1]) elif tp is Token.BG_ESC: bg = int(match[1]) elif tp is Token.RESET: fg = bg = -1 attr = 0 elif tp is Token.ESC: if match[1] == '7': attr |= curses.A_REVERSE elif match[1] == '39': fg = -1 elif match[1] == '49': bg = -1 elif 40 <= int(match[1]) <= 47: bg = int(match[1]) - 40 else: raise AssertionError(f'unknown escape {match[1]}') elif tp is Token.NL: ret[idx].extend([(fg, bg, attr)] * (width - len(ret[idx]))) idx += 1 elif tp is Token.CHAR: ret[idx].append((fg, bg, attr)) else: raise AssertionError(f'unreachable {tp} {match}') return ret class PrintsErrorRunner(Runner): def __init__(self, *args, **kwargs): self._prev_screenshot = None super().__init__(*args, **kwargs) def screenshot(self, *args, **kwargs): ret = super().screenshot(*args, **kwargs) if ret != self._prev_screenshot: print('=' * 79, flush=True) print(ret, end='', flush=True) print('=' * 79, flush=True) self._prev_screenshot = ret return ret def color_screenshot(self): ret = self.tmux.execute_command('capture-pane', '-ept0') if ret != self._prev_screenshot: print('=' * 79, flush=True) print(ret, end='\x1b[m', flush=True) print('=' * 79, flush=True) self._prev_screenshot = ret return ret def get_attrs(self): width, _ = self.get_pane_size() return to_attrs(self.color_screenshot(), width) def await_text(self, text, timeout=None): """copied from the base implementation but doesn't munge newlines""" for _ in self.poll_until_timeout(timeout): screen = self.screenshot() if text in screen: # pragma: no branch return raise AssertionError( f'Timeout while waiting for text {text!r} to appear', ) def await_text_missing(self, s): """largely based on await_text""" for _ in self.poll_until_timeout(): screen = self.screenshot() munged = screen.replace('\n', '') if s not in munged: # pragma: no branch return raise AssertionError( f'Timeout while waiting for text {s!r} to disappear', ) def assert_cursor_line_equals(self, s): cursor_line = self._get_cursor_line() assert cursor_line == s, (cursor_line, s) def assert_screen_line_equals(self, n, s): screen_line = self._get_screen_line(n) assert screen_line == s, (screen_line, s) def assert_screen_attr_equals(self, n, attr): attr_line = self.get_attrs()[n] assert attr_line == attr, (n, attr_line, attr) def assert_full_contents(self, s): contents = self.screenshot() assert contents == s def get_pane_size(self): cmd = ('display', '-t0', '-p', '#{pane_width}\t#{pane_height}') w, h = self.tmux.execute_command(*cmd).split() return int(w), int(h) def _get_cursor_position(self): cmd = ('display', '-t0', '-p', '#{cursor_x}\t#{cursor_y}') x, y = self.tmux.execute_command(*cmd).split() return int(x), int(y) def await_cursor_position(self, *, x, y): for _ in self.poll_until_timeout(): pos = self._get_cursor_position() if pos == (x, y): # pragma: no branch return raise AssertionError( f'Timeout while waiting for cursor to reach {(x, y)}\n' f'Last cursor position: {pos}', ) def _get_screen_line(self, n): return self.screenshot().splitlines()[n] def _get_cursor_line(self): _, y = self._get_cursor_position() return self._get_screen_line(y) @contextlib.contextmanager def resize(self, width, height): current_w, current_h = self.get_pane_size() sleep_cmd = ( 'bash', '-c', f'echo {"*" * (current_w * current_h)} && ' f'exec sleep infinity', ) panes = 0 hsplit_w = current_w - width - 1 if hsplit_w > 0: cmd = ('split-window', '-ht0', '-l', hsplit_w, *sleep_cmd) self.tmux.execute_command(*cmd) panes += 1 vsplit_h = current_h - height - 1 if vsplit_h > 0: # pragma: no branch # TODO cmd = ('split-window', '-vt0', '-l', vsplit_h, *sleep_cmd) self.tmux.execute_command(*cmd) panes += 1 assert self.get_pane_size() == (width, height) try: yield finally: for _ in range(panes): self.tmux.execute_command('kill-pane', '-t1') def press_and_enter(self, s): self.press(s) self.press('Enter') def answer_no_if_modified(self): if '*' in self._get_screen_line(0): self.press('n') def run(self, callback): # this is a bit of a hack, the in-process fake defers all execution callback() @contextlib.contextmanager def on_error(self): try: yield except AssertionError: # pragma: no cover (only on failure) self.screenshot() raise @contextlib.contextmanager def and_exit(h): yield # only try and exit in non-exceptional cases h.press('^X') h.answer_no_if_modified() h.await_exit() def trigger_command_mode(h): # in order to enter a steady state, trigger an unknown key first and then # press escape to open the command mode. this is necessary as `Escape` is # the start of "escape sequences" and sending characters too quickly will # be interpreted as a single keypress h.press('^J') h.await_text('unknown key') h.press('Escape') h.await_text_missing('unknown key')