Add --jobs option to forall subcommand

Enable '--jobs' ('-j') option in the forall subcommand. For -jn
where n > 1, the '-p' option can no longer guarantee the
continuity of console output between the project header and the
output from the worker process.

SIG_INT is sent to all worker processes upon keyboard interrupt
(Ctrl+C).

Bug: Issue 105
Change-Id: If09afa2ed639d481ede64f28b641dc80d0b89a5c
diff --git a/subcmds/forall.py b/subcmds/forall.py
index 03ebcb2..7771ec1 100644
--- a/subcmds/forall.py
+++ b/subcmds/forall.py
@@ -14,7 +14,9 @@
 # limitations under the License.
 
 from __future__ import print_function
+import errno
 import fcntl
+import multiprocessing
 import re
 import os
 import select
@@ -31,6 +33,7 @@
   'log',
 ]
 
+
 class ForallColoring(Coloring):
   def __init__(self, config):
     Coloring.__init__(self, config, 'forall')
@@ -132,9 +135,31 @@
     g.add_option('-v', '--verbose',
                  dest='verbose', action='store_true',
                  help='Show command error messages')
+    g.add_option('-j', '--jobs',
+                 dest='jobs', action='store', type='int', default=1,
+                 help='number of commands to execute simultaneously')
 
   def WantPager(self, opt):
-    return opt.project_header
+    return opt.project_header and opt.jobs == 1
+
+  def _SerializeProject(self, project):
+    """ Serialize a project._GitGetByExec instance.
+
+    project._GitGetByExec is not pickle-able. Instead of trying to pass it
+    around between processes, make a dict ourselves containing only the
+    attributes that we need.
+
+    """
+    return {
+      'name': project.name,
+      'relpath': project.relpath,
+      'remote_name': project.remote.name,
+      'lrev': project.GetRevisionId(),
+      'rrev': project.revisionExpr,
+      'annotations': dict((a.name, a.value) for a in project.annotations),
+      'gitdir': project.gitdir,
+      'worktree': project.worktree,
+    }
 
   def Execute(self, opt, args):
     if not opt.command:
@@ -173,11 +198,7 @@
       # pylint: enable=W0631
 
     mirror = self.manifest.IsMirror
-    out = ForallColoring(self.manifest.manifestProject.config)
-    out.redirect(sys.stdout)
-
     rc = 0
-    first = True
 
     if not opt.regex:
       projects = self.GetProjects(args)
@@ -186,113 +207,156 @@
 
     os.environ['REPO_COUNT'] = str(len(projects))
 
-    for (cnt, project) in enumerate(projects):
-      env = os.environ.copy()
-      def setenv(name, val):
-        if val is None:
-          val = ''
-        env[name] = val.encode()
-
-      setenv('REPO_PROJECT', project.name)
-      setenv('REPO_PATH', project.relpath)
-      setenv('REPO_REMOTE', project.remote.name)
-      setenv('REPO_LREV', project.GetRevisionId())
-      setenv('REPO_RREV', project.revisionExpr)
-      setenv('REPO_I', str(cnt + 1))
-      for a in project.annotations:
-        setenv("REPO__%s" % (a.name), a.value)
-
-      if mirror:
-        setenv('GIT_DIR', project.gitdir)
-        cwd = project.gitdir
-      else:
-        cwd = project.worktree
-
-      if not os.path.exists(cwd):
-        if (opt.project_header and opt.verbose) \
-        or not opt.project_header:
-          print('skipping %s/' % project.relpath, file=sys.stderr)
-        continue
-
-      if opt.project_header:
-        stdin = subprocess.PIPE
-        stdout = subprocess.PIPE
-        stderr = subprocess.PIPE
-      else:
-        stdin = None
-        stdout = None
-        stderr = None
-
-      p = subprocess.Popen(cmd,
-                           cwd = cwd,
-                           shell = shell,
-                           env = env,
-                           stdin = stdin,
-                           stdout = stdout,
-                           stderr = stderr)
-
-      if opt.project_header:
-        class sfd(object):
-          def __init__(self, fd, dest):
-            self.fd = fd
-            self.dest = dest
-          def fileno(self):
-            return self.fd.fileno()
-
-        empty = True
-        errbuf = ''
-
-        p.stdin.close()
-        s_in = [sfd(p.stdout, sys.stdout),
-                sfd(p.stderr, sys.stderr)]
-
-        for s in s_in:
-          flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
-          fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
-
-        while s_in:
-          in_ready, _out_ready, _err_ready = select.select(s_in, [], [])
-          for s in in_ready:
-            buf = s.fd.read(4096)
-            if not buf:
-              s.fd.close()
-              s_in.remove(s)
-              continue
-
-            if not opt.verbose:
-              if s.fd != p.stdout:
-                errbuf += buf
-                continue
-
-            if empty:
-              if first:
-                first = False
-              else:
-                out.nl()
-
-              if mirror:
-                project_header_path = project.name
-              else:
-                project_header_path = project.relpath
-              out.project('project %s/', project_header_path)
-              out.nl()
-              out.flush()
-              if errbuf:
-                sys.stderr.write(errbuf)
-                sys.stderr.flush()
-                errbuf = ''
-              empty = False
-
-            s.dest.write(buf)
-            s.dest.flush()
-
-      r = p.wait()
-      if r != 0:
-        if r != rc:
-          rc = r
-        if opt.abort_on_errors:
-          print("error: %s: Aborting due to previous error" % project.relpath,
-                file=sys.stderr)
-          sys.exit(r)
+    pool = multiprocessing.Pool(opt.jobs)
+    try:
+      config = self.manifest.manifestProject.config
+      results_it = pool.imap(
+         DoWorkWrapper,
+         [[mirror, opt, cmd, shell, cnt, config, self._SerializeProject(p)]
+          for cnt, p in enumerate(projects)]
+      )
+      pool.close()
+      for r in results_it:
+        rc = rc or r
+        if r != 0 and opt.abort_on_errors:
+          raise Exception('Aborting due to previous error')
+    except (KeyboardInterrupt, WorkerKeyboardInterrupt):
+      # Catch KeyboardInterrupt raised inside and outside of workers
+      print('Interrupted - terminating the pool')
+      pool.terminate()
+      rc = rc or errno.EINTR
+    except Exception as e:
+      # Catch any other exceptions raised
+      print('Got an error, terminating the pool: %r' % e,
+            file=sys.stderr)
+      pool.terminate()
+      rc = rc or getattr(e, 'errno', 1)
+    finally:
+      pool.join()
     if rc != 0:
       sys.exit(rc)
