to us as soon as it is produced, rather than waiting for the end of a
line.
- Use CommunicateFilter() to handle output from the subprocess.
+ Use communicate_filter() to handle output from the subprocess.
"""
if kwargs:
raise ValueError("Unit tests do not test extra args - please add tests")
- def ConvertData(self, data):
+ def convert_data(self, data):
"""Convert stdout/stderr data to the correct format for output
Args:
return b''
return data
- def CommunicateFilter(self, output):
+ def communicate_filter(self, output):
"""Interact with process: Read data from stdout and stderr.
This method runs until end-of-file is reached, then waits for the
The output function is sent all output from the subprocess and must be
defined like this:
- def Output([self,] stream, data)
+ def output([self,] stream, data)
Args:
stream: the stream the output was received on, which will be
sys.stdout or sys.stderr.
self.terminate()
# All data exchanged. Translate lists into strings.
- stdout = self.ConvertData(stdout)
- stderr = self.ConvertData(stderr)
- combined = self.ConvertData(combined)
+ stdout = self.convert_data(stdout)
+ stderr = self.convert_data(stderr)
+ combined = self.convert_data(combined)
# Translate newlines, if requested. We cannot let the file
# object do the translation: It is based on stdio, which is
self.stdin_read_pipe = pipe[0]
self._stdin_write_pipe = os.fdopen(pipe[1], 'w')
- def Output(self, stream, data):
+ def output(self, stream, data):
"""Output handler for Popen. Stores the data for later comparison"""
if stream == sys.stdout:
self.stdout_data += data
self._stdin_write_pipe.write(self._input_to_send + '\r\n')
self._stdin_write_pipe.flush()
- def _BasicCheck(self, plist, oper):
+ def _basic_check(self, plist, oper):
"""Basic checks that the output looks sane."""
self.assertEqual(plist[0], oper.stdout_data)
self.assertEqual(plist[1], oper.stderr_data)
def test_simple(self):
"""Simple redirection: Get process list"""
oper = TestSubprocess.MyOperation()
- plist = Popen(['ps']).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
+ plist = Popen(['ps']).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
def test_stderr(self):
"""Check stdout and stderr"""
oper = TestSubprocess.MyOperation()
cmd = 'echo fred >/dev/stderr && false || echo bad'
- plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
+ plist = Popen([cmd], shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
self.assertEqual(plist [0], 'bad\r\n')
self.assertEqual(plist [1], 'fred\r\n')
oper = TestSubprocess.MyOperation()
cmd = 'echo test >/dev/stderr'
self.assertRaises(OSError, Popen, [cmd], shell=False)
- plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
+ plist = Popen([cmd], shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
self.assertEqual(len(plist [0]), 0)
self.assertEqual(plist [1], 'test\r\n')
"""Check with and without shell works using list arguments"""
oper = TestSubprocess.MyOperation()
cmd = ['echo', 'test', '>/dev/stderr']
- plist = Popen(cmd, shell=False).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
+ plist = Popen(cmd, shell=False).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r\n')
self.assertEqual(len(plist [1]), 0)
# this should be interpreted as 'echo' with the other args dropped
cmd = ['echo', 'test', '>/dev/stderr']
- plist = Popen(cmd, shell=True).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
+ plist = Popen(cmd, shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
self.assertEqual(plist [0], '\r\n')
def test_cwd(self):
"""Check we can change directory"""
for shell in (False, True):
oper = TestSubprocess.MyOperation()
- plist = Popen('pwd', shell=shell, cwd='/tmp').CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
+ plist = Popen('pwd', shell=shell, cwd='/tmp').communicate_filter(
+ oper.output)
+ self._basic_check(plist, oper)
self.assertEqual(plist [0], '/tmp\r\n')
def test_env(self):
if add:
env ['FRED'] = 'fred'
cmd = 'echo $FRED'
- plist = Popen(cmd, shell=True, env=env).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
+ plist = Popen(cmd, shell=True, env=env).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
self.assertEqual(plist [0], add and 'fred\r\n' or '\r\n')
def test_extra_args(self):
prompt = 'What is your name?: '
cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
plist = Popen([cmd], stdin=oper.stdin_read_pipe,
- shell=True).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
+ shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
self.assertEqual(len(plist [1]), 0)
self.assertEqual(plist [0], prompt + 'Hello Flash\r\r\n')
both_cmds = ''
for fd in (1, 2):
both_cmds += cmd % (fd, fd, fd, fd, fd)
- plist = Popen(both_cmds, shell=True).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
+ plist = Popen(both_cmds, shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
self.assertEqual(plist [0], 'terminal 1\r\n')
self.assertEqual(plist [1], 'terminal 2\r\n')
# Now try with PIPE and make sure it is not a terminal
oper = TestSubprocess.MyOperation()
plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- shell=True).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
+ shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
self.assertEqual(plist [0], 'not 1\n')
self.assertEqual(plist [1], 'not 2\n')