From a7694985683a36377841bb365c3a49551fe88a8d Mon Sep 17 00:00:00 2001 From: Takeshi Kanemoto Date: Mon, 14 Apr 2014 17:36:57 +0900 Subject: [PATCH] Add --jobs option to forall subcommand Enable '--jobs' ('-j') option in the forall subcommand. For -jn where n > 1, the '-p' option can no longer guarantee the continuity of console output between the project header and the output from the worker process. SIG_INT is sent to all worker processes upon keyboard interrupt (Ctrl+C). Bug: Issue 105 Change-Id: If09afa2ed639d481ede64f28b641dc80d0b89a5c --- subcmds/forall.py | 290 ++++++++++++++++++++++++++++------------------ 1 file changed, 177 insertions(+), 113 deletions(-) diff --git a/subcmds/forall.py b/subcmds/forall.py index 03ebcb21..7771ec16 100644 --- a/subcmds/forall.py +++ b/subcmds/forall.py @@ -14,7 +14,9 @@ # limitations under the License. from __future__ import print_function +import errno import fcntl +import multiprocessing import re import os import select @@ -31,6 +33,7 @@ _CAN_COLOR = [ 'log', ] + class ForallColoring(Coloring): def __init__(self, config): Coloring.__init__(self, config, 'forall') @@ -132,9 +135,31 @@ without iterating through the remaining projects. g.add_option('-v', '--verbose', dest='verbose', action='store_true', help='Show command error messages') + g.add_option('-j', '--jobs', + dest='jobs', action='store', type='int', default=1, + help='number of commands to execute simultaneously') def WantPager(self, opt): - return opt.project_header + return opt.project_header and opt.jobs == 1 + + def _SerializeProject(self, project): + """ Serialize a project._GitGetByExec instance. + + project._GitGetByExec is not pickle-able. Instead of trying to pass it + around between processes, make a dict ourselves containing only the + attributes that we need. + + """ + return { + 'name': project.name, + 'relpath': project.relpath, + 'remote_name': project.remote.name, + 'lrev': project.GetRevisionId(), + 'rrev': project.revisionExpr, + 'annotations': dict((a.name, a.value) for a in project.annotations), + 'gitdir': project.gitdir, + 'worktree': project.worktree, + } def Execute(self, opt, args): if not opt.command: @@ -173,11 +198,7 @@ without iterating through the remaining projects. # pylint: enable=W0631 mirror = self.manifest.IsMirror - out = ForallColoring(self.manifest.manifestProject.config) - out.redirect(sys.stdout) - rc = 0 - first = True if not opt.regex: projects = self.GetProjects(args) @@ -186,113 +207,156 @@ without iterating through the remaining projects. os.environ['REPO_COUNT'] = str(len(projects)) - for (cnt, project) in enumerate(projects): - env = os.environ.copy() - def setenv(name, val): - if val is None: - val = '' - env[name] = val.encode() - - setenv('REPO_PROJECT', project.name) - setenv('REPO_PATH', project.relpath) - setenv('REPO_REMOTE', project.remote.name) - setenv('REPO_LREV', project.GetRevisionId()) - setenv('REPO_RREV', project.revisionExpr) - setenv('REPO_I', str(cnt + 1)) - for a in project.annotations: - setenv("REPO__%s" % (a.name), a.value) - - if mirror: - setenv('GIT_DIR', project.gitdir) - cwd = project.gitdir - else: - cwd = project.worktree - - if not os.path.exists(cwd): - if (opt.project_header and opt.verbose) \ - or not opt.project_header: - print('skipping %s/' % project.relpath, file=sys.stderr) - continue - - if opt.project_header: - stdin = subprocess.PIPE - stdout = subprocess.PIPE - stderr = subprocess.PIPE - else: - stdin = None - stdout = None - stderr = None - - p = subprocess.Popen(cmd, - cwd = cwd, - shell = shell, - env = env, - stdin = stdin, - stdout = stdout, - stderr = stderr) - - if opt.project_header: - class sfd(object): - def __init__(self, fd, dest): - self.fd = fd - self.dest = dest - def fileno(self): - return self.fd.fileno() - - empty = True - errbuf = '' - - p.stdin.close() - s_in = [sfd(p.stdout, sys.stdout), - sfd(p.stderr, sys.stderr)] - - for s in s_in: - flags = fcntl.fcntl(s.fd, fcntl.F_GETFL) - fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - - while s_in: - in_ready, _out_ready, _err_ready = select.select(s_in, [], []) - for s in in_ready: - buf = s.fd.read(4096) - if not buf: - s.fd.close() - s_in.remove(s) - continue - - if not opt.verbose: - if s.fd != p.stdout: - errbuf += buf - continue - - if empty: - if first: - first = False - else: - out.nl() - - if mirror: - project_header_path = project.name - else: - project_header_path = project.relpath - out.project('project %s/', project_header_path) - out.nl() - out.flush() - if errbuf: - sys.stderr.write(errbuf) - sys.stderr.flush() - errbuf = '' - empty = False - - s.dest.write(buf) - s.dest.flush() - - r = p.wait() - if r != 0: - if r != rc: - rc = r - if opt.abort_on_errors: - print("error: %s: Aborting due to previous error" % project.relpath, - file=sys.stderr) - sys.exit(r) + pool = multiprocessing.Pool(opt.jobs) + try: + config = self.manifest.manifestProject.config + results_it = pool.imap( + DoWorkWrapper, + [[mirror, opt, cmd, shell, cnt, config, self._SerializeProject(p)] + for cnt, p in enumerate(projects)] + ) + pool.close() + for r in results_it: + rc = rc or r + if r != 0 and opt.abort_on_errors: + raise Exception('Aborting due to previous error') + except (KeyboardInterrupt, WorkerKeyboardInterrupt): + # Catch KeyboardInterrupt raised inside and outside of workers + print('Interrupted - terminating the pool') + pool.terminate() + rc = rc or errno.EINTR + except Exception as e: + # Catch any other exceptions raised + print('Got an error, terminating the pool: %r' % e, + file=sys.stderr) + pool.terminate() + rc = rc or getattr(e, 'errno', 1) + finally: + pool.join() if rc != 0: sys.exit(rc) + + +class WorkerKeyboardInterrupt(Exception): + """ Keyboard interrupt exception for worker processes. """ + pass + + +def DoWorkWrapper(args): + """ A wrapper around the DoWork() method. + + Catch the KeyboardInterrupt exceptions here and re-raise them as a different, + ``Exception``-based exception to stop it flooding the console with stacktraces + and making the parent hang indefinitely. + + """ + project = args.pop() + try: + return DoWork(project, *args) + except KeyboardInterrupt: + print('%s: Worker interrupted' % project['name']) + raise WorkerKeyboardInterrupt() + + +def DoWork(project, mirror, opt, cmd, shell, cnt, config): + env = os.environ.copy() + def setenv(name, val): + if val is None: + val = '' + env[name] = val.encode() + + setenv('REPO_PROJECT', project['name']) + setenv('REPO_PATH', project['relpath']) + setenv('REPO_REMOTE', project['remote_name']) + setenv('REPO_LREV', project['lrev']) + setenv('REPO_RREV', project['rrev']) + setenv('REPO_I', str(cnt + 1)) + for name in project['annotations']: + setenv("REPO__%s" % (name), project['annotations'][name]) + + if mirror: + setenv('GIT_DIR', project['gitdir']) + cwd = project['gitdir'] + else: + cwd = project['worktree'] + + if not os.path.exists(cwd): + if (opt.project_header and opt.verbose) \ + or not opt.project_header: + print('skipping %s/' % project['relpath'], file=sys.stderr) + return + + if opt.project_header: + stdin = subprocess.PIPE + stdout = subprocess.PIPE + stderr = subprocess.PIPE + else: + stdin = None + stdout = None + stderr = None + + p = subprocess.Popen(cmd, + cwd=cwd, + shell=shell, + env=env, + stdin=stdin, + stdout=stdout, + stderr=stderr) + + if opt.project_header: + out = ForallColoring(config) + out.redirect(sys.stdout) + class sfd(object): + def __init__(self, fd, dest): + self.fd = fd + self.dest = dest + def fileno(self): + return self.fd.fileno() + + empty = True + errbuf = '' + + p.stdin.close() + s_in = [sfd(p.stdout, sys.stdout), + sfd(p.stderr, sys.stderr)] + + for s in s_in: + flags = fcntl.fcntl(s.fd, fcntl.F_GETFL) + fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + while s_in: + in_ready, _out_ready, _err_ready = select.select(s_in, [], []) + for s in in_ready: + buf = s.fd.read(4096) + if not buf: + s.fd.close() + s_in.remove(s) + continue + + if not opt.verbose: + if s.fd != p.stdout: + errbuf += buf + continue + + if empty and out: + if not cnt == 0: + out.nl() + + if mirror: + project_header_path = project['name'] + else: + project_header_path = project['relpath'] + out.project('project %s/', project_header_path) + out.nl() + out.flush() + if errbuf: + sys.stderr.write(errbuf) + sys.stderr.flush() + errbuf = '' + empty = False + + s.dest.write(buf) + s.dest.flush() + + r = p.wait() + return r