+
+
+class WorkerKeyboardInterrupt(Exception):
+  """ Keyboard interrupt exception for worker processes. """
+  pass
+
+
+def DoWorkWrapper(args):
+  """ A wrapper around the DoWork() method.
+
+  Catch the KeyboardInterrupt exceptions here and re-raise them as a different,
+  ``Exception``-based exception to stop it flooding the console with stacktraces
+  and making the parent hang indefinitely.
+
+  """
+  project = args.pop()
+  try:
+    return DoWork(project, *args)
+  except KeyboardInterrupt:
+    print('%s: Worker interrupted' % project['name'])
+    raise WorkerKeyboardInterrupt()
+
+
+def DoWork(project, mirror, opt, cmd, shell, cnt, config):
+  env = os.environ.copy()
+  def setenv(name, val):
+    if val is None:
+      val = ''
+    env[name] = val.encode()
+
+  setenv('REPO_PROJECT', project['name'])
+  setenv('REPO_PATH', project['relpath'])
+  setenv('REPO_REMOTE', project['remote_name'])
+  setenv('REPO_LREV', project['lrev'])
+  setenv('REPO_RREV', project['rrev'])
+  setenv('REPO_I', str(cnt + 1))
+  for name in project['annotations']:
+    setenv("REPO__%s" % (name), project['annotations'][name])
+
+  if mirror:
+    setenv('GIT_DIR', project['gitdir'])
+    cwd = project['gitdir']
+  else:
+    cwd = project['worktree']
+
+  if not os.path.exists(cwd):
+    if (opt.project_header and opt.verbose) \
+    or not opt.project_header:
+      print('skipping %s/' % project['relpath'], file=sys.stderr)
+    return
+
+  if opt.project_header:
+    stdin = subprocess.PIPE
+    stdout = subprocess.PIPE
+    stderr = subprocess.PIPE
+  else:
+    stdin = None
+    stdout = None
+    stderr = None
+
+  p = subprocess.Popen(cmd,
+                       cwd=cwd,
+                       shell=shell,
+                       env=env,
+                       stdin=stdin,
+                       stdout=stdout,
+                       stderr=stderr)
+
+  if opt.project_header:
+    out = ForallColoring(config)
+    out.redirect(sys.stdout)
+    class sfd(object):
+      def __init__(self, fd, dest):
+        self.fd = fd
+        self.dest = dest
+      def fileno(self):
+        return self.fd.fileno()
+
+    empty = True
+    errbuf = ''
+
+    p.stdin.close()
+    s_in = [sfd(p.stdout, sys.stdout),
+            sfd(p.stderr, sys.stderr)]
+
+    for s in s_in:
+      flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
+      fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+    while s_in:
+      in_ready, _out_ready, _err_ready = select.select(s_in, [], [])
+      for s in in_ready:
+        buf = s.fd.read(4096)
+        if not buf:
+          s.fd.close()
+          s_in.remove(s)
+          continue
+
+        if not opt.verbose:
+          if s.fd != p.stdout:
+            errbuf += buf
+            continue
+
+        if empty and out:
+          if not cnt == 0:
+            out.nl()
+
+          if mirror:
+            project_header_path = project['name']
+          else:
+            project_header_path = project['relpath']
+          out.project('project %s/', project_header_path)
+          out.nl()
+          out.flush()
+          if errbuf:
+            sys.stderr.write(errbuf)
+            sys.stderr.flush()
+            errbuf = ''
+          empty = False
+
+        s.dest.write(buf)
+        s.dest.flush()
+
+  r = p.wait()
+  return r