Update for PEP
diff --git a/python2/crawl.py b/python2/crawl.py
index cf4a218..f1d29ca 100644
--- a/python2/crawl.py
+++ b/python2/crawl.py
@@ -19,26 +19,29 @@
         'http://www.youtube.com/',
         'http://www.blogger.com/']
 
-def load_url(url):
-    return urllib2.urlopen(url).read()
+def load_url(url, timeout):
+    return urllib2.urlopen(url, timeout=timeout).read()
 
-def download_urls_sequential(urls):
+def download_urls_sequential(urls, timeout=60):
     url_to_content = {}
     for url in urls:
         try:
-            url_to_content[url] = load_url(url)
+            url_to_content[url] = load_url(url, timeout=timeout)
         except:
             pass
     return url_to_content
 
-def download_urls_with_executor(urls, executor):
+def download_urls_with_executor(urls, executor, timeout=60):
     try:
         url_to_content = {}
-        fs = executor.run_to_futures(
-                (functools.partial(load_url, url) for url in urls))
-        for future in fs.successful_futures():
-            url = urls[future.index]
-            url_to_content[url] = future.result()
+        future_to_url = dict((executor.submit(load_url, url, timeout), url)
+                             for url in urls)
+
+        for future in futures.as_completed(future_to_url):
+            try:
+                url_to_content[future_to_url[future]] = future.result()
+            except:
+                pass
         return url_to_content
     finally:
         executor.shutdown()
@@ -46,19 +49,20 @@
 def main():
     for name, fn in [('sequential',
                       functools.partial(download_urls_sequential, URLS)),
-                     ('threads',
-                      functools.partial(download_urls_with_executor,
-                                        URLS,
-                                        futures.ThreadPoolExecutor(10))),
                      ('processes',
                       functools.partial(download_urls_with_executor,
                                         URLS,
+                                        futures.ProcessPoolExecutor(10))),
+                     ('threads',
+                      functools.partial(download_urls_with_executor,
+                                        URLS,
                                         futures.ThreadPoolExecutor(10)))]:
-        print '%s: ' % name.ljust(12),
+        print name.ljust(12),
         start = time.time()
         url_map = fn()
         print '%.2f seconds (%d of %d downloaded)' % (time.time() - start,
-                                                     len(url_map),
-                                                     len(URLS))
+                                                      len(url_map),
+                                                      len(URLS))
 
-main()
+if __name__ == '__main__':
+    main()
diff --git a/python2/futures/__init__.py b/python2/futures/__init__.py
index 27a5720..8331d53 100644
--- a/python2/futures/__init__.py
+++ b/python2/futures/__init__.py
@@ -1,18 +1,18 @@
-# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
 
 """Execute computations asynchronously using threads or processes."""
 
 __author__ = 'Brian Quinlan ([email protected])'
 
-from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION,
-                           ALL_COMPLETED, RETURN_IMMEDIATELY,
-                           CancelledError, TimeoutError,
-                           Future, FutureList) 
+from futures._base import (FIRST_COMPLETED,
+                           FIRST_EXCEPTION,
+                           ALL_COMPLETED,
+                           CancelledError,
+                           TimeoutError,
+                           Future,
+                           Executor,
+                           wait,
+                           as_completed)
+from futures.process import ProcessPoolExecutor
 from futures.thread import ThreadPoolExecutor
-
-try:
-    import multiprocessing
-except ImportError:
-    pass
-else:
-    from futures.process import ProcessPoolExecutor
diff --git a/python2/futures/_base.py b/python2/futures/_base.py
index bec7212..ed7a094 100644
--- a/python2/futures/_base.py
+++ b/python2/futures/_base.py
@@ -1,54 +1,24 @@
-# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
 
 __author__ = 'Brian Quinlan ([email protected])'
 
+import collections
+import functools
 import logging
 import threading
 import time
 
-try:
-    from functools import partial
-except ImportError:
-    def partial(func, *args, **keywords):
-        def newfunc(*fargs, **fkeywords):
-            newkeywords = keywords.copy()
-            newkeywords.update(fkeywords)
-            return func(*(args + fargs), **newkeywords)
-        newfunc.func = func
-        newfunc.args = args
-        newfunc.keywords = keywords
-        return newfunc
-
-# The "any" and "all" builtins weren't introduced until Python 2.5.
-try:
-    any
-except NameError:
-    def any(iterable):
-        for element in iterable:
-            if element:
-                return True
-        return False
-
-try:
-    all
-except NameError:
-    def all(iterable):
-        for element in iterable:
-            if not element:
-                return False
-        return True
-
 FIRST_COMPLETED = 'FIRST_COMPLETED'
 FIRST_EXCEPTION = 'FIRST_EXCEPTION'
 ALL_COMPLETED = 'ALL_COMPLETED'
-RETURN_IMMEDIATELY = 'RETURN_IMMEDIATELY'
 
 # Possible future states (for internal use by the futures package).
 PENDING = 'PENDING'
 RUNNING = 'RUNNING'
 # The future was cancelled by the user...
-CANCELLED = 'CANCELLED'                            
-# ...and ThreadEventSink.add_cancelled() was called by a worker.
+CANCELLED = 'CANCELLED'
+# ...and _Waiter.add_cancelled() was called by a worker.
 CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
 FINISHED = 'FINISHED'
 
@@ -70,184 +40,249 @@
 
 # Logger for internal use by the futures package.
 LOGGER = logging.getLogger("futures")
-_handler = logging.StreamHandler()
-LOGGER.addHandler(_handler)
-del _handler
-
-def set_future_exception(future, event_sink, exception):
-    """Sets a future as having terminated with an exception.
-    
-    This function should only be used within the futures package.
-
-    Args:
-        future: The Future that finished with an exception.
-        event_sink: The ThreadEventSink accociated with the Future's FutureList.
-            The event_sink will be notified of the Future's completion, which
-            may unblock some clients that have called FutureList.wait().
-        exception: The expection that executing the Future raised.
-    """
-    future._condition.acquire()
-    try:
-        future._exception = exception
-        event_sink._condition.acquire()
-        try:
-            future._state = FINISHED
-            event_sink.add_exception()
-        finally:
-            event_sink._condition.release()
-
-        future._condition.notifyAll()
-    finally:
-        future._condition.release()
-
-def set_future_result(future, event_sink, result):
-    """Sets a future as having terminated without exception.
-    
-    This function should only be used within the futures package.
-
-    Args:
-        future: The Future that completed.
-        event_sink: The ThreadEventSink accociated with the Future's FutureList.
-            The event_sink will be notified of the Future's completion, which
-            may unblock some clients that have called FutureList.wait().
-        result: The value returned by the Future.
-    """
-    future._condition.acquire()
-    try:
-        future._result = result
-        event_sink._condition.acquire()
-        try:
-            future._state = FINISHED
-            event_sink.add_result()
-        finally:
-            event_sink._condition.release()
-
-        future._condition.notifyAll()
-    finally:
-        future._condition.release()
+STDERR_HANDLER = logging.StreamHandler()
+LOGGER.addHandler(STDERR_HANDLER)
 
 class Error(Exception):
+    """Base class for all future-related exceptions."""
     pass
 
 class CancelledError(Error):
+    """The Future was cancelled."""
     pass
 
 class TimeoutError(Error):
+    """The operation exceeded the given deadline."""
     pass
 
-class _WaitTracker(object):
-    """Provides the event that FutureList.wait(...) blocks on.
-
-    """
+class _Waiter(object):
+    """Provides the event that wait() and as_completed() block on."""
     def __init__(self):
         self.event = threading.Event()
+        self.finished_futures = []
 
-    def add_result(self):
-        raise NotImplementedError()
+    def add_result(self, future):
+        self.finished_futures.append(future)
 
-    def add_exception(self):
-        raise NotImplementedError()
+    def add_exception(self, future):
+        self.finished_futures.append(future)
 
-    def add_cancelled(self):
-        raise NotImplementedError()
+    def add_cancelled(self, future):
+        self.finished_futures.append(future)
 
-class _FirstCompletedWaitTracker(_WaitTracker):
-    """Used by wait(return_when=FIRST_COMPLETED)."""
+class _FirstCompletedWaiter(_Waiter):
+    """Used by wait(return_when=FIRST_COMPLETED) and as_completed()."""
 
-    def add_result(self):
+    def add_result(self, future):
+        super(_FirstCompletedWaiter, self).add_result(future)
         self.event.set()
 
-    def add_exception(self):
+    def add_exception(self, future):
+        super(_FirstCompletedWaiter, self).add_exception(future)
         self.event.set()
 
-    def add_cancelled(self):
+    def add_cancelled(self, future):
+        super(_FirstCompletedWaiter, self).add_cancelled(future)
         self.event.set()
 
-class _AllCompletedWaitTracker(_WaitTracker):
+class _AllCompletedWaiter(_Waiter):
     """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
 
     def __init__(self, num_pending_calls, stop_on_exception):
         self.num_pending_calls = num_pending_calls
         self.stop_on_exception = stop_on_exception
-        _WaitTracker.__init__(self)
+        super(_AllCompletedWaiter, self).__init__()
 
-    def add_result(self):
+    def _decrement_pending_calls(self):
         self.num_pending_calls -= 1
         if not self.num_pending_calls:
             self.event.set()
 
-    def add_exception(self):
+    def add_result(self, future):
+        super(_AllCompletedWaiter, self).add_result(future)
+        self._decrement_pending_calls()
+
+    def add_exception(self, future):
+        super(_AllCompletedWaiter, self).add_exception(future)
         if self.stop_on_exception:
             self.event.set()
         else:
-            self.add_result()
+            self._decrement_pending_calls()
 
-    def add_cancelled(self):
-        self.add_result()
+    def add_cancelled(self, future):
+        super(_AllCompletedWaiter, self).add_cancelled(future)
+        self._decrement_pending_calls()
 
-class ThreadEventSink(object):
-    """Forwards events to many _WaitTrackers.
+class _AcquireFutures(object):
+    """A context manager that does an ordered acquire of Future conditions."""
 
-    Each FutureList has a ThreadEventSink and each call to FutureList.wait()
-    causes a new _WaitTracker to be added to the ThreadEventSink. This design
-    allows many threads to call FutureList.wait() on the same FutureList with
-    different arguments.
+    def __init__(self, futures):
+        self.futures = sorted(futures, key=id)
 
