Fix race condition with multiple escape sequences in quick succession

Resolves #31
This commit is contained in:
Anthony Sottile
2020-03-06 15:07:04 -08:00
parent b11575b998
commit ec7fbba633
3 changed files with 123 additions and 13 deletions

View File

@@ -98,22 +98,66 @@ class Screen:
s = f' {VERSION_STR} {files}{centered}{files}'
self.stdscr.insstr(0, 0, s, curses.A_REVERSE)
def _get_sequence_home_end(self, wch: str) -> str:
try:
c = self.stdscr.get_wch()
except curses.error:
return wch
else:
if isinstance(c, int) or c not in 'HF':
self._buffered_input = c
return wch
else:
return f'{wch}{c}'
def _get_sequence_bracketed(self, wch: str) -> str:
for _ in range(3): # [0-9]{1,2};
try:
c = self.stdscr.get_wch()
except curses.error:
return wch
else:
if isinstance(c, int):
self._buffered_input = c
return wch
else:
wch += c
if c == ';':
break
else:
return wch # unexpected input while searching for `;`
for _ in range(2): # [0-9].
try:
c = self.stdscr.get_wch()
except curses.error:
return wch
else:
if isinstance(c, int):
self._buffered_input = c
return wch
else:
wch += c
return wch
def _get_sequence(self, wch: str) -> str:
self.stdscr.nodelay(True)
try:
while True:
try:
c = self.stdscr.get_wch()
if isinstance(c, str):
wch += c
else: # pragma: no cover (race)
self._buffered_input = c
break
except curses.error:
break
c = self.stdscr.get_wch()
except curses.error:
return wch
else:
if isinstance(c, int): # M-BSpace
return f'{wch}({c})' # TODO
elif c == 'O':
return self._get_sequence_home_end(f'{wch}O')
elif c == '[':
return self._get_sequence_bracketed(f'{wch}[')
else:
return f'{wch}{c}'
finally:
self.stdscr.nodelay(False)
return wch
def _get_string(self, wch: str) -> str:
self.stdscr.nodelay(True)

View File

@@ -33,6 +33,7 @@ def ten_lines(tmpdir):
class Screen:
def __init__(self, width, height):
self.disabled = True
self.nodelay = False
self.width = width
self.height = height
self.lines = [' ' * self.width for _ in range(self.height)]
@@ -137,7 +138,8 @@ class KeyPress(NamedTuple):
class CursesError(NamedTuple):
def __call__(self, screen: Screen) -> None:
raise curses.error()
if screen.nodelay:
raise curses.error()
class CursesScreen:
@@ -160,7 +162,7 @@ class CursesScreen:
pass
def nodelay(self, val):
pass
self._runner.screen.nodelay = val
class Key(NamedTuple):
@@ -286,6 +288,13 @@ class DeferredRunner:
self.press(s)
self.press('Enter')
def press_sequence(self, *ks):
for k in ks:
for op in self._expand_key(k):
if not isinstance(op, CursesError):
self._ops.append(op)
self._ops.append(CursesError())
def answer_no_if_modified(self):
self.press('n')
@@ -374,3 +383,8 @@ def run_tmux(*args, colors=256, **kwargs):
)
def run(request):
return request.param
@pytest.fixture(scope='session', params=[run_fake], ids=['fake'])
def run_only_fake(request):
return request.param

View File

@@ -359,3 +359,55 @@ def test_ctrl_left_triggering_scroll(run, jump_word_file):
h.press('^Left')
h.await_cursor_position(x=11, y=1)
h.assert_cursor_line_equals('hello world')
def test_sequence_handling(run_only_fake):
# this test is run with the fake runner since it simulates some situations
# that are either impossible or due to race conditions (that we can only
# force with the fake runner)
with run_only_fake() as h, and_exit(h):
h.press_sequence('\x1b[1;5C\x1b[1;5D test1') # ^Left + ^Right
h.await_text('test1')
h.await_text_missing('unknown key')
h.press_sequence('\x1bOH', '\x1bOF', ' test2') # Home + End
h.await_text('test1 test2')
h.await_text_missing('unknown key')
h.press_sequence(' tq', 'M-O', 'BSpace', 'est3')
h.await_text('test1 test2 test3')
h.await_text('unknown key')
h.await_text('M-O')
h.press('M-[')
h.await_text_missing('M-O')
h.await_text('M-[')
h.press('M-O')
h.await_text_missing('M-[')
h.await_text('M-O')
h.press_sequence(' tq', 'M-[', 'BSpace', 'est4')
h.await_text('test1 test2 test3 test4')
h.await_text_missing('M-O')
h.await_text('M-[')
# TODO: this is broken for now, not quite sure what to do with it
h.press_sequence('\x1b', 'BSpace')
h.await_text(r'\x1b(263)')
# the sequences after here are "wrong" but I don't think a human
# could type them
h.press_sequence(' tq', '\x1b[1;', 'BSpace', 'est5')
h.await_text('test1 test2 test3 test4 test5')
h.await_text(r'\x1b[1;')
h.press_sequence('\x1b[111', ' test6')
h.await_text('test1 test2 test3 test4 test5 test6')
h.await_text(r'\x1b[111')
h.press('\x1b[1;')
h.press(' test7')
h.await_text('test1 test2 test3 test4 test5 test6 test7')
h.await_text(r'\x1b[1;')