397 lines
11 KiB
Python
397 lines
11 KiB
Python
import contextlib
|
|
import curses
|
|
import os
|
|
import sys
|
|
from typing import List
|
|
from typing import NamedTuple
|
|
from typing import Union
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
|
|
from babi._types import Protocol
|
|
from babi.main import main
|
|
from babi.screen import VERSION_STR
|
|
from testing.runner import PrintsErrorRunner
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def xdg_data_home(tmpdir):
|
|
data_home = tmpdir.join('data_home')
|
|
with mock.patch.dict(os.environ, {'XDG_DATA_HOME': str(data_home)}):
|
|
yield data_home
|
|
|
|
|
|
@pytest.fixture
|
|
def ten_lines(tmpdir):
|
|
f = tmpdir.join('f')
|
|
f.write('\n'.join(f'line_{i}' for i in range(10)))
|
|
return f
|
|
|
|
|
|
class Screen:
|
|
def __init__(self, width, height):
|
|
self.disabled = True
|
|
self.nodelay = False
|
|
self.width = width
|
|
self.height = height
|
|
self.lines = [' ' * self.width for _ in range(self.height)]
|
|
self.x = self.y = 0
|
|
self._prev_screenshot = None
|
|
|
|
def screenshot(self):
|
|
ret = ''.join(f'{line.rstrip()}\n' for line in self.lines)
|
|
if ret != self._prev_screenshot:
|
|
print('=' * 79)
|
|
print(ret, end='')
|
|
print('=' * 79)
|
|
self._prev_screenshot = ret
|
|
return ret
|
|
|
|
def insstr(self, y, x, s):
|
|
line = self.lines[y]
|
|
self.lines[y] = (line[:x] + s + line[x:])[:self.width]
|
|
|
|
def move(self, y, x):
|
|
assert 0 <= y < self.height
|
|
assert 0 <= x < self.width
|
|
print(f'MOVE: y: {y}, x: {x}')
|
|
self.y, self.x = y, x
|
|
|
|
def resize(self, *, width, height):
|
|
if height > self.height:
|
|
self.lines.extend([''] * (height - self.height))
|
|
else:
|
|
self.lines = self.lines[:height]
|
|
if width > self.width:
|
|
self.lines[:] = [line.ljust(width) for line in self.lines]
|
|
else:
|
|
self.lines[:] = [line[:width] for line in self.lines]
|
|
self.width, self.height = width, height
|
|
|
|
|
|
class Op(Protocol):
|
|
def __call__(self, screen: Screen) -> None: ...
|
|
|
|
|
|
class AwaitText(NamedTuple):
|
|
text: str
|
|
|
|
def __call__(self, screen: Screen) -> None:
|
|
if self.text not in screen.screenshot():
|
|
raise AssertionError(f'expected: {self.text!r}')
|
|
|
|
|
|
class AwaitTextMissing(NamedTuple):
|
|
text: str
|
|
|
|
def __call__(self, screen: Screen) -> None:
|
|
if self.text in screen.screenshot():
|
|
raise AssertionError(f'expected missing: {self.text!r}')
|
|
|
|
|
|
class AwaitCursorPosition(NamedTuple):
|
|
x: int
|
|
y: int
|
|
|
|
def __call__(self, screen: Screen) -> None:
|
|
assert (self.x, self.y) == (screen.x, screen.y)
|
|
|
|
|
|
class AssertCursorLineEquals(NamedTuple):
|
|
line: str
|
|
|
|
def __call__(self, screen: Screen) -> None:
|
|
assert screen.lines[screen.y].rstrip() == self.line
|
|
|
|
|
|
class AssertScreenLineEquals(NamedTuple):
|
|
n: int
|
|
line: str
|
|
|
|
def __call__(self, screen: Screen) -> None:
|
|
assert screen.lines[self.n].rstrip() == self.line
|
|
|
|
|
|
class AssertFullContents(NamedTuple):
|
|
contents: str
|
|
|
|
def __call__(self, screen: Screen) -> None:
|
|
assert screen.screenshot() == self.contents
|
|
|
|
|
|
class Resize(NamedTuple):
|
|
width: int
|
|
height: int
|
|
|
|
def __call__(self, screen: Screen) -> None:
|
|
screen.resize(width=self.width, height=self.height)
|
|
|
|
|
|
class KeyPress(NamedTuple):
|
|
wch: Union[int, str]
|
|
|
|
def __call__(self, screen: Screen) -> None:
|
|
raise AssertionError('unreachable')
|
|
|
|
|
|
class CursesError(NamedTuple):
|
|
def __call__(self, screen: Screen) -> None:
|
|
if screen.nodelay:
|
|
raise curses.error()
|
|
|
|
|
|
class CursesScreen:
|
|
def __init__(self, runner):
|
|
self._runner = runner
|
|
|
|
def keypad(self, val):
|
|
pass
|
|
|
|
def nodelay(self, val):
|
|
self._runner.screen.nodelay = val
|
|
|
|
def insstr(self, y, x, s, attr=0):
|
|
self._runner.screen.insstr(y, x, s)
|
|
|
|
def clrtoeol(self):
|
|
s = self._runner.screen.width * ' '
|
|
self.insstr(self._runner.screen.y, self._runner.screen.x, s)
|
|
|
|
def chgat(self, y, x, n, color):
|
|
pass
|
|
|
|
def move(self, y, x):
|
|
self._runner.screen.move(y, x)
|
|
|
|
def get_wch(self):
|
|
return self._runner._get_wch()
|
|
|
|
|
|
class Key(NamedTuple):
|
|
tmux: str
|
|
curses: bytes
|
|
wch: Union[int, str]
|
|
|
|
@property
|
|
def value(self) -> int:
|
|
return self.wch if isinstance(self.wch, int) else ord(self.wch)
|
|
|
|
|
|
KEYS = [
|
|
Key('Enter', b'^M', '\r'),
|
|
Key('Tab', b'^I', '\t'),
|
|
Key('BTab', b'KEY_BTAB', curses.KEY_BTAB),
|
|
Key('DC', b'KEY_DC', curses.KEY_DC),
|
|
Key('BSpace', b'KEY_BACKSPACE', curses.KEY_BACKSPACE),
|
|
Key('Up', b'KEY_UP', curses.KEY_UP),
|
|
Key('Down', b'KEY_DOWN', curses.KEY_DOWN),
|
|
Key('Right', b'KEY_RIGHT', curses.KEY_RIGHT),
|
|
Key('Left', b'KEY_LEFT', curses.KEY_LEFT),
|
|
Key('Home', b'KEY_HOME', curses.KEY_HOME),
|
|
Key('End', b'KEY_END', curses.KEY_END),
|
|
Key('PageUp', b'KEY_PPAGE', curses.KEY_PPAGE),
|
|
Key('PageDown', b'KEY_NPAGE', curses.KEY_NPAGE),
|
|
Key('^Up', b'kUP5', 566),
|
|
Key('^Down', b'kDN5', 525),
|
|
Key('^Right', b'kRIT5', 560),
|
|
Key('^Left', b'kLFT5', 545),
|
|
Key('^Home', b'kHOM5', 535),
|
|
Key('^End', b'kEND5', 530),
|
|
Key('M-Right', b'kRIT3', 558),
|
|
Key('M-Left', b'kLFT3', 543),
|
|
Key('S-Up', b'KEY_SR', curses.KEY_SR),
|
|
Key('S-Down', b'KEY_SF', curses.KEY_SF),
|
|
Key('S-Right', b'KEY_SRIGHT', curses.KEY_SRIGHT),
|
|
Key('S-Left', b'KEY_SLEFT', curses.KEY_SLEFT),
|
|
Key('S-Home', b'KEY_SHOME', curses.KEY_SHOME),
|
|
Key('S-End', b'KEY_SEND', curses.KEY_SEND),
|
|
Key('^A', b'^A', '\x01'),
|
|
Key('^C', b'^C', '\x03'),
|
|
Key('^H', b'^H', '\x08'),
|
|
Key('^K', b'^K', '\x0b'),
|
|
Key('^E', b'^E', '\x05'),
|
|
Key('^J', b'^J', '\n'),
|
|
Key('^O', b'^O', '\x0f'),
|
|
Key('^R', b'^R', '\x12'),
|
|
Key('^S', b'^S', '\x13'),
|
|
Key('^U', b'^U', '\x15'),
|
|
Key('^V', b'^V', '\x16'),
|
|
Key('^W', b'^W', '\x17'),
|
|
Key('^X', b'^X', '\x18'),
|
|
Key('^Y', b'^Y', '\x19'),
|
|
Key('^[', b'^[', '\x1b'),
|
|
Key('^_', b'^_', '\x1f'),
|
|
Key('^\\', b'^\\', '\x1c'),
|
|
Key('!resize', b'KEY_RESIZE', curses.KEY_RESIZE),
|
|
]
|
|
KEYS_TMUX = {k.tmux: k.value for k in KEYS}
|
|
KEYS_CURSES = {k.value: k.curses for k in KEYS}
|
|
|
|
|
|
class DeferredRunner:
|
|
def __init__(self, command, width=80, height=24, term='screen'):
|
|
self.command = command
|
|
self._i = 0
|
|
self._ops: List[Op] = []
|
|
self.screen = Screen(width, height)
|
|
self._n_colors, self._can_change_color = {
|
|
'screen': (8, False),
|
|
'screen-256color': (256, False),
|
|
'xterm-256color': (256, True),
|
|
}[term]
|
|
|
|
def _get_wch(self):
|
|
while not isinstance(self._ops[self._i], KeyPress):
|
|
self._i += 1
|
|
try:
|
|
self._ops[self._i - 1](self.screen)
|
|
except AssertionError: # pragma: no cover (only on failures)
|
|
self.screen.screenshot()
|
|
raise
|
|
self._i += 1
|
|
keypress_event = self._ops[self._i - 1]
|
|
assert isinstance(keypress_event, KeyPress)
|
|
print(f'KEY: {keypress_event.wch!r}')
|
|
return keypress_event.wch
|
|
|
|
def await_text(self, text):
|
|
self._ops.append(AwaitText(text))
|
|
|
|
def await_text_missing(self, text):
|
|
self._ops.append(AwaitTextMissing(text))
|
|
|
|
def await_cursor_position(self, *, x, y):
|
|
self._ops.append(AwaitCursorPosition(x, y))
|
|
|
|
def assert_cursor_line_equals(self, line):
|
|
self._ops.append(AssertCursorLineEquals(line))
|
|
|
|
def assert_screen_line_equals(self, n, line):
|
|
self._ops.append(AssertScreenLineEquals(n, line))
|
|
|
|
def assert_full_contents(self, contents):
|
|
self._ops.append(AssertFullContents(contents))
|
|
|
|
def run(self, callback):
|
|
self._ops.append(lambda screen: callback())
|
|
|
|
def _expand_key(self, s):
|
|
if s == 'Escape':
|
|
return [KeyPress('\x1b'), CursesError()]
|
|
elif s in KEYS_TMUX:
|
|
return [KeyPress(KEYS_TMUX[s])]
|
|
elif s.startswith('^') and len(s) > 1 and s[1].isupper():
|
|
raise AssertionError(f'unknown key {s}')
|
|
elif s.startswith('M-'):
|
|
return [KeyPress('\x1b'), KeyPress(s[2:]), CursesError()]
|
|
else:
|
|
return [*(KeyPress(k) for k in s), CursesError()]
|
|
|
|
def press(self, s):
|
|
self._ops.extend(self._expand_key(s))
|
|
|
|
def press_and_enter(self, s):
|
|
self.press(s)
|
|
self.press('Enter')
|
|
|
|
def press_sequence(self, *ks):
|
|
for k in ks:
|
|
for op in self._expand_key(k):
|
|
if not isinstance(op, CursesError):
|
|
self._ops.append(op)
|
|
self._ops.append(CursesError())
|
|
|
|
def answer_no_if_modified(self):
|
|
self.press('n')
|
|
|
|
@contextlib.contextmanager
|
|
def resize(self, *, width, height):
|
|
orig_width, orig_height = self.screen.width, self.screen.height
|
|
self._ops.append(Resize(width, height))
|
|
self._ops.append(KeyPress(curses.KEY_RESIZE))
|
|
try:
|
|
yield
|
|
finally:
|
|
self._ops.append(Resize(orig_width, orig_height))
|
|
self._ops.append(KeyPress(curses.KEY_RESIZE))
|
|
|
|
def _curses__noop(self, *_, **__):
|
|
pass
|
|
|
|
_curses_cbreak = _curses_init_pair = _curses_noecho = _curses__noop
|
|
_curses_nonl = _curses_raw = _curses_use_default_colors = _curses__noop
|
|
|
|
_curses_error = curses.error # so we don't mock the exception
|
|
|
|
def _curses_keyname(self, k):
|
|
return KEYS_CURSES.get(k, b'')
|
|
|
|
def _curses_update_lines_cols(self):
|
|
curses.LINES = self.screen.height
|
|
curses.COLS = self.screen.width
|
|
|
|
def _curses_start_color(self):
|
|
curses.COLORS = self._n_colors
|
|
|
|
def _curses_initscr(self):
|
|
self._curses_update_lines_cols()
|
|
self.screen.disabled = False
|
|
return CursesScreen(self)
|
|
|
|
def _curses_endwin(self):
|
|
self.screen.disabled = True
|
|
|
|
def _curses_not_implemented(self, fn):
|
|
def fn_inner(*args, **kwargs):
|
|
raise NotImplementedError(fn)
|
|
return fn_inner
|
|
|
|
def _patch_curses(self):
|
|
patches = {
|
|
k: getattr(self, f'_curses_{k}', self._curses_not_implemented(k))
|
|
for k in dir(curses)
|
|
if not k.startswith('_') and callable(getattr(curses, k))
|
|
}
|
|
return mock.patch.multiple(curses, **patches)
|
|
|
|
def await_exit(self):
|
|
with self._patch_curses():
|
|
main(self.command)
|
|
# we have already exited -- check remaining things
|
|
# KeyPress with failing condition or error
|
|
for i in range(self._i, len(self._ops)):
|
|
if self._ops[i] not in {KeyPress('n'), CursesError()}:
|
|
raise AssertionError(self._ops[i:])
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def run_fake(*cmd, **kwargs):
|
|
h = DeferredRunner(cmd, **kwargs)
|
|
h.await_text(VERSION_STR)
|
|
yield h
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def run_tmux(*args, term='screen', **kwargs):
|
|
cmd = (sys.executable, '-mcoverage', 'run', '-m', 'babi', *args)
|
|
cmd = ('env', f'TERM={term}', *cmd)
|
|
with PrintsErrorRunner(*cmd, **kwargs) as h, h.on_error():
|
|
# startup with coverage can be slow
|
|
h.await_text(VERSION_STR, timeout=2)
|
|
yield h
|
|
|
|
|
|
@pytest.fixture(
|
|
scope='session',
|
|
params=[run_fake, run_tmux],
|
|
ids=['fake', 'tmux'],
|
|
)
|
|
def run(request):
|
|
return request.param
|
|
|
|
|
|
@pytest.fixture(scope='session', params=[run_fake], ids=['fake'])
|
|
def run_only_fake(request):
|
|
return request.param
|