-    This class should not be used by clients.
+    def __enter__(self):
+        for future in self.futures:
+            future._condition.acquire()
+
+    def __exit__(self, *args):
+        for future in self.futures:
+            future._condition.release()
+
+def _create_and_install_waiters(fs, return_when):
+    if return_when == FIRST_COMPLETED:
+        waiter = _FirstCompletedWaiter()
+    else:
+        pending_count = sum(
+                f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
+
+        if return_when == FIRST_EXCEPTION:
+            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
+        elif return_when == ALL_COMPLETED:
+            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
+        else:
+            raise ValueError("Invalid return condition: %r" % return_when)
+
+    for f in fs:
+        f._waiters.append(waiter)
+
+    return waiter
+
+def as_completed(fs, timeout=None):
+    """An iterator over the given futures that yields each as it completes.
+
+    Args:
+        fs: The sequence of Futures (possibly created by different Executors) to
+            iterate over.
+        timeout: The maximum number of seconds to wait. If None, then there
+            is no limit on the wait time.
+
+    Returns:
+        An iterator that yields the given Futures as they complete (finished or
+        cancelled).
+
+    Raises:
+        TimeoutError: If the entire result iterator could not be generated
+            before the given timeout.
     """
-    def __init__(self):
-        self._condition = threading.Lock()
-        self._waiters = []
+    if timeout is not None:
+        end_time = timeout + time.time()
 
-    def add(self, e):
-        self._waiters.append(e)
+    with _AcquireFutures(fs):
+        finished = set(
+                f for f in fs
+                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
+        pending = set(fs) - finished
+        waiter = _create_and_install_waiters(fs, FIRST_COMPLETED)
 
-    def remove(self, e):
-        self._waiters.remove(e)
+    try:
+        for future in finished:
+            yield future
 
-    def add_result(self):
-        for waiter in self._waiters:
-            waiter.add_result()
+        while pending:
+            if timeout is None:
+                wait_timeout = None
+            else:
+                wait_timeout = end_time - time.time()
+                if wait_timeout < 0:
+                    raise TimeoutError(
+                            '%d (of %d) futures unfinished' % (
+                            len(pending), len(fs)))
 
-    def add_exception(self):
-        for waiter in self._waiters:
-            waiter.add_exception()
+            waiter.event.wait(timeout)
 
-    def add_cancelled(self):
-        for waiter in self._waiters:
-            waiter.add_cancelled()
+            for future in waiter.finished_futures[:]:
+                yield future
+                waiter.finished_futures.remove(future)
+                pending.remove(future)
+
+    finally:
+        for f in fs:
+            f._waiters.remove(waiter)
+
+DoneAndNotDoneFutures = collections.namedtuple(
+        'DoneAndNotDoneFutures', 'done not_done')
+def wait(fs, timeout=None, return_when=ALL_COMPLETED):
+    """Wait for the futures in the given sequence to complete.
+
+    Args:
+        fs: The sequence of Futures (possibly created by different Executors) to
+            wait upon.
+        timeout: The maximum number of seconds to wait. If None, then there
+            is no limit on the wait time.
+        return_when: Indicates when this function should return. The options
+            are:
+
+            FIRST_COMPLETED - Return when any future finishes or is
+                              cancelled.
+            FIRST_EXCEPTION - Return when any future finishes by raising an
+                              exception. If no future raises an exception
+                              then it is equivalent to ALL_COMPLETED.
+            ALL_COMPLETED -   Return when all futures finish or are cancelled.
+
+    Returns:
+        A named 2-tuple of sets. The first set, named 'done', contains the
+        futures that completed (is finished or cancelled) before the wait
+        completed. The second set, named 'not_done', contains uncompleted
+        futures.
+    """
+    with _AcquireFutures(fs):
+        done = set(f for f in fs
+                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
+        not_done = set(fs) - done
+
+        if (return_when == FIRST_COMPLETED) and done:
+            return DoneAndNotDoneFutures(done, not_done)
+        elif (return_when == FIRST_EXCEPTION) and done:
+            if any(f for f in done
+                   if not f.cancelled() and f.exception() is not None):
+                return DoneAndNotDoneFutures(done, not_done)
+
+        if len(done) == len(fs):
+            return DoneAndNotDoneFutures(done, not_done)
+
+        waiter = _create_and_install_waiters(fs, return_when)
+
+    waiter.event.wait(timeout)
+    for f in fs:
+        f._waiters.remove(waiter)
+
+    done.update(waiter.finished_futures)
+    return DoneAndNotDoneFutures(done, set(fs) - done)
 
 class Future(object):
     """Represents the result of an asynchronous computation."""
 
-    def __init__(self, index):
+    def __init__(self):
         """Initializes the future. Should not be called by clients."""
         self._condition = threading.Condition()
         self._state = PENDING
         self._result = None
         self._exception = None
-        self._index = index
+        self._waiters = []
+        self._done_callbacks = []
+
+    def _invoke_callbacks(self):
+        for callback in self._done_callbacks:
+            try:
+                callback(self)
+            except Exception:
+                LOGGER.exception('exception calling callback for %r', self)
 
     def __repr__(self):
-        self._condition.acquire()
-        try:
+        with self._condition:
             if self._state == FINISHED:
                 if self._exception:
-                    return '<Future state=%s raised %s>' % (
+                    return '<Future at %s state=%s raised %s>' % (
+                        hex(id(self)),
                         _STATE_TO_DESCRIPTION_MAP[self._state],
                         self._exception.__class__.__name__)
                 else:
-                    return '<Future state=%s returned %s>' % (
+                    return '<Future at %s state=%s returned %s>' % (
+                        hex(id(self)),
                         _STATE_TO_DESCRIPTION_MAP[self._state],
                         self._result.__class__.__name__)
-            return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state]
-        finally:
-            self._condition.release()
-
-    @property
-    def index(self):
-        """The index of the future in its FutureList."""
-        return self._index
+            return '<Future at %s state=%s>' % (
+                    hex(id(self)),
+                   _STATE_TO_DESCRIPTION_MAP[self._state])
 
     def cancel(self):
         """Cancel the future if possible.
@@ -255,40 +290,33 @@
         Returns True if the future was cancelled, False otherwise. A future
         cannot be cancelled if it is running or has already completed.
         """
-        self._condition.acquire()
-        try:
+        with self._condition:
             if self._state in [RUNNING, FINISHED]:
                 return False
 
-            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]:
-                self._state = CANCELLED
-                self._condition.notify_all()
-            return True
-        finally:
-            self._condition.release()
+            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+                return True
+
+            self._state = CANCELLED
+            self._condition.notify_all()
+
+        self._invoke_callbacks()
+        return True
 
     def cancelled(self):
         """Return True if the future has cancelled."""
-        self._condition.acquire()
-        try:
+        with self._condition:
             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
-        finally:
-            self._condition.release()
 
     def running(self):
-        self._condition.acquire()
-        try:
+        """Return True if the future is currently executing."""
+        with self._condition:
             return self._state == RUNNING
-        finally:
-            self._condition.release()
 
     def done(self):
         """Return True of the future was cancelled or finished executing."""
-        self._condition.acquire()
-        try:
+        with self._condition:
             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
-        finally:
-            self._condition.release()
 
     def __get_result(self):
         if self._exception:
@@ -296,6 +324,23 @@
         else:
             return self._result
 
+    def add_done_callback(self, fn):
+        """Attaches a callable that will be called when the future finishes.
+
+        Args:
+            fn: A callable that will be called with this future as its only
+                argument when the future completes or is cancelled. The callable
+                will always be called by a thread in the same process in which
+                it was added. If the future has already completed or been
+                cancelled then the callable will be called immediately. These
+                callables are called in the order that they were added.
+        """
+        with self._condition:
+            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
+                self._done_callbacks.append(fn)
+                return
+        fn(self)
+
     def result(self, timeout=None):
         """Return the result of the call that the future represents.
 
@@ -312,8 +357,7 @@
                 timeout.
             Exception: If the call raised then that exception will be raised.
         """
-        self._condition.acquire()
-        try:
+        with self._condition:
             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                 raise CancelledError()
             elif self._state == FINISHED:
@@ -327,8 +371,6 @@
                 return self.__get_result()
             else:
                 raise TimeoutError()
-        finally:
-            self._condition.release()
 
     def exception(self, timeout=None):
         """Return the exception raised by the call that the future represents.
@@ -348,8 +390,7 @@
                 timeout.
         """
 
-        self._condition.acquire()
-        try:
+        with self._condition:
             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                 raise CancelledError()
             elif self._state == FINISHED:
@@ -363,237 +404,94 @@
                 return self._exception
             else:
                 raise TimeoutError()
-        finally:
-            self._condition.release()
 
-class FutureList(object):
-    def __init__(self, futures, event_sink):
-        """Initializes the FutureList. Should not be called by clients."""
-        self._futures = futures
-        self._event_sink = event_sink
+    # The following methods should only be used by Executors and in tests.
+    def set_running_or_notify_cancel(self):
+        """Mark the future as running or process any cancel notifications.
 
-    def wait(self, timeout=None, return_when=ALL_COMPLETED):
-        """Wait for the futures in the list to complete.
+        Should only be used by Executor implementations and unit tests.
 
-        Args:
-            timeout: The maximum number of seconds to wait. If None, then there
-                is no limit on the wait time.
-            return_when: Indicates when the method should return. The options
-                are:
+        If the future has been cancelled (cancel() was called and returned
+        True) then any threads waiting on the future completing (though calls
+        to as_completed() or wait()) are notified and False is returned.
 
-                FIRST_COMPLETED - Return when any future finishes or is
-                                  cancelled.
-                FIRST_EXCEPTION - Return when any future finishes by raising an
-                                  exception. If no future raises and exception
-                                  then it is equivalent to ALL_COMPLETED.
-                ALL_COMPLETED - Return when all futures finish or are cancelled.
-                RETURN_IMMEDIATELY - Return without waiting (this is not likely
-                                     to be a useful option but it is there to
-                                     be symmetrical with the
-                                     executor.run_to_futures() method.
+        If the future was not cancelled then it is put in the running state
+        (future calls to running() will return True) and True is returned.
+
+        This method should be called by Executor implementations before
+        executing the work associated with this future. If this method returns
+        False then the work should not be executed.
+
+        Returns:
+            False if the Future was cancelled, True otherwise.
 
         Raises:
-            TimeoutError: If the wait condition wasn't satisfied before the
-                given timeout.
+            RuntimeError: if this method was already called or if set_result()
+                or set_exception() was called.
         """
-        if return_when == RETURN_IMMEDIATELY:
-            return
-
-        # Futures cannot change state without this condition being held.
-        self._event_sink._condition.acquire()
-        try:
-            # Make a quick exit if every future is already done. This check is
-            # necessary because, if every future is in the
-            # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will
-            # never receive any events.
-            if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
-                   for f in self):
-                return
-
-            if return_when == FIRST_COMPLETED:
-                completed_tracker = _FirstCompletedWaitTracker()
+        with self._condition:
+            if self._state == CANCELLED:
+                self._state = CANCELLED_AND_NOTIFIED
+                for waiter in self._waiters:
+                    waiter.add_cancelled(self)
+                # self._condition.notify_all() is not necessary because
+                # self.cancel() triggers a notification.
+                return False
+            elif self._state == PENDING:
+                self._state = RUNNING
+                return True
             else:
-                # Calculate how many events are expected before every future
-                # is complete. This can be done without holding the futures'
-                # locks because a future cannot transition itself into either
-                # of the states being looked for.
-                pending_count = sum(
-                        f._state not in [CANCELLED_AND_NOTIFIED, FINISHED]
-                        for f in self)
+                LOGGER.critical('Future %s in unexpected state: %s',
+                                id(self.future),
+                                self.future._state)
+                raise RuntimeError('Future in unexpected state')
 
-                if return_when == FIRST_EXCEPTION:
-                    completed_tracker = _AllCompletedWaitTracker(
-                            pending_count, stop_on_exception=True)
-                elif return_when == ALL_COMPLETED:
-                    completed_tracker = _AllCompletedWaitTracker(
-                            pending_count, stop_on_exception=False)
+    def set_result(self, result):
+        """Sets the return value of work associated with the future.
 
-            self._event_sink.add(completed_tracker)
-        finally:
-            self._event_sink._condition.release()
-
-        try:
-            completed_tracker.event.wait(timeout)
-        finally:
-            self._event_sink.remove(completed_tracker)
-
-    def cancel(self, timeout=None):
-        """Cancel the futures in the list.
-
-        Args:
-            timeout: The maximum number of seconds to wait. If None, then there
-                is no limit on the wait time.
-
-        Raises:
-            TimeoutError: If all the futures were not finished before the
-                given timeout.
+        Should only be used by Executor implementations and unit tests.
         """
-        for f in self:
-            f.cancel()
-        self.wait(timeout=timeout, return_when=ALL_COMPLETED)
-        if any(not f.done() for f in self):
-            raise TimeoutError()
+        with self._condition:
+            self._result = result
+            self._state = FINISHED
+            for waiter in self._waiters:
+                waiter.add_result(self)
+            self._condition.notify_all()
+        self._invoke_callbacks()
 
-    def has_running_futures(self):
-        """Returns True if any futures in the list are still running."""
-        return any(self.running_futures())
+    def set_exception(self, exception):
+        """Sets the result of the future as being the given exception.
 
-    def has_cancelled_futures(self):
-        """Returns True if any futures in the list were cancelled."""
-        return any(self.cancelled_futures())
-
-    def has_done_futures(self):
-        """Returns True if any futures in the list are finished or cancelled."""
-        return any(self.done_futures())
-
-    def has_successful_futures(self):
-        """Returns True if any futures in the list finished without raising."""
-        return any(self.successful_futures())
-
-    def has_exception_futures(self):
-        """Returns True if any futures in the list finished by raising."""
-        return any(self.exception_futures())
-
-    def cancelled_futures(self):
-        """Returns all cancelled futures in the list."""
-        return (f for f in self
-                if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED])
-  
-    def done_futures(self):
-        """Returns all futures in the list that are finished or cancelled."""
-        return (f for f in self
-                if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED])
-
-    def successful_futures(self):
-        """Returns all futures in the list that finished without raising."""
-        return (f for f in self
-                if f._state == FINISHED and f._exception is None)
-  
-    def exception_futures(self):
-        """Returns all futures in the list that finished by raising."""
-        return (f for f in self
-                if f._state == FINISHED and f._exception is not None)
-  
-    def running_futures(self):
-        """Returns all futures in the list that are still running."""
-        return (f for f in self if f._state == RUNNING)
-
-    def __len__(self):
-        return len(self._futures)
-
-    def __getitem__(self, i):
-        return self._futures[i]
-
-    def __iter__(self):
-        return iter(self._futures)
-
-    def __contains__(self, future):
-        return future in self._futures
-
-    def __repr__(self):
-        states = dict([(state, 0) for state in _FUTURE_STATES])
-        for f in self:
-            states[f._state] += 1
-
-        return ('<FutureList #futures=%d '
-                '[#pending=%d #cancelled=%d #running=%d #finished=%d]>' % (
-                len(self),
-                states[PENDING],
-                states[CANCELLED] + states[CANCELLED_AND_NOTIFIED],
-                states[RUNNING],
-                states[FINISHED]))
+        Should only be used by Executor implementations and unit tests.
+        """
+        with self._condition:
+            self._exception = exception
+            self._state = FINISHED
+            for waiter in self._waiters:
+                waiter.add_exception(self)
+            self._condition.notify_all()
+        self._invoke_callbacks()
 
 class Executor(object):
     """This is an abstract base class for concrete asynchronous executors."""
-    def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
-        """Return a list of futures representing the given calls.
 
-        Args:
-            calls: A sequence of callables that take no arguments. These will
-                be bound to Futures and returned.
-            timeout: The maximum number of seconds to wait. If None, then there
-                is no limit on the wait time.
-            return_when: Indicates when the method should return. The options
-                are:
+    def submit(self, fn, *args, **kwargs):
+        """Submits a callable to be executed with the given arguments.
 
-                FIRST_COMPLETED - Return when any future finishes or is
-                                  cancelled.
-                FIRST_EXCEPTION - Return when any future finishes by raising an
-                                  exception. If no future raises and exception
-                                  then it is equivalent to ALL_COMPLETED.
-                ALL_COMPLETED - Return when all futures finish or are cancelled.
-                RETURN_IMMEDIATELY - Return without waiting.
+        Schedules the callable to be executed as fn(*args, **kwargs) and returns
+        a Future instance representing the execution of the callable.
 
         Returns:
-            A FutureList containing Futures for the given calls.
+            A Future representing the given call.
         """
         raise NotImplementedError()
 
-    def run_to_results(self, calls, timeout=None):
-        """Returns a iterator of the results of the given calls.
-
-        Args:
-            calls: A sequence of callables that take no arguments. These will
-                be called and their results returned.
-            timeout: The maximum number of seconds to wait. If None, then there
-                is no limit on the wait time.
-
-        Returns:
-            An iterator over the results of the given calls. Equivalent to:
-            (call() for call in calls) but the calls may be evaluated
-            out-of-order.
-
-        Raises:
-            TimeoutError: If all the given calls were not completed before the
-                given timeout.
-            Exception: If any call() raises.
-        """
-        if timeout is not None:
-            end_time = timeout + time.time()
-
-        fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY)
-
-        try:
-            for future in fs:
-                if timeout is None:
-                    yield future.result()
-                else:
-                    yield future.result(end_time - time.time())
-        except Exception, e:
-            # Python 2.4 and earlier don't allow yield statements in
-            # try/finally blocks
-            try:
-                fs.cancel(timeout=0)
-            except TimeoutError:
-                pass
-            raise e
-
-    def map(self, func, *iterables, **kwargs):
+    def map(self, fn, *iterables, **kwargs):
         """Returns a iterator equivalent to map(fn, iter).
 
         Args:
-            func: A callable that will take take as many arguments as there
-                are passed iterables.
+            fn: A callable that will take take as many arguments as there are
+                passed iterables.
             timeout: The maximum number of seconds to wait. If None, then there
                 is no limit on the wait time.
 
@@ -606,17 +504,38 @@
                 before the given timeout.
             Exception: If fn(*args) raises for any values.
         """
-        timeout = kwargs.get('timeout') or None
-        calls = [partial(func, *args) for args in zip(*iterables)]
-        return self.run_to_results(calls, timeout=timeout)
+        timeout = kwargs.get('timeout')
+        if timeout is not None:
+            end_time = timeout + time.time()
 
-    def shutdown(self):
-        """Clean-up. No other methods can be called afterwards."""
-        raise NotImplementedError()
+        fs = [self.submit(fn, *args) for args in zip(*iterables)]
+
+        try:
+            for future in fs:
+                if timeout is None:
+                    yield future.result()
+                else:
+                    yield future.result(end_time - time.time())
+        finally:
+            for future in fs:
+                future.cancel()
+
+    def shutdown(self, wait=True):
+        """Clean-up the resources associated with the Executor.
+
+        It is safe to call this method several times. Otherwise, no other
+        methods can be called after this one.
+
+        Args:
+            wait: If True then shutdown will not return until all running
+                futures have finished executing and the resources used by the
+                executor have been reclaimed.
+        """
+        pass
 
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        self.shutdown()
+        self.shutdown(wait=True)
         return False
diff --git a/python2/futures/process.py b/python2/futures/process.py
index f0d7fdf..ec48377 100644
--- a/python2/futures/process.py
+++ b/python2/futures/process.py
@@ -1,4 +1,5 @@
-# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
 
 """Implements ProcessPoolExecutor.
 
@@ -23,9 +24,8 @@
 |          |     | ...        |     |        |     | 3, except |    |         |
 +----------+     +------------+     +--------+     +-----------+    +---------+
 
-Executor.run_to_futures() called:
-- creates a uniquely numbered _WorkItem for each call and adds them to the
-  "Work Items" dict
+Executor.submit() called:
+- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
 - adds the id of the _WorkItem to the "Work Ids" queue
 
 Local worker thread:
@@ -42,15 +42,11 @@
 - reads _CallItems from "Call Q", executes the calls, and puts the resulting
   _ResultItems in "Request Q"
 """
- 
+
 __author__ = 'Brian Quinlan ([email protected])'
 
-from futures._base import (PENDING, RUNNING, CANCELLED,
-                           CANCELLED_AND_NOTIFIED, FINISHED,
-                           ALL_COMPLETED,
-                           set_future_exception, set_future_result,
-                           Executor, Future, FutureList, ThreadEventSink)
 import atexit
+import _base
 import Queue
 import multiprocessing
 import threading
@@ -63,7 +59,7 @@
 #   - The workers would still be running during interpretor shutdown,
 #     meaning that they would fail in unpredictable ways.
 #   - The workers could be killed while evaluating a work item, which could
-#     be bad if the function being evaluated has external side-effects e.g.
+#     be bad if the callable being evaluated has external side-effects e.g.
 #     writing to a file.
 #
 # To work around this problem, an exit handler is installed which tells the
@@ -100,10 +96,11 @@
 EXTRA_QUEUED_CALLS = 1
 
 class _WorkItem(object):
-    def __init__(self, call, future, completion_tracker):
-        self.call = call
+    def __init__(self, future, fn, args, kwargs):
         self.future = future
-        self.completion_tracker = completion_tracker
+        self.fn = fn
+        self.args = args
+        self.kwargs = kwargs
 
 class _ResultItem(object):
     def __init__(self, work_id, exception=None, result=None):
@@ -112,9 +109,11 @@
         self.result = result
 
 class _CallItem(object):
-    def __init__(self, work_id, call):
+    def __init__(self, work_id, fn, args, kwargs):
         self.work_id = work_id
-        self.call = call
+        self.fn = fn
+        self.args = args
+        self.kwargs = kwargs
 
 def _process_worker(call_queue, result_queue, shutdown):
     """Evaluates calls from call_queue and places the results in result_queue.
@@ -137,8 +136,8 @@
                 return
         else:
             try:
-                r = call_item.call()
-            except Exception, e:
+                r = call_item.fn(*call_item.args, **call_item.kwargs)
+            except BaseException as e:
                 result_queue.put(_ResultItem(call_item.work_id,
                                              exception=e))
             else:
@@ -172,19 +171,15 @@
         else:
             work_item = pending_work_items[work_id]
 
-            if work_item.future.cancelled():
-                work_item.future._condition.acquire()
-                work_item.future._condition.notify_all()
-                work_item.future._condition.release()
-
-                work_item.completion_tracker.add_cancelled()
+            if work_item.future.set_running_or_notify_cancel():
+                call_queue.put(_CallItem(work_id,
+                                         work_item.fn,
+                                         work_item.args,
+                                         work_item.kwargs),
+                               block=True)
+            else:
                 del pending_work_items[work_id]
                 continue
-            else:
-                work_item.future._condition.acquire()
-                work_item.future._state = RUNNING
-                work_item.future._condition.release()
-                call_queue.put(_CallItem(work_id, work_item.call), block=True)
 
 def _queue_manangement_worker(executor_reference,
                               processes,
@@ -218,6 +213,7 @@
         _add_call_item_to_queue(pending_work_items,
                                 work_ids_queue,
                                 call_queue)
+
         try:
             result_item = result_queue.get(block=True, timeout=0.1)
         except Queue.Empty:
@@ -244,15 +240,11 @@
             del pending_work_items[result_item.work_id]
 
             if result_item.exception:
-                set_future_exception(work_item.future,
-                                     work_item.completion_tracker,
-                                     result_item.exception)
+                work_item.future.set_exception(result_item.exception)
             else:
-                set_future_result(work_item.future,
-                                     work_item.completion_tracker,
-                                     result_item.result)
+                work_item.future.set_result(result_item.result)
 
-class ProcessPoolExecutor(Executor):
+class ProcessPoolExecutor(_base.Executor):
     def __init__(self, max_workers=None):
         """Initializes a new ProcessPoolExecutor instance.
 
@@ -296,7 +288,7 @@
                           self._call_queue,
                           self._result_queue,
                           self._shutdown_process_event))
-            self._queue_management_thread.setDaemon(True)
+            self._queue_management_thread.daemon = True
             self._queue_management_thread.start()
             _thread_references.add(weakref.ref(self._queue_management_thread))
 
@@ -310,36 +302,36 @@
             p.start()
             self._processes.add(p)
 
-    def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
-        self._shutdown_lock.acquire()
-        try:
+    def submit(self, fn, *args, **kwargs):
+        with self._shutdown_lock:
             if self._shutdown_thread:
-                raise RuntimeError('cannot run new futures after shutdown')
+                raise RuntimeError('cannot schedule new futures after shutdown')
 
-            futures = []
-            event_sink = ThreadEventSink()
+            f = _base.Future()
+            w = _WorkItem(f, fn, args, kwargs)
 
-            for index, call in enumerate(calls):
-                f = Future(index)
-                self._pending_work_items[self._queue_count] = _WorkItem(
-                        call, f, event_sink)
-                self._work_ids.put(self._queue_count)
-                futures.append(f)
-                self._queue_count += 1
+            self._pending_work_items[self._queue_count] = w
+            self._work_ids.put(self._queue_count)
+            self._queue_count += 1
 
             self._start_queue_management_thread()
             self._adjust_process_count()
-            fl = FutureList(futures, event_sink)
-            fl.wait(timeout=timeout, return_when=return_when)
-            return fl
-        finally:
-            self._shutdown_lock.release()
+            return f
+    submit.__doc__ = _base.Executor.submit.__doc__
 
-    def shutdown(self):
-        self._shutdown_lock.acquire()
-        try:
+    def shutdown(self, wait=True):
+        with self._shutdown_lock:
             self._shutdown_thread = True
-        finally:
-            self._shutdown_lock.release()
+        if wait:
+            if self._queue_management_thread:
+                self._queue_management_thread.join()
+        # To reduce the risk of openning too many files, remove references to
+        # objects that use file descriptors.
+        self._queue_management_thread = None
+        self._call_queue = None
+        self._result_queue = None
+        self._shutdown_process_event = None
+        self._processes = None
+    shutdown.__doc__ = _base.Executor.shutdown.__doc__
 
-atexit.register(_python_exit)
\ No newline at end of file
+atexit.register(_python_exit)
diff --git a/python2/futures/thread.py b/python2/futures/thread.py
index 4f410fe..3f1584a 100644
--- a/python2/futures/thread.py
+++ b/python2/futures/thread.py
@@ -1,16 +1,12 @@
-# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
 
 """Implements ThreadPoolExecutor."""
 
 __author__ = 'Brian Quinlan ([email protected])'
 
-from futures._base import (PENDING, RUNNING, CANCELLED,
-                           CANCELLED_AND_NOTIFIED, FINISHED,
-                           ALL_COMPLETED,
-                           LOGGER,
-                           set_future_exception, set_future_result,
-                           Executor, Future, FutureList, ThreadEventSink)
 import atexit
+import _base
 import Queue
 import threading
 import weakref
@@ -22,15 +18,15 @@
 #   - The workers would still be running during interpretor shutdown,
 #     meaning that they would fail in unpredictable ways.
 #   - The workers could be killed while evaluating a work item, which could
-#     be bad if the function being evaluated has external side-effects e.g.
+#     be bad if the callable being evaluated has external side-effects e.g.
 #     writing to a file.
 #
 # To work around this problem, an exit handler is installed which tells the
-# workers to exit when their work queues are empty and then waits until they
-# finish.
+# workers to exit when their work queues are empty and then waits until the
+# threads finish.
 
-_thread_references = set()  # Weakrefs to every active worker thread.
-_shutdown = False           # Indicates that the interpreter is shutting down.
+_thread_references = set()
+_shutdown = False
 
 def _python_exit():
     global _shutdown
@@ -43,11 +39,10 @@
 def _remove_dead_thread_references():
     """Remove inactive threads from _thread_references.
 
-    Should be called periodically to prevent thread objects from accumulating in
-    scenarios such as:
+    Should be called periodically to prevent memory leaks in scenarios such as:
     >>> while True:
-    >>> ...    t = ThreadPoolExecutor(max_workers=5)
-    >>> ...    t.map(int, ['1', '2', '3', '4', '5'])
+    ...    t = ThreadPoolExecutor(max_workers=5)
+    ...    t.map(int, ['1', '2', '3', '4', '5'])
     """
     for thread_reference in set(_thread_references):
         if thread_reference() is None:
@@ -56,38 +51,22 @@
 atexit.register(_python_exit)
 
 class _WorkItem(object):
-    def __init__(self, call, future, completion_tracker):
-        self.call = call
+    def __init__(self, future, fn, args, kwargs):
         self.future = future
-        self.completion_tracker = completion_tracker
+        self.fn = fn
+        self.args = args
+        self.kwargs = kwargs
 
     def run(self):
-        self.future._condition.acquire()
-        try:
-            if self.future._state == PENDING:
-                self.future._state = RUNNING
-            elif self.future._state == CANCELLED:
-                self.completion_tracker._condition.acquire()
-                try:
-                    self.future._state = CANCELLED_AND_NOTIFIED
-                    self.completion_tracker.add_cancelled()
-                    return
-                finally:
-                    self.completion_tracker._condition.release()
-            else:
-                LOGGER.critical('Future %s in unexpected state: %d',
-                                id(self.future),
-                                self.future._state)
-                return
-        finally:
-            self.future._condition.release()
+        if not self.future.set_running_or_notify_cancel():
+            return
 
         try:
-            result = self.call()
-        except Exception, e:
-            set_future_exception(self.future, self.completion_tracker, e)
+            result = self.fn(*self.args, **self.kwargs)
+        except BaseException as e:
+            self.future.set_exception(e)
         else:
-            set_future_result(self.future, self.completion_tracker, result)
+            self.future.set_result(result)
 
 def _worker(executor_reference, work_queue):
     try:
@@ -105,10 +84,10 @@
                 del executor
             else:
                 work_item.run()
-    except Exception, e:
-        LOGGER.critical('Exception in worker', exc_info=True)
+    except BaseException as e:
+        _base.LOGGER.critical('Exception in worker', exc_info=True)
 
-class ThreadPoolExecutor(Executor):
+class ThreadPoolExecutor(_base.Executor):
     def __init__(self, max_workers):
         """Initializes a new ThreadPoolExecutor instance.
 
@@ -124,42 +103,34 @@
         self._shutdown = False
         self._shutdown_lock = threading.Lock()
 
+    def submit(self, fn, *args, **kwargs):
+        with self._shutdown_lock:
+            if self._shutdown:
+                raise RuntimeError('cannot schedule new futures after shutdown')
+
+            f = _base.Future()
+            w = _WorkItem(f, fn, args, kwargs)
+
+            self._work_queue.put(w)
+            self._adjust_thread_count()
+            return f
+    submit.__doc__ = _base.Executor.submit.__doc__
+
     def _adjust_thread_count(self):
-        for _ in range(len(self._threads),
-                       min(self._max_workers, self._work_queue.qsize())):
+        # TODO(bquinlan): Should avoid creating new threads if there are more
+        # idle threads than items in the work queue.
+        if len(self._threads) < self._max_workers:
             t = threading.Thread(target=_worker,
                                  args=(weakref.ref(self), self._work_queue))
-            t.setDaemon(True)
+            t.daemon = True
             t.start()
             self._threads.add(t)
             _thread_references.add(weakref.ref(t))
 
-    def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
-        self._shutdown_lock.acquire()
-        try:
-            if self._shutdown:
-                raise RuntimeError('cannot run new futures after shutdown')
-
-            futures = []
-            event_sink = ThreadEventSink()
-            for index, call in enumerate(calls):
-                f = Future(index)
-                w = _WorkItem(call, f, event_sink)
-                self._work_queue.put(w)
-                futures.append(f)
-    
-            self._adjust_thread_count()
-            fl = FutureList(futures, event_sink)
-            fl.wait(timeout=timeout, return_when=return_when)
-            return fl
-        finally:
-            self._shutdown_lock.release()
-    run_to_futures.__doc__ = Executor.run_to_futures.__doc__
-
-    def shutdown(self):
-        self._shutdown_lock.acquire()
-        try:
+    def shutdown(self, wait=True):
+        with self._shutdown_lock:
             self._shutdown = True
-        finally:
-            self._shutdown_lock.release()
-    shutdown.__doc__ = Executor.shutdown.__doc__
+        if wait:
+            for t in self._threads:
+                t.join()
+    shutdown.__doc__ = _base.Executor.shutdown.__doc__
diff --git a/python2/primes.py b/python2/primes.py
index 0b2bf81..fa6c355 100644
--- a/python2/primes.py
+++ b/python2/primes.py
@@ -25,29 +25,23 @@
     return list(map(is_prime, PRIMES))
 
 def with_process_pool_executor():
-    executor = futures.ProcessPoolExecutor(10)
-    try:
+    with futures.ProcessPoolExecutor(10) as executor:
         return list(executor.map(is_prime, PRIMES))
-    finally:
-        executor.shutdown()
 
 def with_thread_pool_executor():
-    executor = futures.ThreadPoolExecutor(10)
-    try:
+    with futures.ThreadPoolExecutor(10) as executor:
         return list(executor.map(is_prime, PRIMES))
-    finally:
-        executor.shutdown()
 
 def main():
     for name, fn in [('sequential', sequential),
                      ('processes', with_process_pool_executor),
                      ('threads', with_thread_pool_executor)]:
-        print '%s: ' % name.ljust(12),
-
+        print name.ljust(12),
         start = time.time()
         if fn() != [True] * len(PRIMES):
             print 'failed'
         else:
             print '%.2f seconds' % (time.time() - start)
 
-main()
\ No newline at end of file
+if __name__ == '__main__':
+    main()
diff --git a/python2/setup.py b/python2/setup.py
index 897dc86..fcd05f2 100755
--- a/python2/setup.py
+++ b/python2/setup.py
@@ -1,10 +1,10 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 
 from distutils.core import setup
 
-setup(name='futures',
+setup(name='futures3',
       version='1.0',
-      description='Java-style futures implementation in Python 2.x',
+      description='Java-style futures implementation in Python 3.x',
       author='Brian Quinlan',
       author_email='[email protected]',
       url='http://code.google.com/p/pythonfutures',
@@ -14,5 +14,5 @@
       classifiers=['License :: OSI Approved :: BSD License',
                    'Development Status :: 5 - Production/Stable',
                    'Intended Audience :: Developers',
-                   'Programming Language :: Python :: 2']
+                   'Programming Language :: Python :: 3']
       )
diff --git a/python2/test_futures.py b/python2/test_futures.py
index cf74286..2d5672b 100644
--- a/python2/test_futures.py
+++ b/python2/test_futures.py
@@ -1,16 +1,25 @@
-import unittest
-import threading
-import time
+import logging
 import multiprocessing
+import re
+import StringIO
+import sys
+import threading
 from test import test_support
+import time
+import unittest
+
+if sys.platform.startswith('win'):
+    import ctypes
+    import ctypes.wintypes
 
 import futures
-import futures._base
 from futures._base import (
-    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
+    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
+    LOGGER, STDERR_HANDLER, wait)
+import futures.process
 
 def create_future(state=PENDING, exception=None, result=None):
-    f = Future(0)
+    f = Future()
     f._state = state
     f._exception = exception
     f._result = result
@@ -23,68 +32,104 @@
 EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
 SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
 
+def mul(x, y):
+    return x * y
+
 class Call(object):
+    """A call that can be submitted to a future.Executor for testing.
+
+    The call signals when it is called and waits for an event before finishing.
+    """
     CALL_LOCKS = {}
+    def _create_event(self):
+        if sys.platform.startswith('win'):
+            class SECURITY_ATTRIBUTES(ctypes.Structure):
+                _fields_ = [("nLength", ctypes.wintypes.DWORD),
+                            ("lpSecurityDescriptor", ctypes.wintypes.LPVOID),
+                            ("bInheritHandle", ctypes.wintypes.BOOL)]
+
+            s = SECURITY_ATTRIBUTES()
+            s.nLength = ctypes.sizeof(s)
+            s.lpSecurityDescriptor = None
+            s.bInheritHandle = True
+
+            handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s),
+                                                         True,
+                                                         False,
+                                                         None)
+            assert handle is not None
+            return handle
+        else:
+            event = multiprocessing.Event()
+            self.CALL_LOCKS[id(event)] = event
+            return id(event)
+
+    def _wait_on_event(self, handle):
+        if sys.platform.startswith('win'):
+            r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000)
+            assert r == 0
+        else:
+            self.CALL_LOCKS[handle].wait()
+
+    def _signal_event(self, handle):
+        if sys.platform.startswith('win'):
+            r = ctypes.windll.kernel32.SetEvent(handle)
+            assert r != 0
+        else:
+            self.CALL_LOCKS[handle].set()
+
     def __init__(self, manual_finish=False, result=42):
-        called_event = multiprocessing.Event()
-        can_finish = multiprocessing.Event()
+        self._called_event = self._create_event()
+        self._can_finish = self._create_event()
 
         self._result = result
-        self._called_event_id = id(called_event)
-        self._can_finish_event_id = id(can_finish)
-
-        self.CALL_LOCKS[self._called_event_id] = called_event
-        self.CALL_LOCKS[self._can_finish_event_id] = can_finish
 
         if not manual_finish:
-            self._can_finish.set()
-
-    @property
-    def _can_finish(self):
-        return self.CALL_LOCKS[self._can_finish_event_id]
-
-    @property
-    def _called_event(self):
-        return self.CALL_LOCKS[self._called_event_id]
+            self._signal_event(self._can_finish)
 
     def wait_on_called(self):
-        self._called_event.wait()
+        self._wait_on_event(self._called_event)
 
     def set_can(self):
-        self._can_finish.set()
-
-    def called(self):
-        return self._called_event.is_set()
+        self._signal_event(self._can_finish)
 
     def __call__(self):
-        if self._called_event.is_set(): print('called twice')
+        self._signal_event(self._called_event)
+        self._wait_on_event(self._can_finish)
 
-        self._called_event.set()
-        self._can_finish.wait()
         return self._result
 
     def close(self):
-        del self.CALL_LOCKS[self._called_event_id]
-        del self.CALL_LOCKS[self._can_finish_event_id]
+        self.set_can()
+        if sys.platform.startswith('win'):
+            ctypes.windll.kernel32.CloseHandle(self._called_event)
+            ctypes.windll.kernel32.CloseHandle(self._can_finish)
+        else:
+            del self.CALL_LOCKS[self._called_event]
+            del self.CALL_LOCKS[self._can_finish]
 
 class ExceptionCall(Call):
     def __call__(self):
-        assert not self._called_event.is_set(), 'already called'
-
-        self._called_event.set()
-        self._can_finish.wait()
+        self._signal_event(self._called_event)
+        self._wait_on_event(self._can_finish)
         raise ZeroDivisionError()
 
+class MapCall(Call):
+    def __init__(self, result=42):
+        super(MapCall, self).__init__(manual_finish=True, result=result)
+
+    def __call__(self, manual_finish):
+        if manual_finish:
+            super(MapCall, self).__call__()
+        return self._result
+
 class ExecutorShutdownTest(unittest.TestCase):
     def test_run_after_shutdown(self):
-        call1 = Call()
-        try:
-            self.executor.shutdown()
-            self.assertRaises(RuntimeError,
-                              self.executor.run_to_futures,
-                              [call1])
-        finally:
-            call1.close()
+        self.executor.shutdown()
+        self.assertRaises(RuntimeError,
+                          self.executor.submit,
+                          pow, 2, 5)
+
 
     def _start_some_futures(self):
         call1 = Call(manual_finish=True)
@@ -92,13 +137,14 @@
         call3 = Call(manual_finish=True)
 
         try:
-            self.executor.run_to_futures([call1, call2, call3],
-                                         return_when=futures.RETURN_IMMEDIATELY)
-    
+            self.executor.submit(call1)
+            self.executor.submit(call2)
+            self.executor.submit(call3)
+
             call1.wait_on_called()
             call2.wait_on_called()
             call3.wait_on_called()
-    
+
             call1.set_can()
             call2.set_can()
             call3.set_can()
@@ -112,7 +158,7 @@
         self.executor = futures.ThreadPoolExecutor(max_workers=5)
 
     def tearDown(self):
-        self.executor.shutdown()
+        self.executor.shutdown(wait=True)
 
     def test_threads_terminate(self):
         self._start_some_futures()
@@ -144,15 +190,15 @@
         self.executor = futures.ProcessPoolExecutor(max_workers=5)
 
     def tearDown(self):
-        self.executor.shutdown()
+        self.executor.shutdown(wait=True)
 
     def test_processes_terminate(self):
         self._start_some_futures()
         self.assertEqual(len(self.executor._processes), 5)
+        processes = self.executor._processes
         self.executor.shutdown()
 
-        self.executor._queue_management_thread.join()
-        for p in self.executor._processes:
+        for p in processes:
             p.join()
 
     def test_context_manager_shutdown(self):
@@ -161,8 +207,7 @@
             self.assertEqual(list(e.map(abs, range(-5, 5))),
                              [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
 
-        executor._queue_management_thread.join()
-        for p in executor._processes:
+        for p in self.executor._processes:
             p.join()
 
     def test_del_shutdown(self):
@@ -176,312 +221,316 @@
         for p in processes:
             p.join()
 
-class WaitsTest(unittest.TestCase):
-    def test_concurrent_waits(self):
-        def wait_for_ALL_COMPLETED():
-            fs.wait(return_when=futures.ALL_COMPLETED)
-            self.assertTrue(f1.done())
-            self.assertTrue(f2.done())
-            self.assertTrue(f3.done())
-            self.assertTrue(f4.done())
-            all_completed.release()
+class WaitTests(unittest.TestCase):
+    def test_first_completed(self):
+        def wait_test():
+            while not future1._waiters:
+                pass
+            call1.set_can()
 
-        def wait_for_FIRST_COMPLETED():
-            fs.wait(return_when=futures.FIRST_COMPLETED)
-            self.assertTrue(f1.done())
-            self.assertFalse(f2.done())
-            self.assertFalse(f3.done())
-            self.assertFalse(f4.done())
-            first_completed.release()
-        def wait_for_FIRST_EXCEPTION():
-            fs.wait(return_when=futures.FIRST_EXCEPTION)
-            self.assertTrue(f1.done())
-            self.assertTrue(f2.done())
-            self.assertFalse(f3.done())
-            self.assertFalse(f4.done())
-            first_exception.release()
+        call1 = Call(manual_finish=True)
+        call2 = Call(manual_finish=True)
+        try:
+            future1 = self.executor.submit(call1)
+            future2 = self.executor.submit(call2)
 
-        all_completed = threading.Semaphore(0)
-        first_completed = threading.Semaphore(0)
-        first_exception = threading.Semaphore(0)
+            t = threading.Thread(target=wait_test)
+            t.start()
+            done, not_done = futures.wait(
+                    [CANCELLED_FUTURE, future1, future2],
+                     return_when=futures.FIRST_COMPLETED)
+
+            self.assertEquals(set([future1]), done)
+            self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done)
+        finally:
+            call1.close()
+            call2.close()
+
+    def test_first_completed_one_already_completed(self):
+        call1 = Call(manual_finish=True)
+        try:
+            future1 = self.executor.submit(call1)
+
+            finished, pending = futures.wait(
+                     [SUCCESSFUL_FUTURE, future1],
+                     return_when=futures.FIRST_COMPLETED)
+
+            self.assertEquals(set([SUCCESSFUL_FUTURE]), finished)
+            self.assertEquals(set([future1]), pending)
+        finally:
+            call1.close()
+
+    def test_first_exception(self):
+        def wait_test():
+            while not future1._waiters:
+                pass
+            call1.set_can()
+            call2.set_can()
 
         call1 = Call(manual_finish=True)
         call2 = ExceptionCall(manual_finish=True)
         call3 = Call(manual_finish=True)
-        call4 = Call()
-
         try:
-            fs = self.executor.run_to_futures(
-                    [call1, call2, call3, call4],
-                    return_when=futures.RETURN_IMMEDIATELY)
-            f1, f2, f3, f4 = fs
-    
-            threads = []
-            for wait_test in [wait_for_ALL_COMPLETED,
-                              wait_for_FIRST_COMPLETED,
-                              wait_for_FIRST_EXCEPTION]:
-                t = threading.Thread(target=wait_test)
-                t.start()
-                threads.append(t)
-    
-            time.sleep(1)   # give threads enough time to execute wait
+            future1 = self.executor.submit(call1)
+            future2 = self.executor.submit(call2)
+            future3 = self.executor.submit(call3)
 
-            call1.set_can()
-            first_completed.acquire()
-            call2.set_can()        
-            first_exception.acquire()
-            call3.set_can()
-            all_completed.acquire()
-    
-            self.executor.shutdown()
-        finally:
-            call1.close()
-            call2.close()
-            call3.close()
-            call4.close()
-
-class ThreadPoolWaitTests(WaitsTest):
-    def setUp(self):
-        self.executor = futures.ThreadPoolExecutor(max_workers=1)
-
-    def tearDown(self):
-        self.executor.shutdown()
-
-class ProcessPoolWaitTests(WaitsTest):
-    def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
-    def tearDown(self):
-        self.executor.shutdown()
-
-class CancelTests(unittest.TestCase):
-    def test_cancel_states(self):
-        call1 = Call(manual_finish=True)
-        call2 = Call()
-        call3 = Call()
-        call4 = Call()
-
-        try:
-            fs = self.executor.run_to_futures(
-                    [call1, call2, call3, call4],
-                    return_when=futures.RETURN_IMMEDIATELY)
-            f1, f2, f3, f4 = fs
-    
-            call1.wait_on_called()
-            self.assertEqual(f1.cancel(), False)
-            self.assertEqual(f2.cancel(), True)
-            self.assertEqual(f4.cancel(), True)
-            self.assertEqual(f1.cancelled(), False)
-            self.assertEqual(f2.cancelled(), True)
-            self.assertEqual(f3.cancelled(), False)
-            self.assertEqual(f4.cancelled(), True)
-            self.assertEqual(f1.done(), False)
-            self.assertEqual(f2.done(), True)
-            self.assertEqual(f3.done(), False)
-            self.assertEqual(f4.done(), True)
-    
-            call1.set_can()
-            fs.wait(return_when=futures.ALL_COMPLETED)
-            self.assertEqual(f1.result(), 42)
-            self.assertRaises(futures.CancelledError, f2.result)
-            self.assertRaises(futures.CancelledError, f2.exception)
-            self.assertEqual(f3.result(), 42)
-            self.assertRaises(futures.CancelledError, f4.result)
-            self.assertRaises(futures.CancelledError, f4.exception)
-    
-            self.assertEqual(call2.called(), False)
-            self.assertEqual(call4.called(), False)
-        finally:
-            call1.close()
-            call2.close()
-            call3.close()
-            call4.close()
-
-    def test_wait_for_individual_cancel_while_waiting(self):
-        def end_call():
-            # Wait until the main thread is waiting on the results of the
-            # future.
-            time.sleep(1)
-            f2.cancel()
-            call1.set_can()
-
-        call1 = Call(manual_finish=True)
-        call2 = Call()
-
-        try:
-            fs = self.executor.run_to_futures(
-                    [call1, call2],
-                    return_when=futures.RETURN_IMMEDIATELY)
-            f1, f2 = fs
-    
-            call1.wait_on_called()
-            t = threading.Thread(target=end_call)
+            t = threading.Thread(target=wait_test)
             t.start()
-            self.assertRaises(futures.CancelledError, f2.result)
-            self.assertRaises(futures.CancelledError, f2.exception)
-            t.join()
+            finished, pending = futures.wait(
+                    [future1, future2, future3],
+                    return_when=futures.FIRST_EXCEPTION)
+
+            self.assertEquals(set([future1, future2]), finished)
+            self.assertEquals(set([future3]), pending)
+        finally:
+            call1.close()
+            call2.close()
+            call3.close()
+
+    def test_first_exception_some_already_complete(self):
+        def wait_test():
+            while not future1._waiters:
+                pass
+            call1.set_can()
+
+        call1 = ExceptionCall(manual_finish=True)
+        call2 = Call(manual_finish=True)
+        try:
+            future1 = self.executor.submit(call1)
+            future2 = self.executor.submit(call2)
+
+            t = threading.Thread(target=wait_test)
+            t.start()
+            finished, pending = futures.wait(
+                    [SUCCESSFUL_FUTURE,
+                     CANCELLED_FUTURE,
+                     CANCELLED_AND_NOTIFIED_FUTURE,
+                     future1, future2],
+                    return_when=futures.FIRST_EXCEPTION)
+
+            self.assertEquals(set([SUCCESSFUL_FUTURE,
+                                   CANCELLED_AND_NOTIFIED_FUTURE,
+                                   future1]), finished)
+            self.assertEquals(set([CANCELLED_FUTURE, future2]), pending)
+
+
         finally:
             call1.close()
             call2.close()
 
-    def test_wait_with_already_cancelled_futures(self):
+    def test_first_exception_one_already_failed(self):
         call1 = Call(manual_finish=True)
-        call2 = Call()
-        call3 = Call()
-        call4 = Call()
-
         try:
-            fs = self.executor.run_to_futures(
-                    [call1, call2, call3, call4],
-                    return_when=futures.RETURN_IMMEDIATELY)
-            f1, f2, f3, f4 = fs
-    
-            call1.wait_on_called()
-            self.assertTrue(f2.cancel())
-            self.assertTrue(f3.cancel())
+            future1 = self.executor.submit(call1)
+
+            finished, pending = futures.wait(
+                     [EXCEPTION_FUTURE, future1],
+                     return_when=futures.FIRST_EXCEPTION)
+
+            self.assertEquals(set([EXCEPTION_FUTURE]), finished)
+            self.assertEquals(set([future1]), pending)
+        finally:
+            call1.close()
+
+    def test_all_completed(self):
+        def wait_test():
+            while not future1._waiters:
+                pass
             call1.set_can()
-    
-            fs.wait(return_when=futures.ALL_COMPLETED)
+            call2.set_can()
+
+        call1 = Call(manual_finish=True)
+        call2 = Call(manual_finish=True)
+        try:
+            future1 = self.executor.submit(call1)
+            future2 = self.executor.submit(call2)
+
+            t = threading.Thread(target=wait_test)
+            t.start()
+            finished, pending = futures.wait(
+                    [future1, future2],
+                    return_when=futures.ALL_COMPLETED)
+
+            self.assertEquals(set([future1, future2]), finished)
+            self.assertEquals(set(), pending)
+
+
+        finally:
+            call1.close()
+            call2.close()
+
+    def test_all_completed_some_already_completed(self):
+        def wait_test():
+            while not future1._waiters:
+                pass
+
+            future4.cancel()
+            call1.set_can()
+            call2.set_can()
+            call3.set_can()
+
+        self.assertTrue(
+                futures.process.EXTRA_QUEUED_CALLS <= 1,
+               'this test assumes that future4 will be cancelled before it is '
+               'queued to run - which might not be the case if '
+               'ProcessPoolExecutor is too aggresive in scheduling futures')
+        call1 = Call(manual_finish=True)
+        call2 = Call(manual_finish=True)
+        call3 = Call(manual_finish=True)
+        call4 = Call(manual_finish=True)
+        try:
+            future1 = self.executor.submit(call1)
+            future2 = self.executor.submit(call2)
+            future3 = self.executor.submit(call3)
+            future4 = self.executor.submit(call4)
+
+            t = threading.Thread(target=wait_test)
+            t.start()
+            finished, pending = futures.wait(
+                    [SUCCESSFUL_FUTURE,
+                     CANCELLED_AND_NOTIFIED_FUTURE,
+                     future1, future2, future3, future4],
+                    return_when=futures.ALL_COMPLETED)
+
+            self.assertEquals(set([SUCCESSFUL_FUTURE,
+                                   CANCELLED_AND_NOTIFIED_FUTURE,
+                                   future1, future2, future3, future4]),
+                              finished)
+            self.assertEquals(set(), pending)
         finally:
             call1.close()
             call2.close()
             call3.close()
             call4.close()
 
-    def test_cancel_all(self):
-        call1 = Call(manual_finish=True)
-        call2 = Call()
-        call3 = Call()
-        call4 = Call()
-
-        try:
-            fs = self.executor.run_to_futures(
-                    [call1, call2, call3, call4],
-                    return_when=futures.RETURN_IMMEDIATELY)
-            f1, f2, f3, f4 = fs
-    
-            call1.wait_on_called()
-            self.assertRaises(futures.TimeoutError, fs.cancel, timeout=0)
+    def test_timeout(self):
+        def wait_test():
+            while not future1._waiters:
+                pass
             call1.set_can()
-            fs.cancel()
-    
-            self.assertFalse(f1.cancelled())
-            self.assertTrue(f2.cancelled())
-            self.assertTrue(f3.cancelled())
-            self.assertTrue(f4.cancelled())
+
+        call1 = Call(manual_finish=True)
+        call2 = Call(manual_finish=True)
+        try:
+            future1 = self.executor.submit(call1)
+            future2 = self.executor.submit(call2)
+
+            t = threading.Thread(target=wait_test)
+            t.start()
+            finished, pending = futures.wait(
+                    [CANCELLED_AND_NOTIFIED_FUTURE,
+                     EXCEPTION_FUTURE,
+                     SUCCESSFUL_FUTURE,
+                     future1, future2],
+                    timeout=1,
+                    return_when=futures.ALL_COMPLETED)
+
+            self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
+                                   EXCEPTION_FUTURE,
+                                   SUCCESSFUL_FUTURE,
+                                   future1]), finished)
+            self.assertEquals(set([future2]), pending)
+
+
         finally:
             call1.close()
             call2.close()
