Make "git command" and "forall" work on Windows

Python on Windows does not support non blocking file operations.
To workaround this issue, we instead use Threads and a Queue to
simulate non-blocking calls. This is happens only when running
with the native Windows version of Python, meaning Linux and Cygwin
are not affected by this change.

Change-Id: I4ce23827b096c5138f67a85c721f58a12279bb6f
diff --git a/git_command.py b/git_command.py
index 9f7d293..dfa6a92 100644
--- a/git_command.py
+++ b/git_command.py
@@ -14,14 +14,14 @@
 # limitations under the License.
 
 from __future__ import print_function
-import fcntl
 import os
-import select
 import sys
 import subprocess
 import tempfile
 from signal import SIGTERM
+
 from error import GitError
+import platform_utils
 from trace import REPO_TRACE, IsTrace, Trace
 from wrapper import Wrapper
 
@@ -78,16 +78,6 @@
 
 _git_version = None
 
-class _sfd(object):
-  """select file descriptor class"""
-  def __init__(self, fd, dest, std_name):
-    assert std_name in ('stdout', 'stderr')
-    self.fd = fd
-    self.dest = dest
-    self.std_name = std_name
-  def fileno(self):
-    return self.fd.fileno()
-
 class _GitCall(object):
   def version(self):
     p = GitCommand(None, ['--version'], capture_stdout=True)
@@ -253,19 +243,16 @@
 
   def _CaptureOutput(self):
     p = self.process
-    s_in = [_sfd(p.stdout, sys.stdout, 'stdout'),
-            _sfd(p.stderr, sys.stderr, 'stderr')]
+    s_in = platform_utils.FileDescriptorStreams.create()
+    s_in.add(p.stdout, sys.stdout, 'stdout')
+    s_in.add(p.stderr, sys.stderr, 'stderr')
     self.stdout = ''
     self.stderr = ''
 
-    for s in s_in:
-      flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
-      fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
-
-    while s_in:
-      in_ready, _, _ = select.select(s_in, [], [])
+    while not s_in.is_done:
+      in_ready = s_in.select()
       for s in in_ready:
-        buf = s.fd.read(4096)
+        buf = s.read()
         if not buf:
           s_in.remove(s)
           continue
