OILS / core / process_test.py View on Github | oilshell.org

259 lines, 163 significant
1#!/usr/bin/env python2
2"""process_test.py: Tests for process.py."""
3
4import os
5import unittest
6
7from _devbuild.gen.id_kind_asdl import Id
8from _devbuild.gen.runtime_asdl import (RedirValue, redirect_arg, cmd_value,
9 trace)
10from _devbuild.gen.syntax_asdl import loc, redir_loc
11from asdl import runtime
12from builtin import read_osh
13from builtin import trap_osh
14from core import dev
15from core import process # module under test
16from core import pyos
17from core import test_lib
18from core import ui
19from core import util
20from mycpp.mylib import log
21from core import state
22from mycpp import mylib
23
# Short local names for the two process.py classes the tests construct
# directly.
Process, ExternalThunk = process.Process, process.ExternalThunk
26
27
def Banner(msg):
    """Print a 60-dash rule, then msg on its own line, to stdout."""
    rule = '-' * 60
    print(rule)
    print(msg)
31
32
def _CommandNode(code_str, arena):
    """Parse code_str with a test command parser and return the result.

    The parser comes from test_lib.InitCommandParser() bound to `arena`; the
    return value is whatever its ParseLogicalLine() yields.
    """
    return test_lib.InitCommandParser(code_str,
                                      arena=arena).ParseLogicalLine()
36
37
class FakeJobControl(object):
    """Job-control stand-in whose Enabled() answer is fixed by the caller.

    The tests pass it where a process.JobControl would otherwise go, so the
    pipeline code can be exercised with job control forced on or off.
    """

    def __init__(self, enabled):
        # Stored verbatim; Enabled() reads it back on every call.
        self.enabled = enabled

    def Enabled(self):
        """Return the flag given to the constructor."""
        return self.enabled