-            call3.close()
-            call4.close()
 
-class ThreadPoolCancelTests(CancelTests):
+
+class ThreadPoolWaitTests(WaitTests):
     def setUp(self):
         self.executor = futures.ThreadPoolExecutor(max_workers=1)
 
     def tearDown(self):
-        self.executor.shutdown()
+        self.executor.shutdown(wait=True)
 
-class ProcessPoolCancelTests(WaitsTest):
+class ProcessPoolWaitTests(WaitTests):
     def setUp(self):
         self.executor = futures.ProcessPoolExecutor(max_workers=1)
 
     def tearDown(self):
-        self.executor.shutdown()
+        self.executor.shutdown(wait=True)
+
+class AsCompletedTests(unittest.TestCase):
+    # TODO([email protected]): Should have a test with a non-zero timeout.
+    def test_no_timeout(self):
+        def wait_test():
+            while not future1._waiters:
+                pass
+            call1.set_can()
+            call2.set_can()
+
+        call1 = Call(manual_finish=True)
+        call2 = Call(manual_finish=True)
+        try:
+            future1 = self.executor.submit(call1)
+            future2 = self.executor.submit(call2)
+
+            t = threading.Thread(target=wait_test)
+            t.start()
+            completed = set(futures.as_completed(
+                    [CANCELLED_AND_NOTIFIED_FUTURE,
+                     EXCEPTION_FUTURE,
+                     SUCCESSFUL_FUTURE,
+                     future1, future2]))
+            self.assertEquals(set(
+                    [CANCELLED_AND_NOTIFIED_FUTURE,
+                     EXCEPTION_FUTURE,
+                     SUCCESSFUL_FUTURE,
+                     future1, future2]),
+                    completed)
+        finally:
+            call1.close()
+            call2.close()
+
+    def test_zero_timeout(self):
+        call1 = Call(manual_finish=True)
+        try:
+            future1 = self.executor.submit(call1)
+            completed_futures = set()
+            try:
+                for future in futures.as_completed(
+                        [CANCELLED_AND_NOTIFIED_FUTURE,
+                         EXCEPTION_FUTURE,
+                         SUCCESSFUL_FUTURE,
+                         future1],
+                        timeout=0):
+                    completed_futures.add(future)
+            except futures.TimeoutError:
+                pass
+
+            self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
+                                   EXCEPTION_FUTURE,
+                                   SUCCESSFUL_FUTURE]),
+                              completed_futures)
+        finally:
+            call1.close()
+
+class ThreadPoolAsCompletedTests(AsCompletedTests):
+    def setUp(self):
+        self.executor = futures.ThreadPoolExecutor(max_workers=1)
+
+    def tearDown(self):
+        self.executor.shutdown(wait=True)
+
+class ProcessPoolAsCompletedTests(AsCompletedTests):
+    def setUp(self):
+        self.executor = futures.ProcessPoolExecutor(max_workers=1)
+
+    def tearDown(self):
+        self.executor.shutdown(wait=True)
 
 class ExecutorTest(unittest.TestCase):
     # Executor.shutdown() and context manager usage is tested by
     # ExecutorShutdownTest.
