Implement save-on-exit
This commit is contained in:
211
babi.py
211
babi.py
@@ -35,6 +35,8 @@ else:
|
||||
|
||||
VERSION_STR = 'babi v0'
|
||||
TCallable = TypeVar('TCallable', bound=Callable[..., Any])
|
||||
EditResult = enum.Enum('EditResult', 'EXIT NEXT PREV')
|
||||
PromptResult = enum.Enum('PromptResult', 'CANCELLED')
|
||||
|
||||
|
||||
def _line_x(x: int, width: int) -> int:
|
||||
@@ -259,15 +261,20 @@ class Status:
|
||||
if self._action_counter < 0:
|
||||
self.clear()
|
||||
|
||||
def _cancel(self) -> Union[str, PromptResult]:
|
||||
self.update('cancelled')
|
||||
return PromptResult.CANCELLED
|
||||
|
||||
def prompt(
|
||||
self,
|
||||
screen: 'Screen',
|
||||
prompt: str,
|
||||
*,
|
||||
allow_empty: bool = False,
|
||||
history: Optional[str] = None,
|
||||
default_prev: bool = False,
|
||||
default: Optional[str] = None,
|
||||
) -> Optional[str]:
|
||||
) -> Union[str, PromptResult]:
|
||||
self.clear()
|
||||
default = default or ''
|
||||
if history is not None:
|
||||
@@ -286,7 +293,7 @@ class Status:
|
||||
def set_buf(s: str) -> None:
|
||||
lst[lst_pos] = s
|
||||
|
||||
def _save_history_and_get_retv() -> str:
|
||||
def _save_history_and_get_retv() -> Union[str, PromptResult]:
|
||||
if history is not None:
|
||||
prev = self._history_prev.get(history)
|
||||
entry = buf()
|
||||
@@ -304,7 +311,10 @@ class Status:
|
||||
):
|
||||
return prev
|
||||
|
||||
return buf()
|
||||
if not allow_empty and not buf():
|
||||
return self._cancel()
|
||||
else:
|
||||
return buf()
|
||||
|
||||
def _render_prompt(*, base: str = prompt) -> None:
|
||||
if not base or curses.COLS < 7:
|
||||
@@ -380,7 +390,7 @@ class Status:
|
||||
elif key.keyname == b'^R':
|
||||
reverse_idx = max(0, reverse_idx - 1)
|
||||
elif key.keyname == b'^C':
|
||||
return None
|
||||
return self._cancel()
|
||||
elif key.key == ord('\r'):
|
||||
return _save_history_and_get_retv()
|
||||
else:
|
||||
@@ -389,7 +399,7 @@ class Status:
|
||||
break # pragma: no cover
|
||||
|
||||
elif key.keyname == b'^C':
|
||||
return None
|
||||
return self._cancel()
|
||||
elif key.key == ord('\r'):
|
||||
return _save_history_and_get_retv()
|
||||
|
||||
@@ -398,8 +408,9 @@ class Status:
|
||||
screen: 'Screen',
|
||||
prompt: str,
|
||||
options: FrozenSet[str],
|
||||
resize: Callable[[], None],
|
||||
) -> Optional[str]:
|
||||
*,
|
||||
resize: Optional[Callable[[], None]] = None,
|
||||
) -> Union[str, PromptResult]:
|
||||
while True:
|
||||
s = prompt.ljust(curses.COLS)
|
||||
if len(s) > curses.COLS:
|
||||
@@ -411,9 +422,10 @@ class Status:
|
||||
key = _get_char(screen.stdscr)
|
||||
if key.key == curses.KEY_RESIZE:
|
||||
screen.resize()
|
||||
resize()
|
||||
if resize is not None:
|
||||
resize()
|
||||
elif key.keyname == b'^C':
|
||||
return None
|
||||
return self._cancel()
|
||||
elif key.wch in options:
|
||||
assert isinstance(key.wch, str) # mypy doesn't know
|
||||
return key.wch
|
||||
@@ -796,7 +808,7 @@ class File:
|
||||
screen.stdscr.addstr(y, x, s, curses.A_REVERSE)
|
||||
|
||||
count = 0
|
||||
res: Optional[str] = ''
|
||||
res: Union[str, PromptResult] = ''
|
||||
search = _SearchIter(self, reg, offset=0)
|
||||
for line_y, match in search:
|
||||
self.cursor_y = line_y
|
||||
@@ -807,7 +819,7 @@ class File:
|
||||
highlight()
|
||||
res = screen.status.quick_prompt(
|
||||
screen, 'replace [y(es), n(o), a(ll)]?',
|
||||
frozenset('yna'), highlight,
|
||||
frozenset('yna'), resize=highlight,
|
||||
)
|
||||
if res in {'y', 'a'}:
|
||||
count += 1
|
||||
@@ -821,8 +833,7 @@ class File:
|
||||
elif res == 'n':
|
||||
search.offset = 1
|
||||
else:
|
||||
assert res is None
|
||||
screen.status.update('cancelled')
|
||||
assert res is PromptResult.CANCELLED
|
||||
return
|
||||
|
||||
if res == '': # we never went through the loop
|
||||
@@ -1026,51 +1037,6 @@ class File:
|
||||
'redo', self.redo_stack, self.undo_stack, status, margin,
|
||||
)
|
||||
|
||||
@action
|
||||
def save(self, screen: 'Screen', status: Status) -> None:
|
||||
# TODO: make directories if they don't exist
|
||||
# TODO: maybe use mtime / stat as a shortcut for hashing below
|
||||
# TODO: strip trailing whitespace?
|
||||
# TODO: save atomically?
|
||||
if self.filename is None:
|
||||
filename = status.prompt(screen, 'enter filename')
|
||||
if not filename:
|
||||
status.update('cancelled')
|
||||
return
|
||||
else:
|
||||
self.filename = filename
|
||||
|
||||
if os.path.isfile(self.filename):
|
||||
with open(self.filename) as f:
|
||||
*_, sha256 = _get_lines(f)
|
||||
else:
|
||||
sha256 = hashlib.sha256(b'').hexdigest()
|
||||
|
||||
contents = self.nl.join(self.lines)
|
||||
sha256_to_save = hashlib.sha256(contents.encode()).hexdigest()
|
||||
|
||||
# the file on disk is the same as when we opened it
|
||||
if sha256 not in (self.sha256, sha256_to_save):
|
||||
status.update('(file changed on disk, not implemented)')
|
||||
return
|
||||
|
||||
with open(self.filename, 'w') as f:
|
||||
f.write(contents)
|
||||
|
||||
self.modified = False
|
||||
self.sha256 = sha256_to_save
|
||||
num_lines = len(self.lines) - 1
|
||||
lines = 'lines' if num_lines != 1 else 'line'
|
||||
status.update(f'saved! ({num_lines} {lines} written)')
|
||||
|
||||
# fix up modified state in undo / redo stacks
|
||||
for stack in (self.undo_stack, self.redo_stack):
|
||||
first = True
|
||||
for action in reversed(stack):
|
||||
action.end_modified = not first
|
||||
action.start_modified = True
|
||||
first = False
|
||||
|
||||
# positioning
|
||||
|
||||
def rendered_y(self, margin: Margin) -> int:
|
||||
@@ -1234,7 +1200,90 @@ def _get_char(stdscr: 'curses._CursesWindow') -> Key:
|
||||
return Key(wch, key, keyname)
|
||||
|
||||
|
||||
EditResult = enum.Enum('EditResult', 'EXIT NEXT PREV')
|
||||
def _save(screen: Screen) -> Optional[PromptResult]:
|
||||
screen.file.mark_previous_action_as_final()
|
||||
|
||||
# TODO: make directories if they don't exist
|
||||
# TODO: maybe use mtime / stat as a shortcut for hashing below
|
||||
# TODO: strip trailing whitespace?
|
||||
# TODO: save atomically?
|
||||
if screen.file.filename is None:
|
||||
filename = screen.status.prompt(screen, 'enter filename')
|
||||
if filename is PromptResult.CANCELLED:
|
||||
return PromptResult.CANCELLED
|
||||
else:
|
||||
screen.file.filename = filename
|
||||
|
||||
if os.path.isfile(screen.file.filename):
|
||||
with open(screen.file.filename) as f:
|
||||
*_, sha256 = _get_lines(f)
|
||||
else:
|
||||
sha256 = hashlib.sha256(b'').hexdigest()
|
||||
|
||||
contents = screen.file.nl.join(screen.file.lines)
|
||||
sha256_to_save = hashlib.sha256(contents.encode()).hexdigest()
|
||||
|
||||
# the file on disk is the same as when we opened it
|
||||
if sha256 not in (screen.file.sha256, sha256_to_save):
|
||||
screen.status.update('(file changed on disk, not implemented)')
|
||||
return PromptResult.CANCELLED
|
||||
|
||||
with open(screen.file.filename, 'w') as f:
|
||||
f.write(contents)
|
||||
|
||||
screen.file.modified = False
|
||||
screen.file.sha256 = sha256_to_save
|
||||
num_lines = len(screen.file.lines) - 1
|
||||
lines = 'lines' if num_lines != 1 else 'line'
|
||||
screen.status.update(f'saved! ({num_lines} {lines} written)')
|
||||
|
||||
# fix up modified state in undo / redo stacks
|
||||
for stack in (screen.file.undo_stack, screen.file.redo_stack):
|
||||
first = True
|
||||
for action in reversed(stack):
|
||||
action.end_modified = not first
|
||||
action.start_modified = True
|
||||
first = False
|
||||
return None
|
||||
|
||||
|
||||
def _save_filename(screen: Screen) -> Optional[PromptResult]:
|
||||
response = screen.status.prompt(
|
||||
screen, 'enter filename', default=screen.file.filename,
|
||||
)
|
||||
if response is PromptResult.CANCELLED:
|
||||
return PromptResult.CANCELLED
|
||||
else:
|
||||
screen.file.filename = response
|
||||
return _save(screen)
|
||||
|
||||
|
||||
def _quit(screen: Screen) -> Optional[EditResult]:
|
||||
if screen.file.modified:
|
||||
response = screen.status.quick_prompt(
|
||||
screen, 'file is modified - save [y(es), n(o)]?', frozenset('yn'),
|
||||
)
|
||||
if response == 'y':
|
||||
if _save_filename(screen) is not PromptResult.CANCELLED:
|
||||
return EditResult.EXIT
|
||||
else:
|
||||
return None
|
||||
elif response == 'n':
|
||||
return EditResult.EXIT
|
||||
else:
|
||||
assert response is PromptResult.CANCELLED
|
||||
return None
|
||||
return EditResult.EXIT
|
||||
|
||||
|
||||
ScreenFunc = Callable[[Screen], Union[None, PromptResult, EditResult]]
|
||||
DISPATCH: Dict[bytes, ScreenFunc] = {
|
||||
b'^S': _save,
|
||||
b'^O': _save_filename,
|
||||
b'^X': _quit,
|
||||
b'kLFT3': lambda screen: EditResult.PREV,
|
||||
b'kRIT3': lambda screen: EditResult.NEXT,
|
||||
}
|
||||
|
||||
|
||||
def _edit(screen: Screen) -> EditResult:
|
||||
@@ -1269,9 +1318,7 @@ def _edit(screen: Screen) -> EditResult:
|
||||
screen.file.redo(screen.status, screen.margin)
|
||||
elif key.keyname == b'^_':
|
||||
response = screen.status.prompt(screen, 'enter line number')
|
||||
if not response:
|
||||
screen.status.update('cancelled')
|
||||
else:
|
||||
if response is not PromptResult.CANCELLED:
|
||||
try:
|
||||
lineno = int(response)
|
||||
except ValueError:
|
||||
@@ -1282,9 +1329,7 @@ def _edit(screen: Screen) -> EditResult:
|
||||
response = screen.status.prompt(
|
||||
screen, 'search', history='search', default_prev=True,
|
||||
)
|
||||
if not response:
|
||||
screen.status.update('cancelled')
|
||||
else:
|
||||
if response is not PromptResult.CANCELLED:
|
||||
try:
|
||||
regex = re.compile(response)
|
||||
except re.error:
|
||||
@@ -1296,9 +1341,7 @@ def _edit(screen: Screen) -> EditResult:
|
||||
screen, 'search (to replace)',
|
||||
history='search', default_prev=True,
|
||||
)
|
||||
if not response:
|
||||
screen.status.update('cancelled')
|
||||
else:
|
||||
if response is not PromptResult.CANCELLED:
|
||||
try:
|
||||
regex = re.compile(response)
|
||||
except re.error:
|
||||
@@ -1306,10 +1349,9 @@ def _edit(screen: Screen) -> EditResult:
|
||||
else:
|
||||
response = screen.status.prompt(
|
||||
screen, 'replace with', history='replace',
|
||||
allow_empty=True,
|
||||
)
|
||||
if response is None:
|
||||
screen.status.update('cancelled')
|
||||
else:
|
||||
if response is not PromptResult.CANCELLED:
|
||||
screen.file.replace(screen, regex, response)
|
||||
elif key.keyname == b'^C':
|
||||
screen.file.current_position(screen.status)
|
||||
@@ -1318,29 +1360,16 @@ def _edit(screen: Screen) -> EditResult:
|
||||
if response == ':q':
|
||||
return EditResult.EXIT
|
||||
elif response == ':w':
|
||||
screen.file.save(screen, screen.status)
|
||||
_save(screen)
|
||||
elif response == ':wq':
|
||||
screen.file.save(screen, screen.status)
|
||||
_save(screen)
|
||||
return EditResult.EXIT
|
||||
elif response: # noop / cancel
|
||||
elif response is not PromptResult.CANCELLED:
|
||||
screen.status.update(f'invalid command: {response}')
|
||||
elif key.keyname == b'^S':
|
||||
screen.file.save(screen, screen.status)
|
||||
elif key.keyname == b'^O':
|
||||
response = screen.status.prompt(
|
||||
screen, 'enter filename', default=screen.file.filename,
|
||||
)
|
||||
if not response:
|
||||
screen.status.update('cancelled')
|
||||
else:
|
||||
screen.file.filename = response
|
||||
screen.file.save(screen, screen.status)
|
||||
elif key.keyname == b'^X':
|
||||
return EditResult.EXIT
|
||||
elif key.keyname == b'kLFT3':
|
||||
return EditResult.PREV
|
||||
elif key.keyname == b'kRIT3':
|
||||
return EditResult.NEXT
|
||||
elif key.keyname in DISPATCH:
|
||||
fn_res = DISPATCH[key.keyname](screen)
|
||||
if isinstance(fn_res, EditResult):
|
||||
return fn_res
|
||||
elif key.keyname == b'^Z':
|
||||
curses.endwin()
|
||||
os.kill(os.getpid(), signal.SIGSTOP)
|
||||
|
||||
@@ -139,4 +139,7 @@ def and_exit(h):
|
||||
yield
|
||||
# only try and exit in non-exceptional cases
|
||||
h.press('^X')
|
||||
# dismiss the save prompt
|
||||
if ' *' in h.get_screen_line(0):
|
||||
h.press('n')
|
||||
h.await_exit()
|
||||
|
||||
@@ -44,6 +44,7 @@ def test_cut_uncut_multiple_file_buffers(tmpdir):
|
||||
h.press('^K')
|
||||
h.await_text_missing('hello')
|
||||
h.press('^X')
|
||||
h.press('n')
|
||||
h.await_text_missing('world')
|
||||
h.press('^U')
|
||||
h.await_text('hello\ngood\nbye\n')
|
||||
|
||||
@@ -133,3 +133,38 @@ def test_save_via_ctrl_o_cancelled(tmpdir, key):
|
||||
h.await_text('enter filename:')
|
||||
h.press(key)
|
||||
h.await_text('cancelled')
|
||||
|
||||
|
||||
def test_save_on_exit_cancel_yn():
|
||||
with run() as h, and_exit(h):
|
||||
h.press('hello')
|
||||
h.await_text('hello')
|
||||
h.press('^X')
|
||||
h.await_text('file is modified - save [y(es), n(o)]?')
|
||||
h.press('^C')
|
||||
h.await_text('cancelled')
|
||||
|
||||
|
||||
def test_save_on_exit_cancel_filename():
|
||||
with run() as h, and_exit(h):
|
||||
h.press('hello')
|
||||
h.await_text('hello')
|
||||
h.press('^X')
|
||||
h.await_text('file is modified - save [y(es), n(o)]?')
|
||||
h.press('y')
|
||||
h.await_text('enter filename:')
|
||||
h.press('^C')
|
||||
h.await_text('cancelled')
|
||||
|
||||
|
||||
def test_save_on_exit_save(tmpdir):
|
||||
f = tmpdir.join('f')
|
||||
with run(str(f)) as h:
|
||||
h.press('hello')
|
||||
h.await_text('hello')
|
||||
h.press('^X')
|
||||
h.await_text('file is modified - save [y(es), n(o)]?')
|
||||
h.press('y')
|
||||
h.await_text(f'enter filename: {f}')
|
||||
h.press('Enter')
|
||||
h.await_exit()
|
||||
|
||||
Reference in New Issue
Block a user