Add --jobs option to forall subcommand
Add a '--jobs' ('-j') option to the forall subcommand so the given command
can be run across projects in parallel. With -jn for n > 1, the '-p' option
can no longer guarantee that the project header and the output of the
corresponding worker process appear contiguously on the console.
SIGINT is sent to all worker processes upon keyboard interrupt (Ctrl+C).
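For example (the git command here is only an illustration; any shell
command works):
  repo forall -j4 -p -c 'git log --oneline -1'
runs the command in up to four projects at once, while the default of
-j1 preserves ordered, pager-friendly output as before.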
Bug: Issue 105
Change-Id: If09afa2ed639d481ede64f28b641dc80d0b89a5c
diff --git a/subcmds/forall.py b/subcmds/forall.py
index 03ebcb2..7771ec1 100644
--- a/subcmds/forall.py
+++ b/subcmds/forall.py
@@ -14,7 +14,9 @@
# limitations under the License.
from __future__ import print_function
+import errno
import fcntl
+import multiprocessing
import re
import os
import select
@@ -31,6 +33,7 @@
'log',
]
+
class ForallColoring(Coloring):
def __init__(self, config):
Coloring.__init__(self, config, 'forall')
@@ -132,9 +135,31 @@
g.add_option('-v', '--verbose',
dest='verbose', action='store_true',
help='Show command error messages')
+ g.add_option('-j', '--jobs',
+ dest='jobs', action='store', type='int', default=1,
+ help='number of commands to execute simultaneously')
def WantPager(self, opt):
- return opt.project_header
+ return opt.project_header and opt.jobs == 1
+
+ def _SerializeProject(self, project):
+ """ Serialize a project._GitGetByExec instance.
+
+ project._GitGetByExec is not pickle-able. Instead of trying to pass it
+ around between processes, make a dict ourselves containing only the
+ attributes that we need.
+
+ """
+ return {
+ 'name': project.name,
+ 'relpath': project.relpath,
+ 'remote_name': project.remote.name,
+ 'lrev': project.GetRevisionId(),
+ 'rrev': project.revisionExpr,
+ 'annotations': dict((a.name, a.value) for a in project.annotations),
+ 'gitdir': project.gitdir,
+ 'worktree': project.worktree,
+ }
def Execute(self, opt, args):
if not opt.command:
@@ -173,11 +198,7 @@
# pylint: enable=W0631
mirror = self.manifest.IsMirror
- out = ForallColoring(self.manifest.manifestProject.config)
- out.redirect(sys.stdout)
-
rc = 0
- first = True
if not opt.regex:
projects = self.GetProjects(args)
@@ -186,113 +207,156 @@
os.environ['REPO_COUNT'] = str(len(projects))
- for (cnt, project) in enumerate(projects):
- env = os.environ.copy()
- def setenv(name, val):
- if val is None:
- val = ''
- env[name] = val.encode()
-
- setenv('REPO_PROJECT', project.name)
- setenv('REPO_PATH', project.relpath)
- setenv('REPO_REMOTE', project.remote.name)
- setenv('REPO_LREV', project.GetRevisionId())
- setenv('REPO_RREV', project.revisionExpr)
- setenv('REPO_I', str(cnt + 1))
- for a in project.annotations:
- setenv("REPO__%s" % (a.name), a.value)
-
- if mirror:
- setenv('GIT_DIR', project.gitdir)
- cwd = project.gitdir
- else:
- cwd = project.worktree
-
- if not os.path.exists(cwd):
- if (opt.project_header and opt.verbose) \
- or not opt.project_header:
- print('skipping %s/' % project.relpath, file=sys.stderr)
- continue
-
- if opt.project_header:
- stdin = subprocess.PIPE
- stdout = subprocess.PIPE
- stderr = subprocess.PIPE
- else:
- stdin = None
- stdout = None
- stderr = None
-
- p = subprocess.Popen(cmd,
- cwd = cwd,
- shell = shell,
- env = env,
- stdin = stdin,
- stdout = stdout,
- stderr = stderr)
-
- if opt.project_header:
- class sfd(object):
- def __init__(self, fd, dest):
- self.fd = fd
- self.dest = dest
- def fileno(self):
- return self.fd.fileno()
-
- empty = True
- errbuf = ''
-
- p.stdin.close()
- s_in = [sfd(p.stdout, sys.stdout),
- sfd(p.stderr, sys.stderr)]
-
- for s in s_in:
- flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
- fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
-
- while s_in:
- in_ready, _out_ready, _err_ready = select.select(s_in, [], [])
- for s in in_ready:
- buf = s.fd.read(4096)
- if not buf:
- s.fd.close()
- s_in.remove(s)
- continue
-
- if not opt.verbose:
- if s.fd != p.stdout:
- errbuf += buf
- continue
-
- if empty:
- if first:
- first = False
- else:
- out.nl()
-
- if mirror:
- project_header_path = project.name
- else:
- project_header_path = project.relpath
- out.project('project %s/', project_header_path)
- out.nl()
- out.flush()
- if errbuf:
- sys.stderr.write(errbuf)
- sys.stderr.flush()
- errbuf = ''
- empty = False
-
- s.dest.write(buf)
- s.dest.flush()
-
- r = p.wait()
- if r != 0:
- if r != rc:
- rc = r
- if opt.abort_on_errors:
- print("error: %s: Aborting due to previous error" % project.relpath,
- file=sys.stderr)
- sys.exit(r)
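+ # Run DoWork for each project in a pool of worker processes. imap() yields
+ # results lazily, so a failing project can abort the run as soon as its
+ # non-zero status is seen.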
+ pool = multiprocessing.Pool(opt.jobs)
+ try:
+ config = self.manifest.manifestProject.config
+ results_it = pool.imap(
+ DoWorkWrapper,
+ [[mirror, opt, cmd, shell, cnt, config, self._SerializeProject(p)]
+ for cnt, p in enumerate(projects)]
+ )
+ pool.close()
+ for r in results_it:
+ rc = rc or r
+ if r != 0 and opt.abort_on_errors:
+ raise Exception('Aborting due to previous error')
+ except (KeyboardInterrupt, WorkerKeyboardInterrupt):
+ # Catch KeyboardInterrupt raised inside and outside of workers
+ print('Interrupted - terminating the pool')
+ pool.terminate()
+ rc = rc or errno.EINTR
+ except Exception as e:
+ # Catch any other exceptions raised
+ print('Got an error, terminating the pool: %r' % e,
+ file=sys.stderr)
+ pool.terminate()
+ rc = rc or getattr(e, 'errno', 1)
+ finally:
+ pool.join()
if rc != 0:
sys.exit(rc)
+
+
+class WorkerKeyboardInterrupt(Exception):
+ """ Keyboard interrupt exception for worker processes. """
+ pass
+
+
+def DoWorkWrapper(args):
+ """ A wrapper around the DoWork() method.
+
+ Catch KeyboardInterrupt exceptions here and re-raise them as a different,
+ ``Exception``-based exception so they do not flood the console with
+ stacktraces and do not make the parent process hang indefinitely.
+
+ """
+ project = args.pop()
+ try:
+ return DoWork(project, *args)
+ except KeyboardInterrupt:
+ print('%s: Worker interrupted' % project['name'])
+ raise WorkerKeyboardInterrupt()
+
+
+def DoWork(project, mirror, opt, cmd, shell, cnt, config):
+ env = os.environ.copy()
+ def setenv(name, val):
+ if val is None:
+ val = ''
+ env[name] = val.encode()
+
+ setenv('REPO_PROJECT', project['name'])
+ setenv('REPO_PATH', project['relpath'])
+ setenv('REPO_REMOTE', project['remote_name'])
+ setenv('REPO_LREV', project['lrev'])
+ setenv('REPO_RREV', project['rrev'])
+ setenv('REPO_I', str(cnt + 1))
+ for name in project['annotations']:
+ setenv("REPO__%s" % (name), project['annotations'][name])
+
+ if mirror:
+ setenv('GIT_DIR', project['gitdir'])
+ cwd = project['gitdir']
+ else:
+ cwd = project['worktree']
+
+ if not os.path.exists(cwd):
+ if (opt.project_header and opt.verbose) \
+ or not opt.project_header:
+ print('skipping %s/' % project['relpath'], file=sys.stderr)
+ return
+
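+ # With -p, capture the child's output through pipes so the project header
+ # can be printed ahead of it; otherwise let the command write directly to
+ # the console.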
+ if opt.project_header:
+ stdin = subprocess.PIPE
+ stdout = subprocess.PIPE
+ stderr = subprocess.PIPE
+ else:
+ stdin = None
+ stdout = None
+ stderr = None
+
+ p = subprocess.Popen(cmd,
+ cwd=cwd,
+ shell=shell,
+ env=env,
+ stdin=stdin,
+ stdout=stdout,
+ stderr=stderr)
+
+ if opt.project_header:
+ out = ForallColoring(config)
+ out.redirect(sys.stdout)
+ class sfd(object):
+ def __init__(self, fd, dest):
+ self.fd = fd
+ self.dest = dest
+ def fileno(self):
+ return self.fd.fileno()
+
+ empty = True
+ errbuf = ''
+
+ p.stdin.close()
+ s_in = [sfd(p.stdout, sys.stdout),
+ sfd(p.stderr, sys.stderr)]
+
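+ # Switch both pipes to non-blocking mode and multiplex them with select(),
+ # forwarding output as it arrives.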
+ for s in s_in:
+ flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
+ fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+ while s_in:
+ in_ready, _out_ready, _err_ready = select.select(s_in, [], [])
+ for s in in_ready:
+ buf = s.fd.read(4096)
+ if not buf:
+ s.fd.close()
+ s_in.remove(s)
+ continue
+
+ if not opt.verbose:
+ if s.fd != p.stdout:
+ errbuf += buf
+ continue
+
+ if empty and out:
+ if cnt > 0:
+ out.nl()
+
+ if mirror:
+ project_header_path = project['name']
+ else:
+ project_header_path = project['relpath']
+ out.project('project %s/', project_header_path)
+ out.nl()
+ out.flush()
+ if errbuf:
+ sys.stderr.write(errbuf)
+ sys.stderr.flush()
+ errbuf = ''
+ empty = False
+
+ s.dest.write(buf)
+ s.dest.flush()
+
+ r = p.wait()
+ return r