diff --git a/platform_utils.py b/platform_utils.py
new file mode 100644
index 0000000..1c719b1
--- /dev/null
+++ b/platform_utils.py
@@ -0,0 +1,169 @@
+#
+# Copyright (C) 2016 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import platform
+import select
+
+from Queue import Queue
+from threading import Thread
+
+
+def isWindows():
+  """ Returns True when running with the native port of Python for Windows,
+  False when running on any other platform (including the Cygwin port of
+  Python).
+  """
+  # Note: The cygwin port of Python returns "CYGWIN_NT_xxx"
+  return platform.system() == "Windows"
+
+
+class FileDescriptorStreams(object):
+  """ Platform agnostic abstraction enabling non-blocking I/O over a
+  collection of file descriptors. This abstraction is required because
+  fctnl(os.O_NONBLOCK) is not supported on Windows.
+  """
+  @classmethod
+  def create(cls):
+    """ Factory method: instantiates the concrete class according to the
+    current platform.
+    """
+    if isWindows():
+      return _FileDescriptorStreamsThreads()
+    else:
+      return _FileDescriptorStreamsNonBlocking()
+
+  def __init__(self):
+    self.streams = []
+
+  def add(self, fd, dest, std_name):
+    """ Wraps an existing file descriptor as a stream.
+    """
+    self.streams.append(self._create_stream(fd, dest, std_name))
+
+  def remove(self, stream):
+    """ Removes a stream, when done with it.
+    """
+    self.streams.remove(stream)
+
+  @property
+  def is_done(self):
+    """ Returns True when all streams have been processed.
+    """
+    return len(self.streams) == 0
+
+  def select(self):
+    """ Returns the set of streams that have data available to read.
+    The returned streams each expose a read() and a close() method.
+    When done with a stream, call the remove(stream) method.
+    """
+    raise NotImplementedError
+
+  def _create_stream(fd, dest, std_name):
+    """ Creates a new stream wrapping an existing file descriptor.
+    """
+    raise NotImplementedError
+
+
+class _FileDescriptorStreamsNonBlocking(FileDescriptorStreams):
+  """ Implementation of FileDescriptorStreams for platforms that support
+  non blocking I/O.
+  """
+  class Stream(object):
+    """ Encapsulates a file descriptor """
+    def __init__(self, fd, dest, std_name):
+      self.fd = fd
+      self.dest = dest
+      self.std_name = std_name
+      self.set_non_blocking()
+
+    def set_non_blocking(self):
+      import fcntl
+      flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
+      fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+    def fileno(self):
+      return self.fd.fileno()
+
+    def read(self):
+      return self.fd.read(4096)
+
+    def close(self):
+      self.fd.close()
+
+  def _create_stream(self, fd, dest, std_name):
+    return self.Stream(fd, dest, std_name)
+
+  def select(self):
+    ready_streams, _, _ = select.select(self.streams, [], [])
+    return ready_streams
+
+
+class _FileDescriptorStreamsThreads(FileDescriptorStreams):
+  """ Implementation of FileDescriptorStreams for platforms that don't support
+  non blocking I/O. This implementation requires creating threads issuing
+  blocking read operations on file descriptors.
+  """
+  def __init__(self):
+    super(_FileDescriptorStreamsThreads, self).__init__()
+    # The queue is shared accross all threads so we can simulate the
+    # behavior of the select() function
+    self.queue = Queue(10)  # Limit incoming data from streams
+
+  def _create_stream(self, fd, dest, std_name):
+    return self.Stream(fd, dest, std_name, self.queue)
+
+  def select(self):
+    # Return only one stream at a time, as it is the most straighforward
+    # thing to do and it is compatible with the select() function.
+    item = self.queue.get()
+    stream = item.stream
+    stream.data = item.data
+    return [stream]
+
+  class QueueItem(object):
+    """ Item put in the shared queue """
+    def __init__(self, stream, data):
+      self.stream = stream
+      self.data = data
+
+  class Stream(object):
+    """ Encapsulates a file descriptor """
+    def __init__(self, fd, dest, std_name, queue):
+      self.fd = fd
+      self.dest = dest
+      self.std_name = std_name
+      self.queue = queue
+      self.data = None
+      self.thread = Thread(target=self.read_to_queue)
+      self.thread.daemon = True
+      self.thread.start()
+
+    def close(self):
+      self.fd.close()
+
+    def read(self):
+      data = self.data
+      self.data = None
+      return data
+
+    def read_to_queue(self):
+      """ The thread function: reads everything from the file descriptor into
+      the shared queue and terminates when reaching EOF.
+      """
+      for line in iter(self.fd.readline, b''):
+        self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, line))
+      self.fd.close()
+      self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, None))
diff --git a/subcmds/forall.py b/subcmds/forall.py
index 07ee8d5..2c12c55 100644
--- a/subcmds/forall.py
+++ b/subcmds/forall.py
@@ -15,17 +15,16 @@
 
 from __future__ import print_function
 import errno
-import fcntl
 import multiprocessing
 import re
 import os
-import select
 import signal
 import sys
 import subprocess
 
 from color import Coloring
 from command import Command, MirrorSafeCommand
+import platform_utils
 
 _CAN_COLOR = [
   'branch',
@@ -344,35 +343,25 @@
   if opt.project_header:
     out = ForallColoring(config)
     out.redirect(sys.stdout)
-    class sfd(object):
-      def __init__(self, fd, dest):
-        self.fd = fd
-        self.dest = dest
-      def fileno(self):
-        return self.fd.fileno()
-
     empty = True
     errbuf = ''
 
     p.stdin.close()
-    s_in = [sfd(p.stdout, sys.stdout),
-            sfd(p.stderr, sys.stderr)]
+    s_in = platform_utils.FileDescriptorStreams.create()
+    s_in.add(p.stdout, sys.stdout, 'stdout')
+    s_in.add(p.stderr, sys.stderr, 'stderr')
 
-    for s in s_in:
-      flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
-      fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
-
-    while s_in:
-      in_ready, _out_ready, _err_ready = select.select(s_in, [], [])
+    while not s_in.is_done:
+      in_ready = s_in.select()
       for s in in_ready:
-        buf = s.fd.read(4096)
+        buf = s.read()
         if not buf:
-          s.fd.close()
+          s.close()
           s_in.remove(s)
           continue
 
         if not opt.verbose:
-          if s.fd != p.stdout:
+          if s.std_name == 'stderr':
             errbuf += buf
             continue