-    def test_run_to_futures(self):
-        call1 = Call(result=1)
-        call2 = Call(result=2)
-        call3 = Call(manual_finish=True)
-        call4 = Call()
-        call5 = Call()
+    def test_submit(self):
+        future = self.executor.submit(pow, 2, 8)
+        self.assertEquals(256, future.result())
 
-        try:
-            f1, f2, f3, f4, f5 = self.executor.run_to_futures(
-                    [call1, call2, call3, call4, call5],
-                    return_when=futures.RETURN_IMMEDIATELY)
-    
-            call3.wait_on_called()
-
-            # ProcessPoolExecutor uses a thread to propogate results into the
-            # future. Calling result() ensures that the thread has done its work
-            # before doing the next set of checks.
-            f1.result()  
-            f2.result()
-
-            self.assertTrue(f1.done())
-            self.assertFalse(f1.running())
-            self.assertEqual(f1.index, 0)
-    
-            self.assertTrue(f2.done())
-            self.assertFalse(f2.running())
-            self.assertEqual(f2.index, 1)
-    
-            self.assertFalse(f3.done())
-            self.assertTrue(f3.running())
-            self.assertEqual(f3.index, 2)
-
-            # ProcessPoolExecutor may mark some futures as running before they
-            # actually are so don't check these ones.
-            self.assertFalse(f4.done())
-            self.assertEqual(f4.index, 3)
-    
-            self.assertFalse(f5.done())
-            self.assertEqual(f5.index, 4)
-        finally:
-            call3.set_can()  # Let the call finish executing.
-            call1.close()
-            call2.close()
-            call3.close()
-            call4.close()
-            call5.close()
-
-    def test_run_to_results(self):
-        call1 = Call(result=1)
-        call2 = Call(result=2)
-        call3 = Call(result=3)
-        try:
-            self.assertEqual(
-                    list(self.executor.run_to_results([call1, call2, call3])),
-                    [1, 2, 3])
-        finally:
-            call1.close()
-            call2.close()
-            call3.close()
-
-    def test_run_to_results_exception(self):
-        call1 = Call(result=1)
-        call2 = Call(result=2)
-        call3 = ExceptionCall()
-        try:
-            i = self.executor.run_to_results([call1, call2, call3])
-    
-            self.assertEqual(i.next(), 1)
-            self.assertEqual(i.next(), 2)
-            self.assertRaises(ZeroDivisionError, i.next)
-        finally:
-            call1.close()
-            call2.close()
-            call3.close()
-
-    def test_run_to_results_timeout(self):
-        call1 = Call(result=1)
-        call2 = Call(result=2)
-        call3 = Call(manual_finish=True)
-
-        try:
-            i = self.executor.run_to_results([call1, call2, call3], timeout=1)
-            self.assertEqual(i.next(), 1)
-            self.assertEqual(i.next(), 2)
-            self.assertRaises(futures.TimeoutError, i.next)
-            call3.set_can()
-        finally:
-            call1.close()
-            call2.close()
-            call3.close()
+    def test_submit_keyword(self):
+        future = self.executor.submit(mul, 2, y=8)
+        self.assertEquals(16, future.result())
 
     def test_map(self):
         self.assertEqual(
@@ -494,36 +543,142 @@
         self.assertEqual(i.next(), (0, 1))
         self.assertRaises(ZeroDivisionError, i.next)
 
+    def test_map_timeout(self):
+        results = []
+        timeout_call = MapCall()
+        try:
+            try:
+                for i in self.executor.map(timeout_call,
+                                           [False, False, True],
+                                           timeout=1):
+                    results.append(i)
+            except futures.TimeoutError:
+                pass
+            else:
+                self.fail('expected TimeoutError')
+        finally:
+            timeout_call.close()
+
+        self.assertEquals([42, 42], results)
+
 class ThreadPoolExecutorTest(ExecutorTest):
     def setUp(self):
         self.executor = futures.ThreadPoolExecutor(max_workers=1)
 
     def tearDown(self):
-        self.executor.shutdown()
+        self.executor.shutdown(wait=True)
 
 class ProcessPoolExecutorTest(ExecutorTest):
     def setUp(self):
         self.executor = futures.ProcessPoolExecutor(max_workers=1)
 
     def tearDown(self):
-        self.executor.shutdown()
+        self.executor.shutdown(wait=True)
 
 class FutureTests(unittest.TestCase):
-    # Future.index() is tested by ExecutorTest
-    # Future.cancel() is further tested by CancelTests.
+    def test_done_callback_with_result(self):
+        self.callback_result = None
+        def fn(callback_future):
+            self.callback_result = callback_future.result()
+
+        f = Future()
+        f.add_done_callback(fn)
+        f.set_result(5)
+        self.assertEquals(5, self.callback_result)
+
+    def test_done_callback_with_exception(self):
+        self.callback_exception = None
+        def fn(callback_future):
+            self.callback_exception = callback_future.exception()
+
+        f = Future()
+        f.add_done_callback(fn)
+        f.set_exception(Exception('test'))
+        self.assertEquals(('test',), self.callback_exception.args)
+
+    def test_done_callback_with_cancel(self):
+        self.was_cancelled = None
+        def fn(callback_future):
+            self.was_cancelled = callback_future.cancelled()
+
+        f = Future()
+        f.add_done_callback(fn)
+        self.assertTrue(f.cancel())
+        self.assertTrue(self.was_cancelled)
+
+    def test_done_callback_raises(self):
+        LOGGER.removeHandler(STDERR_HANDLER)
+        logging_stream = StringIO.StringIO()
+        handler = logging.StreamHandler(logging_stream)
+        LOGGER.addHandler(handler)
+        try:
+            self.raising_was_called = False
+            self.fn_was_called = False
+
+            def raising_fn(callback_future):
+                self.raising_was_called = True
+                raise Exception('doh!')
+
+            def fn(callback_future):
+                self.fn_was_called = True
+
+            f = Future()
+            f.add_done_callback(raising_fn)
+            f.add_done_callback(fn)
+            f.set_result(5)
+            self.assertTrue(self.raising_was_called)
+            self.assertTrue(self.fn_was_called)
+            self.assertTrue('Exception: doh!' in logging_stream.getvalue())
+        finally:
+            LOGGER.removeHandler(handler)
+            LOGGER.addHandler(STDERR_HANDLER)
+
+    def test_done_callback_already_successful(self):
+        self.callback_result = None
+        def fn(callback_future):
+            self.callback_result = callback_future.result()
+
+        f = Future()
+        f.set_result(5)
+        f.add_done_callback(fn)
+        self.assertEquals(5, self.callback_result)
+
+    def test_done_callback_already_failed(self):
+        self.callback_exception = None
+        def fn(callback_future):
+            self.callback_exception = callback_future.exception()
+
+        f = Future()
+        f.set_exception(Exception('test'))
+        f.add_done_callback(fn)
+        self.assertEquals(('test',), self.callback_exception.args)
+
+    def test_done_callback_already_cancelled(self):
+        self.was_cancelled = None
+        def fn(callback_future):
+            self.was_cancelled = callback_future.cancelled()
+
+        f = Future()
+        self.assertTrue(f.cancel())
+        f.add_done_callback(fn)
+        self.assertTrue(self.was_cancelled)
 
     def test_repr(self):
-        self.assertEqual(repr(PENDING_FUTURE), '<Future state=pending>')
-        self.assertEqual(repr(RUNNING_FUTURE), '<Future state=running>')
-        self.assertEqual(repr(CANCELLED_FUTURE), '<Future state=cancelled>')
-        self.assertEqual(repr(CANCELLED_AND_NOTIFIED_FUTURE),
-                         '<Future state=cancelled>')
-        self.assertEqual(repr(EXCEPTION_FUTURE),
-                         '<Future state=finished raised IOError>')
-        self.assertEqual(repr(SUCCESSFUL_FUTURE),
-                         '<Future state=finished returned int>')
+        self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=pending>',
+                                 repr(PENDING_FUTURE)))
+        self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=running>',
+                                 repr(RUNNING_FUTURE)))
+        self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=cancelled>',
+                                 repr(CANCELLED_FUTURE)))
+        self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=cancelled>',
+                                 repr(CANCELLED_AND_NOTIFIED_FUTURE)))
+        self.assertTrue(re.match(
+                '<Future at 0x[0-9a-f]+L? state=finished raised IOError>',
+                repr(EXCEPTION_FUTURE)))
+        self.assertTrue(re.match(
+                '<Future at 0x[0-9a-f]+L? state=finished returned int>',
+                repr(SUCCESSFUL_FUTURE)))
 
