-
Notifications
You must be signed in to change notification settings - Fork 128
Pyscript fix #653
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pyscript fix #653
Changes from all commits
833089f
1241734
f93d51b
2beb3de
373c404
cd09009
a4ff5ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -302,6 +302,12 @@ class EmptyStatement(Exception): | |
| # Contains data about a disabled command which is used to restore its original functions when the command is enabled | ||
| DisabledCommand = namedtuple('DisabledCommand', ['command_function', 'help_function']) | ||
|
|
||
| # Used to restore state after redirection ends | ||
| # redirecting and piping are used to know what needs to be restored | ||
| RedirectionSavedState = utils.namedtuple_with_defaults('RedirectionSavedState', | ||
| ['redirecting', 'self_stdout', 'sys_stdout', | ||
| 'piping', 'pipe_proc']) | ||
|
|
||
|
|
||
| class Cmd(cmd.Cmd): | ||
| """An easy but powerful framework for writing line-oriented command interpreters. | ||
|
|
@@ -412,10 +418,6 @@ def __init__(self, completekey: str = 'tab', stdin=None, stdout=None, persistent | |
| # Built-in commands don't make use of this. It is purely there for user-defined commands and convenience. | ||
| self._last_result = None | ||
|
|
||
| # Used to save state during a redirection | ||
| self.kept_state = None | ||
| self.kept_sys = None | ||
|
|
||
| # Codes used for exit conditions | ||
| self._STOP_AND_EXIT = True # cmd convention | ||
|
|
||
|
|
@@ -1717,9 +1719,17 @@ def onecmd_plus_hooks(self, line: str) -> bool: | |
| # we need to run the finalization hooks | ||
| raise EmptyStatement | ||
|
|
||
| # Keep track of whether or not we were already redirecting before this command | ||
| already_redirecting = self.redirecting | ||
|
|
||
| # Handle any redirection for this command | ||
| saved_state = self._redirect_output(statement) | ||
|
|
||
| # See if we need to update self.redirecting | ||
| if not already_redirecting: | ||
| self.redirecting = saved_state.redirecting or saved_state.piping | ||
|
|
||
| try: | ||
| if self.allow_redirection: | ||
| self._redirect_output(statement) | ||
| timestart = datetime.datetime.now() | ||
| if self._in_py: | ||
| self._last_result = None | ||
|
|
@@ -1747,8 +1757,10 @@ def onecmd_plus_hooks(self, line: str) -> bool: | |
| if self.timing: | ||
| self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart)) | ||
| finally: | ||
| if self.allow_redirection and self.redirecting: | ||
| self._restore_output(statement) | ||
| self._restore_output(statement, saved_state) | ||
| if not already_redirecting: | ||
| self.redirecting = False | ||
|
|
||
| except EmptyStatement: | ||
| # don't do anything, but do allow command finalization hooks to run | ||
| pass | ||
|
|
@@ -1848,29 +1860,9 @@ def _complete_statement(self, line: str) -> Statement: | |
| # if we get here we must have: | ||
| # - a multiline command with no terminator | ||
| # - a multiline command with unclosed quotation marks | ||
| if not self.quit_on_sigint: | ||
| try: | ||
| self.at_continuation_prompt = True | ||
| newline = self.pseudo_raw_input(self.continuation_prompt) | ||
| if newline == 'eof': | ||
| # they entered either a blank line, or we hit an EOF | ||
| # for some other reason. Turn the literal 'eof' | ||
| # into a blank line, which serves as a command | ||
| # terminator | ||
| newline = '\n' | ||
| self.poutput(newline) | ||
| line = '{}\n{}'.format(statement.raw, newline) | ||
| except KeyboardInterrupt: | ||
| self.poutput('^C') | ||
| statement = self.statement_parser.parse('') | ||
| break | ||
| finally: | ||
| self.at_continuation_prompt = False | ||
| else: | ||
| try: | ||
| self.at_continuation_prompt = True | ||
| newline = self.pseudo_raw_input(self.continuation_prompt) | ||
| self.at_continuation_prompt = False | ||
|
|
||
| if newline == 'eof': | ||
| # they entered either a blank line, or we hit an EOF | ||
| # for some other reason. Turn the literal 'eof' | ||
|
|
@@ -1879,53 +1871,59 @@ def _complete_statement(self, line: str) -> Statement: | |
| newline = '\n' | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A unit test for the case inside this if would be nice |
||
| self.poutput(newline) | ||
| line = '{}\n{}'.format(statement.raw, newline) | ||
| except KeyboardInterrupt as ex: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A unit test for anything inside tis except would be great (but maybe not easy) |
||
| if self.quit_on_sigint: | ||
| raise ex | ||
| else: | ||
| self.poutput('^C') | ||
| statement = self.statement_parser.parse('') | ||
| break | ||
| finally: | ||
| self.at_continuation_prompt = False | ||
|
|
||
| if not statement.command: | ||
| raise EmptyStatement() | ||
| return statement | ||
|
|
||
| def _redirect_output(self, statement: Statement) -> None: | ||
| def _redirect_output(self, statement: Statement) -> RedirectionSavedState: | ||
| """Handles output redirection for >, >>, and |. | ||
|
|
||
| :param statement: a parsed statement from the user | ||
| :return: A RedirectionSavedState object | ||
| """ | ||
| import io | ||
| import subprocess | ||
|
|
||
| if statement.pipe_to: | ||
| self.kept_state = Statekeeper(self, ('stdout',)) | ||
| ret_val = RedirectionSavedState(redirecting=False, piping=False) | ||
|
|
||
| if not self.allow_redirection: | ||
| return ret_val | ||
|
|
||
| if statement.pipe_to: | ||
| # Create a pipe with read and write sides | ||
| read_fd, write_fd = os.pipe() | ||
|
|
||
| # Open each side of the pipe and set stdout accordingly | ||
| # noinspection PyTypeChecker | ||
| self.stdout = io.open(write_fd, 'w') | ||
| self.redirecting = True | ||
| # noinspection PyTypeChecker | ||
| subproc_stdin = io.open(read_fd, 'r') | ||
| pipe_read = io.open(read_fd, 'r') | ||
| pipe_write = io.open(write_fd, 'w') | ||
|
|
||
| # We want Popen to raise an exception if it fails to open the process. Thus we don't set shell to True. | ||
| try: | ||
| self.pipe_proc = subprocess.Popen(statement.pipe_to, stdin=subproc_stdin) | ||
| pipe_proc = subprocess.Popen(statement.pipe_to, stdin=pipe_read, stdout=self.stdout) | ||
| ret_val = RedirectionSavedState(redirecting=True, self_stdout=self.stdout, | ||
| piping=True, pipe_proc=self.pipe_proc) | ||
| self.stdout = pipe_write | ||
| self.pipe_proc = pipe_proc | ||
| except Exception as ex: | ||
| self.perror('Not piping because - {}'.format(ex), traceback_war=False) | ||
|
|
||
| # Restore stdout to what it was and close the pipe | ||
| self.stdout.close() | ||
| subproc_stdin.close() | ||
| self.pipe_proc = None | ||
| self.kept_state.restore() | ||
| self.kept_state = None | ||
| self.redirecting = False | ||
| pipe_read.close() | ||
| pipe_write.close() | ||
|
|
||
| elif statement.output: | ||
| import tempfile | ||
| if (not statement.output_to) and (not self.can_clip): | ||
| raise EnvironmentError("Cannot redirect to paste buffer; install 'pyperclip' and re-run to enable") | ||
| self.kept_state = Statekeeper(self, ('stdout',)) | ||
| self.kept_sys = Statekeeper(sys, ('stdout',)) | ||
| self.redirecting = True | ||
|
|
||
| if statement.output_to: | ||
| # going to a file | ||
| mode = 'w' | ||
|
|
@@ -1934,24 +1932,30 @@ def _redirect_output(self, statement: Statement) -> None: | |
| if statement.output == constants.REDIRECTION_APPEND: | ||
| mode = 'a' | ||
| try: | ||
| sys.stdout = self.stdout = open(statement.output_to, mode) | ||
| new_stdout = open(statement.output_to, mode) | ||
| ret_val = RedirectionSavedState(redirecting=True, self_stdout=self.stdout, sys_stdout=sys.stdout) | ||
| sys.stdout = self.stdout = new_stdout | ||
| except OSError as ex: | ||
| self.perror('Not redirecting because - {}'.format(ex), traceback_war=False) | ||
| self.redirecting = False | ||
| else: | ||
| # going to a paste buffer | ||
| sys.stdout = self.stdout = tempfile.TemporaryFile(mode="w+") | ||
| new_stdout = tempfile.TemporaryFile(mode="w+") | ||
| ret_val = RedirectionSavedState(redirecting=True, self_stdout=self.stdout, sys_stdout=sys.stdout) | ||
| sys.stdout = self.stdout = new_stdout | ||
| if statement.output == constants.REDIRECTION_APPEND: | ||
| self.poutput(get_paste_buffer()) | ||
|
|
||
| def _restore_output(self, statement: Statement) -> None: | ||
| return ret_val | ||
|
|
||
| def _restore_output(self, statement: Statement, saved_state: RedirectionSavedState) -> None: | ||
| """Handles restoring state after output redirection as well as | ||
| the actual pipe operation if present. | ||
|
|
||
| :param statement: Statement object which contains the parsed input from the user | ||
| :param saved_state: contains information needed to restore state data | ||
| """ | ||
| # If we have redirected output to a file or the clipboard or piped it to a shell command, then restore state | ||
| if self.kept_state is not None: | ||
| # Check if self.stdout was redirected | ||
| if saved_state.redirecting: | ||
| # If we redirected output to the clipboard | ||
| if statement.output and not statement.output_to: | ||
| self.stdout.seek(0) | ||
|
|
@@ -1963,21 +1967,16 @@ def _restore_output(self, statement: Statement) -> None: | |
| except BrokenPipeError: | ||
| pass | ||
| finally: | ||
| # Restore self.stdout | ||
| self.kept_state.restore() | ||
| self.kept_state = None | ||
| self.stdout = saved_state.self_stdout | ||
|
|
||
| # If we were piping output to a shell command, then close the subprocess the shell command was running in | ||
| if self.pipe_proc is not None: | ||
| self.pipe_proc.communicate() | ||
| self.pipe_proc = None | ||
| # Check if sys.stdout was redirected | ||
| if saved_state.sys_stdout is not None: | ||
| sys.stdout = saved_state.sys_stdout | ||
|
|
||
| # Restore sys.stdout if need be | ||
| if self.kept_sys is not None: | ||
| self.kept_sys.restore() | ||
| self.kept_sys = None | ||
|
|
||
| self.redirecting = False | ||
| # Check if output was being piped to a process | ||
| if saved_state.piping: | ||
| self.pipe_proc.communicate() | ||
| self.pipe_proc = saved_state.pipe_proc | ||
|
|
||
| def cmd_func(self, command: str) -> Optional[Callable]: | ||
| """ | ||
|
|
@@ -2159,10 +2158,10 @@ def _cmdloop(self) -> bool: | |
| # Set GNU readline's rl_basic_quote_characters to NULL so it won't automatically add a closing quote | ||
| # We don't need to worry about setting rl_completion_suppress_quote since we never declared | ||
| # rl_completer_quote_characters. | ||
| old_basic_quotes = ctypes.cast(rl_basic_quote_characters, ctypes.c_void_p).value | ||
| saved_basic_quotes = ctypes.cast(rl_basic_quote_characters, ctypes.c_void_p).value | ||
| rl_basic_quote_characters.value = None | ||
|
|
||
| old_completer = readline.get_completer() | ||
| saved_completer = readline.get_completer() | ||
| readline.set_completer(self.complete) | ||
|
|
||
| # Break words on whitespace and quotes when tab completing | ||
|
|
@@ -2172,7 +2171,7 @@ def _cmdloop(self) -> bool: | |
| # If redirection is allowed, then break words on those characters too | ||
| completer_delims += ''.join(constants.REDIRECTION_CHARS) | ||
|
|
||
| old_delims = readline.get_completer_delims() | ||
| saved_delims = readline.get_completer_delims() | ||
| readline.set_completer_delims(completer_delims) | ||
|
|
||
| # Enable tab completion | ||
|
|
@@ -2189,27 +2188,27 @@ def _cmdloop(self) -> bool: | |
| self.poutput('{}{}'.format(self.prompt, line)) | ||
| else: | ||
| # Otherwise, read a command from stdin | ||
| if not self.quit_on_sigint: | ||
| try: | ||
| line = self.pseudo_raw_input(self.prompt) | ||
| except KeyboardInterrupt: | ||
| try: | ||
| line = self.pseudo_raw_input(self.prompt) | ||
| except KeyboardInterrupt as ex: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like how you cleaned up handling this case |
||
| if self.quit_on_sigint: | ||
| raise ex | ||
| else: | ||
| self.poutput('^C') | ||
| line = '' | ||
| else: | ||
| line = self.pseudo_raw_input(self.prompt) | ||
|
|
||
| # Run the command along with all associated pre and post hooks | ||
| stop = self.onecmd_plus_hooks(line) | ||
| finally: | ||
| if self.use_rawinput and self.completekey and rl_type != RlType.NONE: | ||
|
|
||
| # Restore what we changed in readline | ||
| readline.set_completer(old_completer) | ||
| readline.set_completer_delims(old_delims) | ||
| readline.set_completer(saved_completer) | ||
| readline.set_completer_delims(saved_delims) | ||
|
|
||
| if rl_type == RlType.GNU: | ||
| readline.set_completion_display_matches_hook(None) | ||
| rl_basic_quote_characters.value = old_basic_quotes | ||
| rl_basic_quote_characters.value = saved_basic_quotes | ||
| elif rl_type == RlType.PYREADLINE: | ||
| # noinspection PyUnresolvedReferences | ||
| readline.rl.mode._display_completions = orig_pyreadline_display | ||
|
|
@@ -3070,7 +3069,7 @@ def py_quit(): | |
| # Set up tab completion for the Python console | ||
| # rlcompleter relies on the default settings of the Python readline module | ||
| if rl_type == RlType.GNU: | ||
| old_basic_quotes = ctypes.cast(rl_basic_quote_characters, ctypes.c_void_p).value | ||
| saved_basic_quotes = ctypes.cast(rl_basic_quote_characters, ctypes.c_void_p).value | ||
| rl_basic_quote_characters.value = orig_rl_basic_quotes | ||
|
|
||
| if 'gnureadline' in sys.modules: | ||
|
|
@@ -3082,7 +3081,7 @@ def py_quit(): | |
|
|
||
| sys.modules['readline'] = sys.modules['gnureadline'] | ||
|
|
||
| old_delims = readline.get_completer_delims() | ||
| saved_delims = readline.get_completer_delims() | ||
| readline.set_completer_delims(orig_rl_delims) | ||
|
|
||
| # rlcompleter will not need cmd2's custom display function | ||
|
|
@@ -3095,15 +3094,18 @@ def py_quit(): | |
|
|
||
| # Save off the current completer and set a new one in the Python console | ||
| # Make sure it tab completes from its locals() dictionary | ||
| old_completer = readline.get_completer() | ||
| saved_completer = readline.get_completer() | ||
| interp.runcode("from rlcompleter import Completer") | ||
| interp.runcode("import readline") | ||
| interp.runcode("readline.set_completer(Completer(locals()).complete)") | ||
|
|
||
| # Set up sys module for the Python console | ||
| self._reset_py_display() | ||
| keepstate = Statekeeper(sys, ('stdin', 'stdout')) | ||
|
|
||
| saved_sys_stdout = sys.stdout | ||
| sys.stdout = self.stdout | ||
|
|
||
| saved_sys_stdin = sys.stdin | ||
| sys.stdin = self.stdin | ||
|
|
||
| cprt = 'Type "help", "copyright", "credits" or "license" for more information.' | ||
|
|
@@ -3121,7 +3123,8 @@ def py_quit(): | |
| pass | ||
|
|
||
| finally: | ||
| keepstate.restore() | ||
| sys.stdout = saved_sys_stdout | ||
| sys.stdin = saved_sys_stdin | ||
|
|
||
| # Set up readline for cmd2 | ||
| if rl_type != RlType.NONE: | ||
|
|
@@ -3139,11 +3142,11 @@ def py_quit(): | |
|
|
||
| if self.use_rawinput and self.completekey: | ||
| # Restore cmd2's tab completion settings | ||
| readline.set_completer(old_completer) | ||
| readline.set_completer_delims(old_delims) | ||
| readline.set_completer(saved_completer) | ||
| readline.set_completer_delims(saved_delims) | ||
|
|
||
| if rl_type == RlType.GNU: | ||
| rl_basic_quote_characters.value = old_basic_quotes | ||
| rl_basic_quote_characters.value = saved_basic_quotes | ||
|
|
||
| if 'gnureadline' in sys.modules: | ||
| # Restore what the readline module pointed to | ||
|
|
@@ -3963,28 +3966,3 @@ def register_cmdfinalization_hook(self, func: Callable[[plugin.CommandFinalizati | |
| """Register a hook to be called after a command is completed, whether it completes successfully or not.""" | ||
| self._validate_cmdfinalization_callable(func) | ||
| self._cmdfinalization_hooks.append(func) | ||
|
|
||
|
|
||
| class Statekeeper(object): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice job finally getting rid of this |
||
| """Class used to save and restore state during load and py commands as well as when redirecting output or pipes.""" | ||
| def __init__(self, obj: Any, attribs: Iterable) -> None: | ||
| """Use the instance attributes as a generic key-value store to copy instance attributes from outer object. | ||
|
|
||
| :param obj: instance of cmd2.Cmd derived class (your application instance) | ||
| :param attribs: tuple of strings listing attributes of obj to save a copy of | ||
| """ | ||
| self.obj = obj | ||
| self.attribs = attribs | ||
| if self.obj: | ||
| self._save() | ||
|
|
||
| def _save(self) -> None: | ||
| """Create copies of attributes from self.obj inside this Statekeeper instance.""" | ||
| for attrib in self.attribs: | ||
| setattr(self, attrib, getattr(self.obj, attrib)) | ||
|
|
||
| def restore(self) -> None: | ||
| """Overwrite attributes in self.obj with the saved values stored in this Statekeeper instance.""" | ||
| if self.obj: | ||
| for attrib in self.attribs: | ||
| setattr(self.obj, attrib, getattr(self, attrib)) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps this should be declared in utils and imported from there?