Refactor prompt to be more extensible
This commit is contained in:
364
babi.py
364
babi.py
@@ -215,30 +215,6 @@ class Status:
|
||||
def __init__(self) -> None:
|
||||
self._status = ''
|
||||
self._action_counter = -1
|
||||
self._history: Dict[str, List[str]] = collections.defaultdict(list)
|
||||
self._history_orig_len: Dict[str, int] = collections.defaultdict(int)
|
||||
self._history_prev: Dict[str, str] = {}
|
||||
|
||||
@contextlib.contextmanager
|
||||
def save_history(self) -> Generator[None, None, None]:
|
||||
history_dir = os.path.join(
|
||||
os.environ.get('XDG_DATA_HOME') or
|
||||
os.path.expanduser('~/.local/share'),
|
||||
'babi/history',
|
||||
)
|
||||
os.makedirs(history_dir, exist_ok=True)
|
||||
for filename in os.listdir(history_dir):
|
||||
with open(os.path.join(history_dir, filename)) as f:
|
||||
self._history[filename] = f.read().splitlines()
|
||||
self._history_orig_len[filename] = len(self._history[filename])
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
for k, v in self._history.items():
|
||||
new_history = v[self._history_orig_len[k]:]
|
||||
if new_history:
|
||||
with open(os.path.join(history_dir, k), 'a+') as f:
|
||||
f.write('\n'.join(new_history) + '\n')
|
||||
|
||||
def update(self, status: str) -> None:
|
||||
self._status = status
|
||||
@@ -267,149 +243,186 @@ class Status:
|
||||
if self._action_counter < 0:
|
||||
self.clear()
|
||||
|
||||
def cancelled(self) -> Union[str, PromptResult]:
|
||||
def cancelled(self) -> PromptResult:
|
||||
self.update('cancelled')
|
||||
return PromptResult.CANCELLED
|
||||
|
||||
def prompt(
|
||||
self,
|
||||
screen: 'Screen',
|
||||
prompt: str,
|
||||
*,
|
||||
allow_empty: bool = False,
|
||||
history: Optional[str] = None,
|
||||
default_prev: bool = False,
|
||||
default: Optional[str] = None,
|
||||
) -> Union[str, PromptResult]:
|
||||
self.clear()
|
||||
default = default or ''
|
||||
if history is not None:
|
||||
lst = [*self._history[history], default]
|
||||
lst_pos = len(lst) - 1
|
||||
if default_prev and history in self._history_prev:
|
||||
prompt = f'{prompt} [{self._history_prev[history]}]'
|
||||
|
||||
class Prompt:
|
||||
def __init__(self, screen: 'Screen', prompt: str, lst: List[str]) -> None:
|
||||
self._screen = screen
|
||||
self._prompt = prompt
|
||||
self._lst = lst
|
||||
self._y = len(lst) - 1
|
||||
self._x = 0
|
||||
|
||||
@property
|
||||
def _s(self) -> str:
|
||||
return self._lst[self._y]
|
||||
|
||||
@_s.setter
|
||||
def _s(self, s: str) -> None:
|
||||
self._lst[self._y] = s
|
||||
|
||||
def _render_prompt(self, *, base: Optional[str] = None) -> None:
|
||||
base = base or self._prompt
|
||||
if not base or curses.COLS < 7:
|
||||
prompt_s = ''
|
||||
elif len(base) > curses.COLS - 6:
|
||||
prompt_s = f'{base[:curses.COLS - 7]}…: '
|
||||
else:
|
||||
lst = [default]
|
||||
lst_pos = 0
|
||||
pos = 0
|
||||
prompt_s = f'{base}: '
|
||||
width = curses.COLS - len(prompt_s)
|
||||
line = _scrolled_line(self._s, self._x, width, current=True)
|
||||
cmd = f'{prompt_s}{line}'
|
||||
self._screen.stdscr.insstr(curses.LINES - 1, 0, cmd, curses.A_REVERSE)
|
||||
x = len(prompt_s) + self._x - _line_x(self._x, width)
|
||||
self._screen.stdscr.move(curses.LINES - 1, x)
|
||||
|
||||
def buf() -> str:
|
||||
return lst[lst_pos]
|
||||
def _resize(self) -> None:
|
||||
self._screen.resize()
|
||||
|
||||
def set_buf(s: str) -> None:
|
||||
lst[lst_pos] = s
|
||||
def _left(self) -> None:
|
||||
self._x = max(0, self._x - 1)
|
||||
|
||||
def _save_history_and_get_retv() -> Union[str, PromptResult]:
|
||||
if history is not None:
|
||||
prev = self._history_prev.get(history)
|
||||
entry = buf()
|
||||
if entry: # only put non-empty things in history
|
||||
history_lst = self._history[history]
|
||||
if not history_lst or history_lst[-1] != entry:
|
||||
history_lst.append(entry)
|
||||
self._history_prev[history] = entry
|
||||
def _right(self) -> None:
|
||||
self._x = min(len(self._s), self._x + 1)
|
||||
|
||||
if (
|
||||
default_prev and
|
||||
prev is not None and
|
||||
lst_pos == len(lst) - 1 and
|
||||
not lst[lst_pos]
|
||||
):
|
||||
return prev
|
||||
def _up(self) -> None:
|
||||
self._y = max(0, self._y - 1)
|
||||
self._x = len(self._s)
|
||||
|
||||
if not allow_empty and not buf():
|
||||
return self.cancelled()
|
||||
else:
|
||||
return buf()
|
||||
def _down(self) -> None:
|
||||
self._y = min(len(self._lst) - 1, self._y + 1)
|
||||
self._x = len(self._s)
|
||||
|
||||
def _render_prompt(*, base: str = prompt) -> None:
|
||||
if not base or curses.COLS < 7:
|
||||
prompt_s = ''
|
||||
elif len(base) > curses.COLS - 6:
|
||||
prompt_s = f'{base[:curses.COLS - 7]}…: '
|
||||
else:
|
||||
prompt_s = f'{base}: '
|
||||
width = curses.COLS - len(prompt_s)
|
||||
line = _scrolled_line(lst[lst_pos], pos, width, current=True)
|
||||
cmd = f'{prompt_s}{line}'
|
||||
screen.stdscr.insstr(curses.LINES - 1, 0, cmd, curses.A_REVERSE)
|
||||
line_x = _line_x(pos, width)
|
||||
screen.stdscr.move(curses.LINES - 1, len(prompt_s) + pos - line_x)
|
||||
def _home(self) -> None:
|
||||
self._x = 0
|
||||
|
||||
def _end(self) -> None:
|
||||
self._x = len(self._s)
|
||||
|
||||
def _backspace(self) -> None:
|
||||
if self._x > 0:
|
||||
self._s = self._s[:self._x - 1] + self._s[self._x:]
|
||||
self._x -= 1
|
||||
|
||||
def _delete(self) -> None:
|
||||
if self._x < len(self._s):
|
||||
self._s = self._s[:self._x] + self._s[self._x + 1:]
|
||||
|
||||
def _cut_to_end(self) -> None:
|
||||
self._s = self._s[:self._x]
|
||||
|
||||
def _reverse_search(self) -> Union[None, str, PromptResult]:
|
||||
reverse_s = ''
|
||||
reverse_idx = self._y
|
||||
while True:
|
||||
_render_prompt()
|
||||
key = _get_char(screen.stdscr)
|
||||
reverse_failed = False
|
||||
for search_idx in range(reverse_idx, -1, -1):
|
||||
if reverse_s in self._lst[search_idx]:
|
||||
reverse_idx = self._y = search_idx
|
||||
self._x = len(self._s)
|
||||
break
|
||||
else:
|
||||
reverse_failed = True
|
||||
|
||||
if reverse_failed:
|
||||
base = f'{self._prompt}(failed reverse-search)`{reverse_s}`'
|
||||
else:
|
||||
base = f'{self._prompt}(reverse-search)`{reverse_s}`'
|
||||
|
||||
self._render_prompt(base=base)
|
||||
key = _get_char(self._screen.stdscr)
|
||||
|
||||
if key.key == curses.KEY_RESIZE:
|
||||
screen.resize()
|
||||
elif key.key == curses.KEY_LEFT:
|
||||
pos = max(0, pos - 1)
|
||||
elif key.key == curses.KEY_RIGHT:
|
||||
pos = min(len(lst[lst_pos]), pos + 1)
|
||||
elif key.key == curses.KEY_UP:
|
||||
lst_pos = max(0, lst_pos - 1)
|
||||
pos = len(lst[lst_pos])
|
||||
elif key.key == curses.KEY_DOWN:
|
||||
lst_pos = min(len(lst) - 1, lst_pos + 1)
|
||||
pos = len(lst[lst_pos])
|
||||
elif key.key == curses.KEY_HOME or key.keyname == b'^A':
|
||||
pos = 0
|
||||
elif key.key == curses.KEY_END or key.keyname == b'^E':
|
||||
pos = len(lst[lst_pos])
|
||||
self._screen.resize()
|
||||
elif key.key == curses.KEY_BACKSPACE:
|
||||
if pos > 0:
|
||||
set_buf(buf()[:pos - 1] + buf()[pos:])
|
||||
pos -= 1
|
||||
elif key.key == curses.KEY_DC:
|
||||
if pos < len(lst[lst_pos]):
|
||||
set_buf(buf()[:pos] + buf()[pos + 1:])
|
||||
reverse_s = reverse_s[:-1]
|
||||
elif isinstance(key.wch, str) and key.wch.isprintable():
|
||||
set_buf(buf()[:pos] + key.wch + buf()[pos:])
|
||||
pos += 1
|
||||
elif key.keyname == b'^K':
|
||||
set_buf(buf()[:pos])
|
||||
reverse_s += key.wch
|
||||
elif key.keyname == b'^R':
|
||||
reverse_s = ''
|
||||
reverse_idx = lst_pos
|
||||
while True:
|
||||
reverse_failed = False
|
||||
for search_idx in range(reverse_idx, -1, -1):
|
||||
if reverse_s in lst[search_idx]:
|
||||
reverse_idx = lst_pos = search_idx
|
||||
pos = len(buf())
|
||||
break
|
||||
else:
|
||||
reverse_failed = True
|
||||
|
||||
if reverse_failed:
|
||||
base = f'{prompt}(failed reverse-search)`{reverse_s}`'
|
||||
else:
|
||||
base = f'{prompt}(reverse-search)`{reverse_s}`'
|
||||
|
||||
_render_prompt(base=base)
|
||||
key = _get_char(screen.stdscr)
|
||||
|
||||
if key.key == curses.KEY_RESIZE:
|
||||
screen.resize()
|
||||
elif key.key == curses.KEY_BACKSPACE:
|
||||
reverse_s = reverse_s[:-1]
|
||||
elif isinstance(key.wch, str) and key.wch.isprintable():
|
||||
reverse_s += key.wch
|
||||
elif key.keyname == b'^R':
|
||||
reverse_idx = max(0, reverse_idx - 1)
|
||||
elif key.keyname == b'^C':
|
||||
return self.cancelled()
|
||||
elif key.key == ord('\r'):
|
||||
return _save_history_and_get_retv()
|
||||
else:
|
||||
# python3.8+ optimizes this out
|
||||
# https://github.com/nedbat/coveragepy/issues/772
|
||||
break # pragma: no cover
|
||||
|
||||
reverse_idx = max(0, reverse_idx - 1)
|
||||
elif key.keyname == b'^C':
|
||||
return self.cancelled()
|
||||
return self._screen.status.cancelled()
|
||||
elif key.key == ord('\r'):
|
||||
return _save_history_and_get_retv()
|
||||
return self._s
|
||||
else:
|
||||
return None
|
||||
|
||||
def _cancel(self) -> PromptResult:
|
||||
return self._screen.status.cancelled()
|
||||
|
||||
def _submit(self) -> str:
|
||||
return self._s
|
||||
|
||||
DISPATCH = {
|
||||
curses.KEY_RESIZE: _resize,
|
||||
curses.KEY_LEFT: _left,
|
||||
curses.KEY_RIGHT: _right,
|
||||
curses.KEY_UP: _up,
|
||||
curses.KEY_DOWN: _down,
|
||||
curses.KEY_HOME: _home,
|
||||
curses.KEY_END: _end,
|
||||
curses.KEY_BACKSPACE: _backspace,
|
||||
curses.KEY_DC: _delete,
|
||||
ord('\r'): _submit,
|
||||
}
|
||||
DISPATCH_KEY = {
|
||||
b'^A': _home,
|
||||
b'^E': _end,
|
||||
b'^K': _cut_to_end,
|
||||
b'^R': _reverse_search,
|
||||
b'^C': _cancel,
|
||||
}
|
||||
|
||||
def _c(self, c: str) -> None:
|
||||
self._s = self._s[:self._x] + c + self._s[self._x:]
|
||||
self._x += 1
|
||||
|
||||
def run(self) -> Union[PromptResult, str]:
|
||||
while True:
|
||||
self._render_prompt()
|
||||
key = _get_char(self._screen.stdscr)
|
||||
|
||||
ret: Union[None, str, PromptResult] = None
|
||||
if key.key in Prompt.DISPATCH:
|
||||
ret = Prompt.DISPATCH[key.key](self)
|
||||
elif key.keyname in Prompt.DISPATCH_KEY:
|
||||
ret = Prompt.DISPATCH_KEY[key.keyname](self)
|
||||
elif isinstance(key.wch, str) and key.wch.isprintable():
|
||||
self._c(key.wch)
|
||||
|
||||
if ret is not None:
|
||||
return ret
|
||||
|
||||
|
||||
class History:
|
||||
def __init__(self) -> None:
|
||||
self._orig_len: Dict[str, int] = collections.defaultdict(int)
|
||||
self.data: Dict[str, List[str]] = collections.defaultdict(list)
|
||||
self.prev: Dict[str, str] = {}
|
||||
|
||||
@contextlib.contextmanager
|
||||
def save(self) -> Generator[None, None, None]:
|
||||
history_dir = os.path.join(
|
||||
os.environ.get('XDG_DATA_HOME') or
|
||||
os.path.expanduser('~/.local/share'),
|
||||
'babi/history',
|
||||
)
|
||||
os.makedirs(history_dir, exist_ok=True)
|
||||
for filename in os.listdir(history_dir):
|
||||
with open(os.path.join(history_dir, filename)) as f:
|
||||
self.data[filename] = f.read().splitlines()
|
||||
self._orig_len[filename] = len(self.data[filename])
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
for k, v in self.data.items():
|
||||
new_history = v[self._orig_len[k]:]
|
||||
if new_history:
|
||||
with open(os.path.join(history_dir, k), 'a+') as f:
|
||||
f.write('\n'.join(new_history) + '\n')
|
||||
|
||||
|
||||
def _restore_lines_eof_invariant(lines: MutableSequenceNoSlice) -> None:
|
||||
@@ -1029,6 +1042,7 @@ class Screen:
|
||||
self.stdscr = stdscr
|
||||
self.files = files
|
||||
self.i = 0
|
||||
self.history = History()
|
||||
self.status = Status()
|
||||
self.margin = Margin.from_current_screen()
|
||||
self.prevkey = Key('', 0, b'')
|
||||
@@ -1098,8 +1112,42 @@ class Screen:
|
||||
assert isinstance(key.wch, str) # mypy doesn't know
|
||||
return key.wch
|
||||
|
||||
def prompt(
|
||||
self,
|
||||
prompt: str,
|
||||
*,
|
||||
allow_empty: bool = False,
|
||||
history: Optional[str] = None,
|
||||
default_prev: bool = False,
|
||||
default: Optional[str] = None,
|
||||
) -> Union[str, PromptResult]:
|
||||
default = default or ''
|
||||
self.status.clear()
|
||||
if history is not None:
|
||||
history_data = [*self.history.data[history], default]
|
||||
if default_prev and history in self.history.prev:
|
||||
prompt = f'{prompt} [{self.history.prev[history]}]'
|
||||
else:
|
||||
history_data = [default]
|
||||
|
||||
ret = Prompt(self, prompt, history_data).run()
|
||||
|
||||
if ret is not PromptResult.CANCELLED and history is not None:
|
||||
if ret: # only put non-empty things in history
|
||||
history_lst = self.history.data[history]
|
||||
if not history_lst or history_lst[-1] != ret:
|
||||
history_lst.append(ret)
|
||||
self.history.prev[history] = ret
|
||||
elif default_prev and history in self.history.prev:
|
||||
return self.history.prev[history]
|
||||
|
||||
if not allow_empty and not ret:
|
||||
return self.status.cancelled()
|
||||
else:
|
||||
return ret
|
||||
|
||||
def go_to_line(self) -> None:
|
||||
response = self.status.prompt(self, 'enter line number')
|
||||
response = self.prompt('enter line number')
|
||||
if response is not PromptResult.CANCELLED:
|
||||
try:
|
||||
lineno = int(response)
|
||||
@@ -1126,9 +1174,7 @@ class Screen:
|
||||
self.file.uncut(self.cut_buffer, self.margin)
|
||||
|
||||
def _get_search_re(self, prompt: str) -> Union[Pattern[str], PromptResult]:
|
||||
response = self.status.prompt(
|
||||
self, prompt, history='search', default_prev=True,
|
||||
)
|
||||
response = self.prompt(prompt, history='search', default_prev=True)
|
||||
if response is PromptResult.CANCELLED:
|
||||
return response
|
||||
try:
|
||||
@@ -1165,14 +1211,14 @@ class Screen:
|
||||
def replace(self) -> None:
|
||||
search_response = self._get_search_re('search (to replace)')
|
||||
if search_response is not PromptResult.CANCELLED:
|
||||
response = self.status.prompt(
|
||||
self, 'replace with', history='replace', allow_empty=True,
|
||||
response = self.prompt(
|
||||
'replace with', history='replace', allow_empty=True,
|
||||
)
|
||||
if response is not PromptResult.CANCELLED:
|
||||
self.file.replace(self, search_response, response)
|
||||
|
||||
def command(self) -> Optional[EditResult]:
|
||||
response = self.status.prompt(self, '', history='command')
|
||||
response = self.prompt('', history='command')
|
||||
if response == ':q':
|
||||
return EditResult.EXIT
|
||||
elif response == ':w':
|
||||
@@ -1192,7 +1238,7 @@ class Screen:
|
||||
# TODO: strip trailing whitespace?
|
||||
# TODO: save atomically?
|
||||
if self.file.filename is None:
|
||||
filename = self.status.prompt(self, 'enter filename')
|
||||
filename = self.prompt('enter filename')
|
||||
if filename is PromptResult.CANCELLED:
|
||||
return PromptResult.CANCELLED
|
||||
else:
|
||||
@@ -1231,9 +1277,7 @@ class Screen:
|
||||
return None
|
||||
|
||||
def save_filename(self) -> Optional[PromptResult]:
|
||||
response = self.status.prompt(
|
||||
self, 'enter filename', default=self.file.filename,
|
||||
)
|
||||
response = self.prompt('enter filename', default=self.file.filename)
|
||||
if response is PromptResult.CANCELLED:
|
||||
return PromptResult.CANCELLED
|
||||
else:
|
||||
@@ -1396,7 +1440,7 @@ def c_main(stdscr: 'curses._CursesWindow', args: argparse.Namespace) -> None:
|
||||
if args.color_test:
|
||||
return _color_test(stdscr)
|
||||
screen = Screen(stdscr, [File(f) for f in args.filenames or [None]])
|
||||
with screen.status.save_history():
|
||||
with screen.history.save():
|
||||
while screen.files:
|
||||
screen.i = screen.i % len(screen.files)
|
||||
res = _edit(screen)
|
||||
|
||||
Reference in New Issue
Block a user