-        create_future
 
     def test_cancel(self):
         f1 = create_future(state=PENDING)
@@ -588,13 +743,11 @@
         self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
 
     def test_result_with_success(self):
+        # TODO([email protected]): This test is timing dependant.
         def notification():
             # Wait until the main thread is waiting for the result.
             time.sleep(1)
-            with f1._condition:
-                f1._state = FINISHED
-                f1._result = 42
-                f1._condition.notify_all()
+            f1.set_result(42)
 
         f1 = create_future(state=PENDING)
         t = threading.Thread(target=notification)
@@ -603,12 +756,11 @@
         self.assertEquals(f1.result(timeout=5), 42)
 
     def test_result_with_cancel(self):
+        # TODO([email protected]): This test is timing dependant.
         def notification():
             # Wait until the main thread is waiting for the result.
             time.sleep(1)
-            with f1._condition:
-                f1._state = CANCELLED
-                f1._condition.notify_all()
+            f1.cancel()
 
         f1 = create_future(state=PENDING)
         t = threading.Thread(target=notification)
@@ -644,186 +796,16 @@
 
         self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
 
-class FutureListTests(unittest.TestCase):
-    # FutureList.wait() is further tested by WaitsTest.
-    # FutureList.cancel() is tested by CancelTests.
-    def test_wait_RETURN_IMMEDIATELY(self):
-        f = futures.FutureList(futures=None, event_sink=None)
-        f.wait(return_when=futures.RETURN_IMMEDIATELY)
-
-    def test_wait_timeout(self):
-        f = futures.FutureList([PENDING_FUTURE],
-                               futures._base.ThreadEventSink())
-
-        for t in [futures.FIRST_COMPLETED,
-                  futures.FIRST_EXCEPTION,
-                  futures.ALL_COMPLETED]:
-            f.wait(timeout=0.1, return_when=t)
-            self.assertFalse(PENDING_FUTURE.done())
-
-    def test_wait_all_done(self):
-        f = futures.FutureList([CANCELLED_FUTURE,
-                                CANCELLED_AND_NOTIFIED_FUTURE,
-                                SUCCESSFUL_FUTURE,
-                                EXCEPTION_FUTURE],
-                               futures._base.ThreadEventSink())
-
-        f.wait(return_when=futures.ALL_COMPLETED)
-
-    def test_filters(self):
-        fs = [PENDING_FUTURE,
-              RUNNING_FUTURE,
-              CANCELLED_FUTURE,
-              CANCELLED_AND_NOTIFIED_FUTURE,
-              EXCEPTION_FUTURE,
-              SUCCESSFUL_FUTURE]
-        f = futures.FutureList(fs, None)
-
-        self.assertEqual(list(f.running_futures()), [RUNNING_FUTURE])
-        self.assertEqual(list(f.cancelled_futures()),
-                        [CANCELLED_FUTURE,
-                         CANCELLED_AND_NOTIFIED_FUTURE])
-        self.assertEqual(list(f.done_futures()),
-                         [CANCELLED_FUTURE,
-                          CANCELLED_AND_NOTIFIED_FUTURE,
-                          EXCEPTION_FUTURE,
-                          SUCCESSFUL_FUTURE])
-        self.assertEqual(list(f.successful_futures()),
-                         [SUCCESSFUL_FUTURE])
-        self.assertEqual(list(f.exception_futures()),
-                         [EXCEPTION_FUTURE])
-
-    def test_has_running_futures(self):
-        self.assertFalse(
-                futures.FutureList([PENDING_FUTURE,
-                                    CANCELLED_FUTURE,
-                                    CANCELLED_AND_NOTIFIED_FUTURE,
-                                    SUCCESSFUL_FUTURE,
-                                    EXCEPTION_FUTURE],
-                                   None).has_running_futures())
-        self.assertTrue(
-                futures.FutureList([RUNNING_FUTURE],
-                                   None).has_running_futures())
-
-    def test_has_cancelled_futures(self):
-        self.assertFalse(
-                futures.FutureList([PENDING_FUTURE,
-                                    RUNNING_FUTURE,
-                                    SUCCESSFUL_FUTURE,
-                                    EXCEPTION_FUTURE],
-                                   None).has_cancelled_futures())
-        self.assertTrue(
-                futures.FutureList([CANCELLED_FUTURE],
-                                   None).has_cancelled_futures())
-
-        self.assertTrue(
-                futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE],
-                                   None).has_cancelled_futures())
-
-    def test_has_done_futures(self):
-        self.assertFalse(
-                futures.FutureList([PENDING_FUTURE,
-                                    RUNNING_FUTURE],
-                                   None).has_done_futures())
-        self.assertTrue(
-                futures.FutureList([CANCELLED_FUTURE],
-                                   None).has_done_futures())
-
-        self.assertTrue(
-                futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE],
-                                   None).has_done_futures())
-
-        self.assertTrue(
-                futures.FutureList([EXCEPTION_FUTURE],
-                                   None).has_done_futures())
-
-        self.assertTrue(
-                futures.FutureList([SUCCESSFUL_FUTURE],
-                                   None).has_done_futures())
-
-    def test_has_successful_futures(self):
-        self.assertFalse(
-                futures.FutureList([PENDING_FUTURE,
-                                    RUNNING_FUTURE,
-                                    CANCELLED_FUTURE,
-                                    CANCELLED_AND_NOTIFIED_FUTURE,
-                                    EXCEPTION_FUTURE],
-                                   None).has_successful_futures())
-
-        self.assertTrue(
-                futures.FutureList([SUCCESSFUL_FUTURE],
-                                   None).has_successful_futures())
-
-    def test_has_exception_futures(self):
-        self.assertFalse(
-                futures.FutureList([PENDING_FUTURE,
-                                    RUNNING_FUTURE,
-                                    CANCELLED_FUTURE,
-                                    CANCELLED_AND_NOTIFIED_FUTURE,
-                                    SUCCESSFUL_FUTURE],
-                                   None).has_exception_futures())
-
-        self.assertTrue(
-                futures.FutureList([EXCEPTION_FUTURE],
-                                   None).has_exception_futures())
-
-    def test_get_item(self):
-        fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE]
-        f = futures.FutureList(fs, None)
-        self.assertEqual(f[0], PENDING_FUTURE)
-        self.assertEqual(f[1], RUNNING_FUTURE)
-        self.assertEqual(f[2], CANCELLED_FUTURE)
-        self.assertRaises(IndexError, f.__getitem__, 3)
-
-    def test_len(self):
-        f = futures.FutureList([PENDING_FUTURE,
-                                RUNNING_FUTURE,
-                                CANCELLED_FUTURE],
-                               None)
-        self.assertEqual(len(f), 3)
-
-    def test_iter(self):
-        fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE]
-        f = futures.FutureList(fs, None)
-        self.assertEqual(list(iter(f)), fs)
-
-    def test_contains(self):
-        f = futures.FutureList([PENDING_FUTURE,
-                                RUNNING_FUTURE],
-                               None)
-        self.assertTrue(PENDING_FUTURE in f)
-        self.assertTrue(RUNNING_FUTURE in f)
-        self.assertFalse(CANCELLED_FUTURE in f)
-
-    def test_repr(self):
-        pending = create_future(state=PENDING)
-        cancelled = create_future(state=CANCELLED)
-        cancelled2 = create_future(state=CANCELLED_AND_NOTIFIED)
-        running = create_future(state=RUNNING)
-        finished = create_future(state=FINISHED)
-
-        f = futures.FutureList(
-                [PENDING_FUTURE] * 4 + [CANCELLED_FUTURE] * 2 +
-                [CANCELLED_AND_NOTIFIED_FUTURE] +
-                [RUNNING_FUTURE] * 2 +
-                [SUCCESSFUL_FUTURE, EXCEPTION_FUTURE] * 3,
-                None)
-
-        self.assertEqual(repr(f),
-                         '<FutureList #futures=15 '
-                         '[#pending=4 #cancelled=3 #running=2 #finished=6]>')
-
 def test_main():
-    test_support.run_unittest(ProcessPoolCancelTests,
-                              ThreadPoolCancelTests,
-                              ProcessPoolExecutorTest,
+    test_support.run_unittest(ProcessPoolExecutorTest,
                               ThreadPoolExecutorTest,
                               ProcessPoolWaitTests,
                               ThreadPoolWaitTests,
+                              ProcessPoolAsCompletedTests,
+                              ThreadPoolAsCompletedTests,
                               FutureTests,
-                              FutureListTests,
                               ProcessPoolShutdownTest,
                               ThreadPoolShutdownTest)
 
 if __name__ == "__main__":
-    test_main()
\ No newline at end of file
+    test_main()