44
45
class ProcessTest(unittest.TestCase):
    """Tests for core/process.py: Process, Pipeline and FdState.

    Several tests start real external programs (date, ls, cut, sort, ...) and
    print diagnostics such as the contents of /dev/fd; only some of them make
    assertions.
    """

    def setUp(self):
        """Build the fixtures shared by every test.

        Sets self.arena, self.job_control, self.job_list, self.trap_state,
        self.tracer, self.waiter, self.fd_state and self.ext_prog.
        """
        self.arena = test_lib.MakeArena('process_test.py')

        mem = state.Mem('', [], self.arena, [])
        # The first element (parse options) is not needed by these tests.
        _, exec_opts, mutable_opts = state.MakeOpts(mem, None)
        mem.exec_opts = exec_opts

        state.InitMem(mem, {}, '0.1')

        self.job_control = process.JobControl()
        self.job_list = process.JobList()

        signal_safe = pyos.InitSignalSafe()
        self.trap_state = trap_osh.TrapState(signal_safe)

        self.tracer = dev.Tracer(None, exec_opts, mutable_opts, mem,
                                 mylib.Stderr())
        self.waiter = process.Waiter(self.job_list, exec_opts,
                                     self.trap_state, self.tracer)
        errfmt = ui.ErrorFormatter()
        self.fd_state = process.FdState(errfmt, self.job_control,
                                        self.job_list, None, self.tracer,
                                        None)
        self.ext_prog = process.ExternalProgram('', self.fd_state, errfmt,
                                                util.NullDebugFile())

    def _ExtProc(self, argv):
        """Return a Process wrapping an ExternalThunk for argv.

        argv[0] is looked up in /bin and then /usr/bin.  When neither
        directory contains it, the bare name is used as the path, which is
        how the tests exercise the failure case.
        """
        arg_vec = cmd_value.Argv(argv, [loc.Missing] * len(argv), None, None,
                                 None)
        argv0_path = argv[0]  # fallback that tests failure case
        for path_entry in ('/bin', '/usr/bin'):
            candidate = os.path.join(path_entry, argv[0])
            if os.path.exists(candidate):
                argv0_path = candidate
                break
        thunk = ExternalThunk(self.ext_prog, argv0_path, arg_vec, {})
        return Process(thunk, self.job_control, self.job_list, self.tracer)

    def _ReadLineRedirected(self, redir, cmd_ev, err_out):
        """Push redir, read one newline-terminated portion, then Pop.

        Pop() runs in a finally clause so that a failing read cannot leave
        the test process's file descriptors redirected for later tests.
        """
        self.fd_state.Push([redir], err_out)
        try:
            line, _ = read_osh._ReadPortion(pyos.NEWLINE_CH, -1, cmd_ev)
        finally:
            self.fd_state.Pop(err_out)
        return line

    def testStdinRedirect(self):
        """Redirecting stdin from the same file twice reads line one twice."""
        PATH = '_tmp/one-two.txt'
        # Don't depend on an earlier build step having created the directory.
        tmp_dir = os.path.dirname(PATH)
        if not os.path.isdir(tmp_dir):
            os.makedirs(tmp_dir)

        # Write two lines
        with open(PATH, 'w') as f:
            f.write('one\ntwo\n')

        # Should get the first line twice, because Pop() closes it!

        r = RedirValue(Id.Redir_Less, runtime.NO_SPID, redir_loc.Fd(0),
                       redirect_arg.Path(PATH))

        class CommandEvaluator(object):
            """Minimal stub: the read code only needs RunPendingTraps()."""

            def RunPendingTraps(self):
                pass

        cmd_ev = CommandEvaluator()

        err_out = []
        line1 = self._ReadLineRedirected(r, cmd_ev, err_out)
        line2 = self._ReadLineRedirected(r, cmd_ev, err_out)

        # sys.stdin.readline() would erroneously return 'two' because of
        # buffering.
        self.assertEqual('one', line1)
        self.assertEqual('one', line2)

    def testProcess(self):
        """Run 'date' (must exit 0) and a nonexistent command."""
        # 3 fds.  Does Python open it?  Shell seems to have it too.  Maybe it
        # inherits from the shell.
        print('FDS BEFORE', os.listdir('/dev/fd'))

        Banner('date')
        argv = ['date']
        p = self._ExtProc(argv)
        why = trace.External(argv)
        status = p.RunProcess(self.waiter, why)
        log('date returned %d', status)
        self.assertEqual(0, status)

        Banner('does-not-exist')
        argv = ['does-not-exist']
        p = self._ExtProc(argv)
        # Build a fresh trace value so this run is not labelled as 'date'.
        why = trace.External(argv)
        print(p.RunProcess(self.waiter, why))

        # 12 file descriptors open!
        print('FDS AFTER', os.listdir('/dev/fd'))

    def testPipeline(self):
        """Run 'ls | cut -d . -f 2 | sort | uniq -c', last part in-process."""
        node = _CommandNode('uniq -c', self.arena)
        cmd_ev = test_lib.InitCommandEvaluator(arena=self.arena,
                                               ext_prog=self.ext_prog)
        print('BEFORE', os.listdir('/dev/fd'))

        p = process.Pipeline(False, self.job_control, self.job_list,
                             self.tracer)
        for argv in (['ls'], ['cut', '-d', '.', '-f', '2'], ['sort']):
            p.Add(self._ExtProc(argv))

        p.AddLast((cmd_ev, node))

        p.StartPipeline(self.waiter)
        pipe_status = p.RunLastPart(self.waiter, self.fd_state)
        log('pipe_status: %s', pipe_status)

        print('AFTER', os.listdir('/dev/fd'))

    def testPipeline2(self):
        """Run one pipeline of external processes and one of sub-programs."""
        cmd_ev = test_lib.InitCommandEvaluator(arena=self.arena,
                                               ext_prog=self.ext_prog)

        Banner('ls | cut -d . -f 1 | head')
        p = process.Pipeline(False, self.job_control, self.job_list,
                             self.tracer)
        p.Add(self._ExtProc(['ls']))
        p.Add(self._ExtProc(['cut', '-d', '.', '-f', '1']))

        node = _CommandNode('head', self.arena)
        p.AddLast((cmd_ev, node))

        p.StartPipeline(self.waiter)
        print(p.RunLastPart(self.waiter, self.fd_state))

        # Simulating subshell for each command
        nodes = [
            _CommandNode(code_str, self.arena)
            for code_str in ('ls', 'head', 'sort --reverse')
        ]
        thunks = [
            process.SubProgramThunk(cmd_ev, n, self.trap_state)
            for n in nodes
        ]

        p = process.Pipeline(False, self.job_control, self.job_list,
                             self.tracer)
        for thunk in thunks:
            p.Add(Process(thunk, self.job_control, self.job_list,
                          self.tracer))

        last_thunk = (cmd_ev, _CommandNode('cat', self.arena))
        p.AddLast(last_thunk)

        p.StartPipeline(self.waiter)
        print(p.RunLastPart(self.waiter, self.fd_state))

        # TODO: Combine pipelines for other things:

        # echo foo 1>&2 | tee stdout.txt
        #
        # foo=$(ls | head)
        #
        # foo=$(<<EOF ls | head)
        # stdin
        # EOF
        #
        # ls | head &

        # Or technically we could fork the whole interpreter for foo|bar|baz
        # and capture stdout of that interpreter.

    def makeTestPipeline(self, jc):
        """Return an unstarted '/bin/echo testpipeline | cat' Pipeline.

        jc is used as the job-control object for the pipeline and for both
        of its processes.
        """
        cmd_ev = test_lib.InitCommandEvaluator(arena=self.arena,
                                               ext_prog=self.ext_prog)

        pi = process.Pipeline(False, jc, self.job_list, self.tracer)

        nodes = [
            _CommandNode(code_str, self.arena)
            for code_str in ('/bin/echo testpipeline', 'cat')
        ]
        thunks = [
            process.SubProgramThunk(cmd_ev, n, self.trap_state)
            for n in nodes
        ]
        for thunk in thunks:
            pi.Add(Process(thunk, jc, self.job_list, self.tracer))

        return pi

    def testPipelinePgidField(self):
        """ProcessGroupId() is only set when job control is enabled."""
        jc = FakeJobControl(False)

        pi = self.makeTestPipeline(jc)
        self.assertEqual(process.INVALID_PGID, pi.ProcessGroupId())

        pi.StartPipeline(self.waiter)
        # No pgid
        self.assertEqual(process.INVALID_PGID, pi.ProcessGroupId())

        jc = FakeJobControl(True)

        pi = self.makeTestPipeline(jc)
        self.assertEqual(process.INVALID_PGID, pi.ProcessGroupId())

        pi.StartPipeline(self.waiter)
        # first process is the process group leader
        self.assertEqual(pi.pids[0], pi.ProcessGroupId())

    def testOpen(self):
        """Disabled: FdState.Open() error behavior (see comment below)."""
        # Disabled because mycpp translation can't handle it.  We do this at
        # a higher layer.
        return

        # This function used to raise BOTH OSError and IOError because Python
        # 2 is inconsistent.
        # We follow Python 3 in preferring OSError.
        # https://stackoverflow.com/questions/29347790/difference-between-ioerror-and-oserror
        self.assertRaises(OSError, self.fd_state.Open, '_nonexistent_')
        self.assertRaises(OSError, self.fd_state.Open, 'metrics/')
254
255
# Run every TestCase in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
258
259# vim: sw=4