Update for PEP
diff --git a/python2/crawl.py b/python2/crawl.py
index cf4a218..f1d29ca 100644
--- a/python2/crawl.py
+++ b/python2/crawl.py
@@ -19,26 +19,29 @@
'http://www.youtube.com/',
'http://www.blogger.com/']
-def load_url(url):
- return urllib2.urlopen(url).read()
+def load_url(url, timeout):
+ return urllib2.urlopen(url, timeout=timeout).read()
-def download_urls_sequential(urls):
+def download_urls_sequential(urls, timeout=60):
url_to_content = {}
for url in urls:
try:
- url_to_content[url] = load_url(url)
+ url_to_content[url] = load_url(url, timeout=timeout)
except:
pass
return url_to_content
-def download_urls_with_executor(urls, executor):
+def download_urls_with_executor(urls, executor, timeout=60):
try:
url_to_content = {}
- fs = executor.run_to_futures(
- (functools.partial(load_url, url) for url in urls))
- for future in fs.successful_futures():
- url = urls[future.index]
- url_to_content[url] = future.result()
+ future_to_url = dict((executor.submit(load_url, url, timeout), url)
+ for url in urls)
+
+ for future in futures.as_completed(future_to_url):
+ try:
+ url_to_content[future_to_url[future]] = future.result()
+ except:
+ pass
return url_to_content
finally:
executor.shutdown()
@@ -46,19 +49,20 @@
def main():
for name, fn in [('sequential',
functools.partial(download_urls_sequential, URLS)),
- ('threads',
- functools.partial(download_urls_with_executor,
- URLS,
- futures.ThreadPoolExecutor(10))),
('processes',
functools.partial(download_urls_with_executor,
URLS,
+ futures.ProcessPoolExecutor(10))),
+ ('threads',
+ functools.partial(download_urls_with_executor,
+ URLS,
futures.ThreadPoolExecutor(10)))]:
- print '%s: ' % name.ljust(12),
+ print name.ljust(12),
start = time.time()
url_map = fn()
print '%.2f seconds (%d of %d downloaded)' % (time.time() - start,
- len(url_map),
- len(URLS))
+ len(url_map),
+ len(URLS))
-main()
+if __name__ == '__main__':
+ main()
diff --git a/python2/futures/__init__.py b/python2/futures/__init__.py
index 27a5720..8331d53 100644
--- a/python2/futures/__init__.py
+++ b/python2/futures/__init__.py
@@ -1,18 +1,18 @@
-# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
"""Execute computations asynchronously using threads or processes."""
__author__ = 'Brian Quinlan ([email protected])'
-from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION,
- ALL_COMPLETED, RETURN_IMMEDIATELY,
- CancelledError, TimeoutError,
- Future, FutureList)
+from futures._base import (FIRST_COMPLETED,
+ FIRST_EXCEPTION,
+ ALL_COMPLETED,
+ CancelledError,
+ TimeoutError,
+ Future,
+ Executor,
+ wait,
+ as_completed)
+from futures.process import ProcessPoolExecutor
from futures.thread import ThreadPoolExecutor
-
-try:
- import multiprocessing
-except ImportError:
- pass
-else:
- from futures.process import ProcessPoolExecutor
diff --git a/python2/futures/_base.py b/python2/futures/_base.py
index bec7212..ed7a094 100644
--- a/python2/futures/_base.py
+++ b/python2/futures/_base.py
@@ -1,54 +1,24 @@
-# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
__author__ = 'Brian Quinlan ([email protected])'
+import collections
+import functools
import logging
import threading
import time
-try:
- from functools import partial
-except ImportError:
- def partial(func, *args, **keywords):
- def newfunc(*fargs, **fkeywords):
- newkeywords = keywords.copy()
- newkeywords.update(fkeywords)
- return func(*(args + fargs), **newkeywords)
- newfunc.func = func
- newfunc.args = args
- newfunc.keywords = keywords
- return newfunc
-
-# The "any" and "all" builtins weren't introduced until Python 2.5.
-try:
- any
-except NameError:
- def any(iterable):
- for element in iterable:
- if element:
- return True
- return False
-
-try:
- all
-except NameError:
- def all(iterable):
- for element in iterable:
- if not element:
- return False
- return True
-
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
-RETURN_IMMEDIATELY = 'RETURN_IMMEDIATELY'
# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
-CANCELLED = 'CANCELLED'
-# ...and ThreadEventSink.add_cancelled() was called by a worker.
+CANCELLED = 'CANCELLED'
+# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'
@@ -70,184 +40,249 @@
# Logger for internal use by the futures package.
LOGGER = logging.getLogger("futures")
-_handler = logging.StreamHandler()
-LOGGER.addHandler(_handler)
-del _handler
-
-def set_future_exception(future, event_sink, exception):
- """Sets a future as having terminated with an exception.
-
- This function should only be used within the futures package.
-
- Args:
- future: The Future that finished with an exception.
- event_sink: The ThreadEventSink accociated with the Future's FutureList.
- The event_sink will be notified of the Future's completion, which
- may unblock some clients that have called FutureList.wait().
- exception: The expection that executing the Future raised.
- """
- future._condition.acquire()
- try:
- future._exception = exception
- event_sink._condition.acquire()
- try:
- future._state = FINISHED
- event_sink.add_exception()
- finally:
- event_sink._condition.release()
-
- future._condition.notifyAll()
- finally:
- future._condition.release()
-
-def set_future_result(future, event_sink, result):
- """Sets a future as having terminated without exception.
-
- This function should only be used within the futures package.
-
- Args:
- future: The Future that completed.
- event_sink: The ThreadEventSink accociated with the Future's FutureList.
- The event_sink will be notified of the Future's completion, which
- may unblock some clients that have called FutureList.wait().
- result: The value returned by the Future.
- """
- future._condition.acquire()
- try:
- future._result = result
- event_sink._condition.acquire()
- try:
- future._state = FINISHED
- event_sink.add_result()
- finally:
- event_sink._condition.release()
-
- future._condition.notifyAll()
- finally:
- future._condition.release()
+STDERR_HANDLER = logging.StreamHandler()
+LOGGER.addHandler(STDERR_HANDLER)
class Error(Exception):
+ """Base class for all future-related exceptions."""
pass
class CancelledError(Error):
+ """The Future was cancelled."""
pass
class TimeoutError(Error):
+ """The operation exceeded the given deadline."""
pass
-class _WaitTracker(object):
- """Provides the event that FutureList.wait(...) blocks on.
-
- """
+class _Waiter(object):
+ """Provides the event that wait() and as_completed() block on."""
def __init__(self):
self.event = threading.Event()
+ self.finished_futures = []
- def add_result(self):
- raise NotImplementedError()
+ def add_result(self, future):
+ self.finished_futures.append(future)
- def add_exception(self):
- raise NotImplementedError()
+ def add_exception(self, future):
+ self.finished_futures.append(future)
- def add_cancelled(self):
- raise NotImplementedError()
+ def add_cancelled(self, future):
+ self.finished_futures.append(future)
-class _FirstCompletedWaitTracker(_WaitTracker):
- """Used by wait(return_when=FIRST_COMPLETED)."""
+class _FirstCompletedWaiter(_Waiter):
+ """Used by wait(return_when=FIRST_COMPLETED) and as_completed()."""
- def add_result(self):
+ def add_result(self, future):
+ super(_FirstCompletedWaiter, self).add_result(future)
self.event.set()
- def add_exception(self):
+ def add_exception(self, future):
+ super(_FirstCompletedWaiter, self).add_exception(future)
self.event.set()
- def add_cancelled(self):
+ def add_cancelled(self, future):
+ super(_FirstCompletedWaiter, self).add_cancelled(future)
self.event.set()
-class _AllCompletedWaitTracker(_WaitTracker):
+class _AllCompletedWaiter(_Waiter):
"""Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
def __init__(self, num_pending_calls, stop_on_exception):
self.num_pending_calls = num_pending_calls
self.stop_on_exception = stop_on_exception
- _WaitTracker.__init__(self)
+ super(_AllCompletedWaiter, self).__init__()
- def add_result(self):
+ def _decrement_pending_calls(self):
self.num_pending_calls -= 1
if not self.num_pending_calls:
self.event.set()
- def add_exception(self):
+ def add_result(self, future):
+ super(_AllCompletedWaiter, self).add_result(future)
+ self._decrement_pending_calls()
+
+ def add_exception(self, future):
+ super(_AllCompletedWaiter, self).add_exception(future)
if self.stop_on_exception:
self.event.set()
else:
- self.add_result()
+ self._decrement_pending_calls()
- def add_cancelled(self):
- self.add_result()
+ def add_cancelled(self, future):
+ super(_AllCompletedWaiter, self).add_cancelled(future)
+ self._decrement_pending_calls()
-class ThreadEventSink(object):
- """Forwards events to many _WaitTrackers.
+class _AcquireFutures(object):
+ """A context manager that does an ordered acquire of Future conditions."""
- Each FutureList has a ThreadEventSink and each call to FutureList.wait()
- causes a new _WaitTracker to be added to the ThreadEventSink. This design
- allows many threads to call FutureList.wait() on the same FutureList with
- different arguments.
+ def __init__(self, futures):
+ self.futures = sorted(futures, key=id)
- This class should not be used by clients.
+ def __enter__(self):
+ for future in self.futures:
+ future._condition.acquire()
+
+ def __exit__(self, *args):
+ for future in self.futures:
+ future._condition.release()
+
+def _create_and_install_waiters(fs, return_when):
+ if return_when == FIRST_COMPLETED:
+ waiter = _FirstCompletedWaiter()
+ else:
+ pending_count = sum(
+ f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
+
+ if return_when == FIRST_EXCEPTION:
+ waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
+ elif return_when == ALL_COMPLETED:
+ waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
+ else:
+ raise ValueError("Invalid return condition: %r" % return_when)
+
+ for f in fs:
+ f._waiters.append(waiter)
+
+ return waiter
+
+def as_completed(fs, timeout=None):
+ """An iterator over the given futures that yields each as it completes.
+
+ Args:
+ fs: The sequence of Futures (possibly created by different Executors) to
+ iterate over.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+
+ Returns:
+ An iterator that yields the given Futures as they complete (finished or
+ cancelled).
+
+ Raises:
+ TimeoutError: If the entire result iterator could not be generated
+ before the given timeout.
"""
- def __init__(self):
- self._condition = threading.Lock()
- self._waiters = []
+ if timeout is not None:
+ end_time = timeout + time.time()
- def add(self, e):
- self._waiters.append(e)
+ with _AcquireFutures(fs):
+ finished = set(
+ f for f in fs
+ if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
+ pending = set(fs) - finished
+ waiter = _create_and_install_waiters(fs, FIRST_COMPLETED)
- def remove(self, e):
- self._waiters.remove(e)
+ try:
+ for future in finished:
+ yield future
- def add_result(self):
- for waiter in self._waiters:
- waiter.add_result()
+ while pending:
+ if timeout is None:
+ wait_timeout = None
+ else:
+ wait_timeout = end_time - time.time()
+ if wait_timeout < 0:
+ raise TimeoutError(
+ '%d (of %d) futures unfinished' % (
+ len(pending), len(fs)))
- def add_exception(self):
- for waiter in self._waiters:
- waiter.add_exception()
+ waiter.event.wait(timeout)
- def add_cancelled(self):
- for waiter in self._waiters:
- waiter.add_cancelled()
+ for future in waiter.finished_futures[:]:
+ yield future
+ waiter.finished_futures.remove(future)
+ pending.remove(future)
+
+ finally:
+ for f in fs:
+ f._waiters.remove(waiter)
+
+DoneAndNotDoneFutures = collections.namedtuple(
+ 'DoneAndNotDoneFutures', 'done not_done')
+def wait(fs, timeout=None, return_when=ALL_COMPLETED):
+ """Wait for the futures in the given sequence to complete.
+
+ Args:
+ fs: The sequence of Futures (possibly created by different Executors) to
+ wait upon.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+ return_when: Indicates when this function should return. The options
+ are:
+
+ FIRST_COMPLETED - Return when any future finishes or is
+ cancelled.
+ FIRST_EXCEPTION - Return when any future finishes by raising an
+ exception. If no future raises an exception
+ then it is equivalent to ALL_COMPLETED.
+ ALL_COMPLETED - Return when all futures finish or are cancelled.
+
+ Returns:
+ A named 2-tuple of sets. The first set, named 'done', contains the
+ futures that completed (is finished or cancelled) before the wait
+ completed. The second set, named 'not_done', contains uncompleted
+ futures.
+ """
+ with _AcquireFutures(fs):
+ done = set(f for f in fs
+ if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
+ not_done = set(fs) - done
+
+ if (return_when == FIRST_COMPLETED) and done:
+ return DoneAndNotDoneFutures(done, not_done)
+ elif (return_when == FIRST_EXCEPTION) and done:
+ if any(f for f in done
+ if not f.cancelled() and f.exception() is not None):
+ return DoneAndNotDoneFutures(done, not_done)
+
+ if len(done) == len(fs):
+ return DoneAndNotDoneFutures(done, not_done)
+
+ waiter = _create_and_install_waiters(fs, return_when)
+
+ waiter.event.wait(timeout)
+ for f in fs:
+ f._waiters.remove(waiter)
+
+ done.update(waiter.finished_futures)
+ return DoneAndNotDoneFutures(done, set(fs) - done)
class Future(object):
"""Represents the result of an asynchronous computation."""
- def __init__(self, index):
+ def __init__(self):
"""Initializes the future. Should not be called by clients."""
self._condition = threading.Condition()
self._state = PENDING
self._result = None
self._exception = None
- self._index = index
+ self._waiters = []
+ self._done_callbacks = []
+
+ def _invoke_callbacks(self):
+ for callback in self._done_callbacks:
+ try:
+ callback(self)
+ except Exception:
+ LOGGER.exception('exception calling callback for %r', self)
def __repr__(self):
- self._condition.acquire()
- try:
+ with self._condition:
if self._state == FINISHED:
if self._exception:
- return '<Future state=%s raised %s>' % (
+ return '<Future at %s state=%s raised %s>' % (
+ hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._exception.__class__.__name__)
else:
- return '<Future state=%s returned %s>' % (
+ return '<Future at %s state=%s returned %s>' % (
+ hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._result.__class__.__name__)
- return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state]
- finally:
- self._condition.release()
-
- @property
- def index(self):
- """The index of the future in its FutureList."""
- return self._index
+ return '<Future at %s state=%s>' % (
+ hex(id(self)),
+ _STATE_TO_DESCRIPTION_MAP[self._state])
def cancel(self):
"""Cancel the future if possible.
@@ -255,40 +290,33 @@
Returns True if the future was cancelled, False otherwise. A future
cannot be cancelled if it is running or has already completed.
"""
- self._condition.acquire()
- try:
+ with self._condition:
if self._state in [RUNNING, FINISHED]:
return False
- if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]:
- self._state = CANCELLED
- self._condition.notify_all()
- return True
- finally:
- self._condition.release()
+ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ return True
+
+ self._state = CANCELLED
+ self._condition.notify_all()
+
+ self._invoke_callbacks()
+ return True
def cancelled(self):
"""Return True if the future has cancelled."""
- self._condition.acquire()
- try:
+ with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
- finally:
- self._condition.release()
def running(self):
- self._condition.acquire()
- try:
+ """Return True if the future is currently executing."""
+ with self._condition:
return self._state == RUNNING
- finally:
- self._condition.release()
def done(self):
"""Return True of the future was cancelled or finished executing."""
- self._condition.acquire()
- try:
+ with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
- finally:
- self._condition.release()
def __get_result(self):
if self._exception:
@@ -296,6 +324,23 @@
else:
return self._result
+ def add_done_callback(self, fn):
+ """Attaches a callable that will be called when the future finishes.
+
+ Args:
+ fn: A callable that will be called with this future as its only
+ argument when the future completes or is cancelled. The callable
+ will always be called by a thread in the same process in which
+ it was added. If the future has already completed or been
+ cancelled then the callable will be called immediately. These
+ callables are called in the order that they were added.
+ """
+ with self._condition:
+ if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
+ self._done_callbacks.append(fn)
+ return
+ fn(self)
+
def result(self, timeout=None):
"""Return the result of the call that the future represents.
@@ -312,8 +357,7 @@
timeout.
Exception: If the call raised then that exception will be raised.
"""
- self._condition.acquire()
- try:
+ with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
@@ -327,8 +371,6 @@
return self.__get_result()
else:
raise TimeoutError()
- finally:
- self._condition.release()
def exception(self, timeout=None):
"""Return the exception raised by the call that the future represents.
@@ -348,8 +390,7 @@
timeout.
"""
- self._condition.acquire()
- try:
+ with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
@@ -363,237 +404,94 @@
return self._exception
else:
raise TimeoutError()
- finally:
- self._condition.release()
-class FutureList(object):
- def __init__(self, futures, event_sink):
- """Initializes the FutureList. Should not be called by clients."""
- self._futures = futures
- self._event_sink = event_sink
+ # The following methods should only be used by Executors and in tests.
+ def set_running_or_notify_cancel(self):
+ """Mark the future as running or process any cancel notifications.
- def wait(self, timeout=None, return_when=ALL_COMPLETED):
- """Wait for the futures in the list to complete.
+ Should only be used by Executor implementations and unit tests.
- Args:
- timeout: The maximum number of seconds to wait. If None, then there
- is no limit on the wait time.
- return_when: Indicates when the method should return. The options
- are:
+ If the future has been cancelled (cancel() was called and returned
+ True) then any threads waiting on the future completing (though calls
+ to as_completed() or wait()) are notified and False is returned.
- FIRST_COMPLETED - Return when any future finishes or is
- cancelled.
- FIRST_EXCEPTION - Return when any future finishes by raising an
- exception. If no future raises and exception
- then it is equivalent to ALL_COMPLETED.
- ALL_COMPLETED - Return when all futures finish or are cancelled.
- RETURN_IMMEDIATELY - Return without waiting (this is not likely
- to be a useful option but it is there to
- be symmetrical with the
- executor.run_to_futures() method.
+ If the future was not cancelled then it is put in the running state
+ (future calls to running() will return True) and True is returned.
+
+ This method should be called by Executor implementations before
+ executing the work associated with this future. If this method returns
+ False then the work should not be executed.
+
+ Returns:
+ False if the Future was cancelled, True otherwise.
Raises:
- TimeoutError: If the wait condition wasn't satisfied before the
- given timeout.
+ RuntimeError: if this method was already called or if set_result()
+ or set_exception() was called.
"""
- if return_when == RETURN_IMMEDIATELY:
- return
-
- # Futures cannot change state without this condition being held.
- self._event_sink._condition.acquire()
- try:
- # Make a quick exit if every future is already done. This check is
- # necessary because, if every future is in the
- # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will
- # never receive any events.
- if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
- for f in self):
- return
-
- if return_when == FIRST_COMPLETED:
- completed_tracker = _FirstCompletedWaitTracker()
+ with self._condition:
+ if self._state == CANCELLED:
+ self._state = CANCELLED_AND_NOTIFIED
+ for waiter in self._waiters:
+ waiter.add_cancelled(self)
+ # self._condition.notify_all() is not necessary because
+ # self.cancel() triggers a notification.
+ return False
+ elif self._state == PENDING:
+ self._state = RUNNING
+ return True
else:
- # Calculate how many events are expected before every future
- # is complete. This can be done without holding the futures'
- # locks because a future cannot transition itself into either
- # of the states being looked for.
- pending_count = sum(
- f._state not in [CANCELLED_AND_NOTIFIED, FINISHED]
- for f in self)
+ LOGGER.critical('Future %s in unexpected state: %s',
+ id(self.future),
+ self.future._state)
+ raise RuntimeError('Future in unexpected state')
- if return_when == FIRST_EXCEPTION:
- completed_tracker = _AllCompletedWaitTracker(
- pending_count, stop_on_exception=True)
- elif return_when == ALL_COMPLETED:
- completed_tracker = _AllCompletedWaitTracker(
- pending_count, stop_on_exception=False)
+ def set_result(self, result):
+ """Sets the return value of work associated with the future.
- self._event_sink.add(completed_tracker)
- finally:
- self._event_sink._condition.release()
-
- try:
- completed_tracker.event.wait(timeout)
- finally:
- self._event_sink.remove(completed_tracker)
-
- def cancel(self, timeout=None):
- """Cancel the futures in the list.
-
- Args:
- timeout: The maximum number of seconds to wait. If None, then there
- is no limit on the wait time.
-
- Raises:
- TimeoutError: If all the futures were not finished before the
- given timeout.
+ Should only be used by Executor implementations and unit tests.
"""
- for f in self:
- f.cancel()
- self.wait(timeout=timeout, return_when=ALL_COMPLETED)
- if any(not f.done() for f in self):
- raise TimeoutError()
+ with self._condition:
+ self._result = result
+ self._state = FINISHED
+ for waiter in self._waiters:
+ waiter.add_result(self)
+ self._condition.notify_all()
+ self._invoke_callbacks()
- def has_running_futures(self):
- """Returns True if any futures in the list are still running."""
- return any(self.running_futures())
+ def set_exception(self, exception):
+ """Sets the result of the future as being the given exception.
- def has_cancelled_futures(self):
- """Returns True if any futures in the list were cancelled."""
- return any(self.cancelled_futures())
-
- def has_done_futures(self):
- """Returns True if any futures in the list are finished or cancelled."""
- return any(self.done_futures())
-
- def has_successful_futures(self):
- """Returns True if any futures in the list finished without raising."""
- return any(self.successful_futures())
-
- def has_exception_futures(self):
- """Returns True if any futures in the list finished by raising."""
- return any(self.exception_futures())
-
- def cancelled_futures(self):
- """Returns all cancelled futures in the list."""
- return (f for f in self
- if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED])
-
- def done_futures(self):
- """Returns all futures in the list that are finished or cancelled."""
- return (f for f in self
- if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED])
-
- def successful_futures(self):
- """Returns all futures in the list that finished without raising."""
- return (f for f in self
- if f._state == FINISHED and f._exception is None)
-
- def exception_futures(self):
- """Returns all futures in the list that finished by raising."""
- return (f for f in self
- if f._state == FINISHED and f._exception is not None)
-
- def running_futures(self):
- """Returns all futures in the list that are still running."""
- return (f for f in self if f._state == RUNNING)
-
- def __len__(self):
- return len(self._futures)
-
- def __getitem__(self, i):
- return self._futures[i]
-
- def __iter__(self):
- return iter(self._futures)
-
- def __contains__(self, future):
- return future in self._futures
-
- def __repr__(self):
- states = dict([(state, 0) for state in _FUTURE_STATES])
- for f in self:
- states[f._state] += 1
-
- return ('<FutureList #futures=%d '
- '[#pending=%d #cancelled=%d #running=%d #finished=%d]>' % (
- len(self),
- states[PENDING],
- states[CANCELLED] + states[CANCELLED_AND_NOTIFIED],
- states[RUNNING],
- states[FINISHED]))
+ Should only be used by Executor implementations and unit tests.
+ """
+ with self._condition:
+ self._exception = exception
+ self._state = FINISHED
+ for waiter in self._waiters:
+ waiter.add_exception(self)
+ self._condition.notify_all()
+ self._invoke_callbacks()
class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""
- def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
- """Return a list of futures representing the given calls.
- Args:
- calls: A sequence of callables that take no arguments. These will
- be bound to Futures and returned.
- timeout: The maximum number of seconds to wait. If None, then there
- is no limit on the wait time.
- return_when: Indicates when the method should return. The options
- are:
+ def submit(self, fn, *args, **kwargs):
+ """Submits a callable to be executed with the given arguments.
- FIRST_COMPLETED - Return when any future finishes or is
- cancelled.
- FIRST_EXCEPTION - Return when any future finishes by raising an
- exception. If no future raises and exception
- then it is equivalent to ALL_COMPLETED.
- ALL_COMPLETED - Return when all futures finish or are cancelled.
- RETURN_IMMEDIATELY - Return without waiting.
+ Schedules the callable to be executed as fn(*args, **kwargs) and returns
+ a Future instance representing the execution of the callable.
Returns:
- A FutureList containing Futures for the given calls.
+ A Future representing the given call.
"""
raise NotImplementedError()
- def run_to_results(self, calls, timeout=None):
- """Returns a iterator of the results of the given calls.
-
- Args:
- calls: A sequence of callables that take no arguments. These will
- be called and their results returned.
- timeout: The maximum number of seconds to wait. If None, then there
- is no limit on the wait time.
-
- Returns:
- An iterator over the results of the given calls. Equivalent to:
- (call() for call in calls) but the calls may be evaluated
- out-of-order.
-
- Raises:
- TimeoutError: If all the given calls were not completed before the
- given timeout.
- Exception: If any call() raises.
- """
- if timeout is not None:
- end_time = timeout + time.time()
-
- fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY)
-
- try:
- for future in fs:
- if timeout is None:
- yield future.result()
- else:
- yield future.result(end_time - time.time())
- except Exception, e:
- # Python 2.4 and earlier don't allow yield statements in
- # try/finally blocks
- try:
- fs.cancel(timeout=0)
- except TimeoutError:
- pass
- raise e
-
- def map(self, func, *iterables, **kwargs):
+ def map(self, fn, *iterables, **kwargs):
"""Returns a iterator equivalent to map(fn, iter).
Args:
- func: A callable that will take take as many arguments as there
- are passed iterables.
+ fn: A callable that will take take as many arguments as there are
+ passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
@@ -606,17 +504,38 @@
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
- timeout = kwargs.get('timeout') or None
- calls = [partial(func, *args) for args in zip(*iterables)]
- return self.run_to_results(calls, timeout=timeout)
+ timeout = kwargs.get('timeout')
+ if timeout is not None:
+ end_time = timeout + time.time()
- def shutdown(self):
- """Clean-up. No other methods can be called afterwards."""
- raise NotImplementedError()
+ fs = [self.submit(fn, *args) for args in zip(*iterables)]
+
+ try:
+ for future in fs:
+ if timeout is None:
+ yield future.result()
+ else:
+ yield future.result(end_time - time.time())
+ finally:
+ for future in fs:
+ future.cancel()
+
+ def shutdown(self, wait=True):
+ """Clean-up the resources associated with the Executor.
+
+ It is safe to call this method several times. Otherwise, no other
+ methods can be called after this one.
+
+ Args:
+ wait: If True then shutdown will not return until all running
+ futures have finished executing and the resources used by the
+ executor have been reclaimed.
+ """
+ pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
- self.shutdown()
+ self.shutdown(wait=True)
return False
diff --git a/python2/futures/process.py b/python2/futures/process.py
index f0d7fdf..ec48377 100644
--- a/python2/futures/process.py
+++ b/python2/futures/process.py
@@ -1,4 +1,5 @@
-# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
"""Implements ProcessPoolExecutor.
@@ -23,9 +24,8 @@
| | | ... | | | | 3, except | | |
+----------+ +------------+ +--------+ +-----------+ +---------+
-Executor.run_to_futures() called:
-- creates a uniquely numbered _WorkItem for each call and adds them to the
- "Work Items" dict
+Executor.submit() called:
+- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue
Local worker thread:
@@ -42,15 +42,11 @@
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
_ResultItems in "Request Q"
"""
-
+
__author__ = 'Brian Quinlan ([email protected])'
-from futures._base import (PENDING, RUNNING, CANCELLED,
- CANCELLED_AND_NOTIFIED, FINISHED,
- ALL_COMPLETED,
- set_future_exception, set_future_result,
- Executor, Future, FutureList, ThreadEventSink)
import atexit
+import _base
import Queue
import multiprocessing
import threading
@@ -63,7 +59,7 @@
# - The workers would still be running during interpretor shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
-# be bad if the function being evaluated has external side-effects e.g.
+# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
@@ -100,10 +96,11 @@
EXTRA_QUEUED_CALLS = 1
class _WorkItem(object):
- def __init__(self, call, future, completion_tracker):
- self.call = call
+ def __init__(self, future, fn, args, kwargs):
self.future = future
- self.completion_tracker = completion_tracker
+ self.fn = fn
+ self.args = args
+ self.kwargs = kwargs
class _ResultItem(object):
def __init__(self, work_id, exception=None, result=None):
@@ -112,9 +109,11 @@
self.result = result
class _CallItem(object):
- def __init__(self, work_id, call):
+ def __init__(self, work_id, fn, args, kwargs):
self.work_id = work_id
- self.call = call
+ self.fn = fn
+ self.args = args
+ self.kwargs = kwargs
def _process_worker(call_queue, result_queue, shutdown):
"""Evaluates calls from call_queue and places the results in result_queue.
@@ -137,8 +136,8 @@
return
else:
try:
- r = call_item.call()
- except Exception, e:
+ r = call_item.fn(*call_item.args, **call_item.kwargs)
+ except BaseException as e:
result_queue.put(_ResultItem(call_item.work_id,
exception=e))
else:
@@ -172,19 +171,15 @@
else:
work_item = pending_work_items[work_id]
- if work_item.future.cancelled():
- work_item.future._condition.acquire()
- work_item.future._condition.notify_all()
- work_item.future._condition.release()
-
- work_item.completion_tracker.add_cancelled()
+ if work_item.future.set_running_or_notify_cancel():
+ call_queue.put(_CallItem(work_id,
+ work_item.fn,
+ work_item.args,
+ work_item.kwargs),
+ block=True)
+ else:
del pending_work_items[work_id]
continue
- else:
- work_item.future._condition.acquire()
- work_item.future._state = RUNNING
- work_item.future._condition.release()
- call_queue.put(_CallItem(work_id, work_item.call), block=True)
def _queue_manangement_worker(executor_reference,
processes,
@@ -218,6 +213,7 @@
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
+
try:
result_item = result_queue.get(block=True, timeout=0.1)
except Queue.Empty:
@@ -244,15 +240,11 @@
del pending_work_items[result_item.work_id]
if result_item.exception:
- set_future_exception(work_item.future,
- work_item.completion_tracker,
- result_item.exception)
+ work_item.future.set_exception(result_item.exception)
else:
- set_future_result(work_item.future,
- work_item.completion_tracker,
- result_item.result)
+ work_item.future.set_result(result_item.result)
-class ProcessPoolExecutor(Executor):
+class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None):
"""Initializes a new ProcessPoolExecutor instance.
@@ -296,7 +288,7 @@
self._call_queue,
self._result_queue,
self._shutdown_process_event))
- self._queue_management_thread.setDaemon(True)
+ self._queue_management_thread.daemon = True
self._queue_management_thread.start()
_thread_references.add(weakref.ref(self._queue_management_thread))
@@ -310,36 +302,36 @@
p.start()
self._processes.add(p)
- def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
- self._shutdown_lock.acquire()
- try:
+ def submit(self, fn, *args, **kwargs):
+ with self._shutdown_lock:
if self._shutdown_thread:
- raise RuntimeError('cannot run new futures after shutdown')
+ raise RuntimeError('cannot schedule new futures after shutdown')
- futures = []
- event_sink = ThreadEventSink()
+ f = _base.Future()
+ w = _WorkItem(f, fn, args, kwargs)
- for index, call in enumerate(calls):
- f = Future(index)
- self._pending_work_items[self._queue_count] = _WorkItem(
- call, f, event_sink)
- self._work_ids.put(self._queue_count)
- futures.append(f)
- self._queue_count += 1
+ self._pending_work_items[self._queue_count] = w
+ self._work_ids.put(self._queue_count)
+ self._queue_count += 1
self._start_queue_management_thread()
self._adjust_process_count()
- fl = FutureList(futures, event_sink)
- fl.wait(timeout=timeout, return_when=return_when)
- return fl
- finally:
- self._shutdown_lock.release()
+ return f
+ submit.__doc__ = _base.Executor.submit.__doc__
- def shutdown(self):
- self._shutdown_lock.acquire()
- try:
+ def shutdown(self, wait=True):
+ with self._shutdown_lock:
self._shutdown_thread = True
- finally:
- self._shutdown_lock.release()
+ if wait:
+ if self._queue_management_thread:
+ self._queue_management_thread.join()
+ # To reduce the risk of openning too many files, remove references to
+ # objects that use file descriptors.
+ self._queue_management_thread = None
+ self._call_queue = None
+ self._result_queue = None
+ self._shutdown_process_event = None
+ self._processes = None
+ shutdown.__doc__ = _base.Executor.shutdown.__doc__
-atexit.register(_python_exit)
\ No newline at end of file
+atexit.register(_python_exit)
diff --git a/python2/futures/thread.py b/python2/futures/thread.py
index 4f410fe..3f1584a 100644
--- a/python2/futures/thread.py
+++ b/python2/futures/thread.py
@@ -1,16 +1,12 @@
-# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
"""Implements ThreadPoolExecutor."""
__author__ = 'Brian Quinlan ([email protected])'
-from futures._base import (PENDING, RUNNING, CANCELLED,
- CANCELLED_AND_NOTIFIED, FINISHED,
- ALL_COMPLETED,
- LOGGER,
- set_future_exception, set_future_result,
- Executor, Future, FutureList, ThreadEventSink)
import atexit
+import _base
import Queue
import threading
import weakref
@@ -22,15 +18,15 @@
# - The workers would still be running during interpretor shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
-# be bad if the function being evaluated has external side-effects e.g.
+# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
-# workers to exit when their work queues are empty and then waits until they
-# finish.
+# workers to exit when their work queues are empty and then waits until the
+# threads finish.
-_thread_references = set() # Weakrefs to every active worker thread.
-_shutdown = False # Indicates that the interpreter is shutting down.
+_thread_references = set()
+_shutdown = False
def _python_exit():
global _shutdown
@@ -43,11 +39,10 @@
def _remove_dead_thread_references():
"""Remove inactive threads from _thread_references.
- Should be called periodically to prevent thread objects from accumulating in
- scenarios such as:
+ Should be called periodically to prevent memory leaks in scenarios such as:
>>> while True:
- >>> ... t = ThreadPoolExecutor(max_workers=5)
- >>> ... t.map(int, ['1', '2', '3', '4', '5'])
+ ... t = ThreadPoolExecutor(max_workers=5)
+ ... t.map(int, ['1', '2', '3', '4', '5'])
"""
for thread_reference in set(_thread_references):
if thread_reference() is None:
@@ -56,38 +51,22 @@
atexit.register(_python_exit)
class _WorkItem(object):
- def __init__(self, call, future, completion_tracker):
- self.call = call
+ def __init__(self, future, fn, args, kwargs):
self.future = future
- self.completion_tracker = completion_tracker
+ self.fn = fn
+ self.args = args
+ self.kwargs = kwargs
def run(self):
- self.future._condition.acquire()
- try:
- if self.future._state == PENDING:
- self.future._state = RUNNING
- elif self.future._state == CANCELLED:
- self.completion_tracker._condition.acquire()
- try:
- self.future._state = CANCELLED_AND_NOTIFIED
- self.completion_tracker.add_cancelled()
- return
- finally:
- self.completion_tracker._condition.release()
- else:
- LOGGER.critical('Future %s in unexpected state: %d',
- id(self.future),
- self.future._state)
- return
- finally:
- self.future._condition.release()
+ if not self.future.set_running_or_notify_cancel():
+ return
try:
- result = self.call()
- except Exception, e:
- set_future_exception(self.future, self.completion_tracker, e)
+ result = self.fn(*self.args, **self.kwargs)
+ except BaseException as e:
+ self.future.set_exception(e)
else:
- set_future_result(self.future, self.completion_tracker, result)
+ self.future.set_result(result)
def _worker(executor_reference, work_queue):
try:
@@ -105,10 +84,10 @@
del executor
else:
work_item.run()
- except Exception, e:
- LOGGER.critical('Exception in worker', exc_info=True)
+ except BaseException as e:
+ _base.LOGGER.critical('Exception in worker', exc_info=True)
-class ThreadPoolExecutor(Executor):
+class ThreadPoolExecutor(_base.Executor):
def __init__(self, max_workers):
"""Initializes a new ThreadPoolExecutor instance.
@@ -124,42 +103,34 @@
self._shutdown = False
self._shutdown_lock = threading.Lock()
+ def submit(self, fn, *args, **kwargs):
+ with self._shutdown_lock:
+ if self._shutdown:
+ raise RuntimeError('cannot schedule new futures after shutdown')
+
+ f = _base.Future()
+ w = _WorkItem(f, fn, args, kwargs)
+
+ self._work_queue.put(w)
+ self._adjust_thread_count()
+ return f
+ submit.__doc__ = _base.Executor.submit.__doc__
+
def _adjust_thread_count(self):
- for _ in range(len(self._threads),
- min(self._max_workers, self._work_queue.qsize())):
+ # TODO(bquinlan): Should avoid creating new threads if there are more
+ # idle threads than items in the work queue.
+ if len(self._threads) < self._max_workers:
t = threading.Thread(target=_worker,
args=(weakref.ref(self), self._work_queue))
- t.setDaemon(True)
+ t.daemon = True
t.start()
self._threads.add(t)
_thread_references.add(weakref.ref(t))
- def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
- self._shutdown_lock.acquire()
- try:
- if self._shutdown:
- raise RuntimeError('cannot run new futures after shutdown')
-
- futures = []
- event_sink = ThreadEventSink()
- for index, call in enumerate(calls):
- f = Future(index)
- w = _WorkItem(call, f, event_sink)
- self._work_queue.put(w)
- futures.append(f)
-
- self._adjust_thread_count()
- fl = FutureList(futures, event_sink)
- fl.wait(timeout=timeout, return_when=return_when)
- return fl
- finally:
- self._shutdown_lock.release()
- run_to_futures.__doc__ = Executor.run_to_futures.__doc__
-
- def shutdown(self):
- self._shutdown_lock.acquire()
- try:
+ def shutdown(self, wait=True):
+ with self._shutdown_lock:
self._shutdown = True
- finally:
- self._shutdown_lock.release()
- shutdown.__doc__ = Executor.shutdown.__doc__
+ if wait:
+ for t in self._threads:
+ t.join()
+ shutdown.__doc__ = _base.Executor.shutdown.__doc__
diff --git a/python2/primes.py b/python2/primes.py
index 0b2bf81..fa6c355 100644
--- a/python2/primes.py
+++ b/python2/primes.py
@@ -25,29 +25,23 @@
return list(map(is_prime, PRIMES))
def with_process_pool_executor():
- executor = futures.ProcessPoolExecutor(10)
- try:
+ with futures.ProcessPoolExecutor(10) as executor:
return list(executor.map(is_prime, PRIMES))
- finally:
- executor.shutdown()
def with_thread_pool_executor():
- executor = futures.ThreadPoolExecutor(10)
- try:
+ with futures.ThreadPoolExecutor(10) as executor:
return list(executor.map(is_prime, PRIMES))
- finally:
- executor.shutdown()
def main():
for name, fn in [('sequential', sequential),
('processes', with_process_pool_executor),
('threads', with_thread_pool_executor)]:
- print '%s: ' % name.ljust(12),
-
+ print name.ljust(12),
start = time.time()
if fn() != [True] * len(PRIMES):
print 'failed'
else:
print '%.2f seconds' % (time.time() - start)
-main()
\ No newline at end of file
+if __name__ == '__main__':
+ main()
diff --git a/python2/setup.py b/python2/setup.py
index 897dc86..fcd05f2 100755
--- a/python2/setup.py
+++ b/python2/setup.py
@@ -1,10 +1,10 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
from distutils.core import setup
-setup(name='futures',
+setup(name='futures3',
version='1.0',
- description='Java-style futures implementation in Python 2.x',
+ description='Java-style futures implementation in Python 3.x',
author='Brian Quinlan',
author_email='[email protected]',
url='http://code.google.com/p/pythonfutures',
@@ -14,5 +14,5 @@
classifiers=['License :: OSI Approved :: BSD License',
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
- 'Programming Language :: Python :: 2']
+ 'Programming Language :: Python :: 3']
)
diff --git a/python2/test_futures.py b/python2/test_futures.py
index cf74286..2d5672b 100644
--- a/python2/test_futures.py
+++ b/python2/test_futures.py
@@ -1,16 +1,25 @@
-import unittest
-import threading
-import time
+import logging
import multiprocessing
+import re
+import StringIO
+import sys
+import threading
from test import test_support
+import time
+import unittest
+
+if sys.platform.startswith('win'):
+ import ctypes
+ import ctypes.wintypes
import futures
-import futures._base
from futures._base import (
- PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
+ PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
+ LOGGER, STDERR_HANDLER, wait)
+import futures.process
def create_future(state=PENDING, exception=None, result=None):
- f = Future(0)
+ f = Future()
f._state = state
f._exception = exception
f._result = result
@@ -23,68 +32,104 @@
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
+def mul(x, y):
+ return x * y
+
class Call(object):
+ """A call that can be submitted to a future.Executor for testing.
+
+ The call signals when it is called and waits for an event before finishing.
+ """
CALL_LOCKS = {}
+ def _create_event(self):
+ if sys.platform.startswith('win'):
+ class SECURITY_ATTRIBUTES(ctypes.Structure):
+ _fields_ = [("nLength", ctypes.wintypes.DWORD),
+ ("lpSecurityDescriptor", ctypes.wintypes.LPVOID),
+ ("bInheritHandle", ctypes.wintypes.BOOL)]
+
+ s = SECURITY_ATTRIBUTES()
+ s.nLength = ctypes.sizeof(s)
+ s.lpSecurityDescriptor = None
+ s.bInheritHandle = True
+
+ handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s),
+ True,
+ False,
+ None)
+ assert handle is not None
+ return handle
+ else:
+ event = multiprocessing.Event()
+ self.CALL_LOCKS[id(event)] = event
+ return id(event)
+
+ def _wait_on_event(self, handle):
+ if sys.platform.startswith('win'):
+ r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000)
+ assert r == 0
+ else:
+ self.CALL_LOCKS[handle].wait()
+
+ def _signal_event(self, handle):
+ if sys.platform.startswith('win'):
+ r = ctypes.windll.kernel32.SetEvent(handle)
+ assert r != 0
+ else:
+ self.CALL_LOCKS[handle].set()
+
def __init__(self, manual_finish=False, result=42):
- called_event = multiprocessing.Event()
- can_finish = multiprocessing.Event()
+ self._called_event = self._create_event()
+ self._can_finish = self._create_event()
self._result = result
- self._called_event_id = id(called_event)
- self._can_finish_event_id = id(can_finish)
-
- self.CALL_LOCKS[self._called_event_id] = called_event
- self.CALL_LOCKS[self._can_finish_event_id] = can_finish
if not manual_finish:
- self._can_finish.set()
-
- @property
- def _can_finish(self):
- return self.CALL_LOCKS[self._can_finish_event_id]
-
- @property
- def _called_event(self):
- return self.CALL_LOCKS[self._called_event_id]
+ self._signal_event(self._can_finish)
def wait_on_called(self):
- self._called_event.wait()
+ self._wait_on_event(self._called_event)
def set_can(self):
- self._can_finish.set()
-
- def called(self):
- return self._called_event.is_set()
+ self._signal_event(self._can_finish)
def __call__(self):
- if self._called_event.is_set(): print('called twice')
+ self._signal_event(self._called_event)
+ self._wait_on_event(self._can_finish)
- self._called_event.set()
- self._can_finish.wait()
return self._result
def close(self):
- del self.CALL_LOCKS[self._called_event_id]
- del self.CALL_LOCKS[self._can_finish_event_id]
+ self.set_can()
+ if sys.platform.startswith('win'):
+ ctypes.windll.kernel32.CloseHandle(self._called_event)
+ ctypes.windll.kernel32.CloseHandle(self._can_finish)
+ else:
+ del self.CALL_LOCKS[self._called_event]
+ del self.CALL_LOCKS[self._can_finish]
class ExceptionCall(Call):
def __call__(self):
- assert not self._called_event.is_set(), 'already called'
-
- self._called_event.set()
- self._can_finish.wait()
+ self._signal_event(self._called_event)
+ self._wait_on_event(self._can_finish)
raise ZeroDivisionError()
+class MapCall(Call):
+ def __init__(self, result=42):
+ super(MapCall, self).__init__(manual_finish=True, result=result)
+
+ def __call__(self, manual_finish):
+ if manual_finish:
+ super(MapCall, self).__call__()
+ return self._result
+
class ExecutorShutdownTest(unittest.TestCase):
def test_run_after_shutdown(self):
- call1 = Call()
- try:
- self.executor.shutdown()
- self.assertRaises(RuntimeError,
- self.executor.run_to_futures,
- [call1])
- finally:
- call1.close()
+ self.executor.shutdown()
+ self.assertRaises(RuntimeError,
+ self.executor.submit,
+ pow, 2, 5)
+
def _start_some_futures(self):
call1 = Call(manual_finish=True)
@@ -92,13 +137,14 @@
call3 = Call(manual_finish=True)
try:
- self.executor.run_to_futures([call1, call2, call3],
- return_when=futures.RETURN_IMMEDIATELY)
-
+ self.executor.submit(call1)
+ self.executor.submit(call2)
+ self.executor.submit(call3)
+
call1.wait_on_called()
call2.wait_on_called()
call3.wait_on_called()
-
+
call1.set_can()
call2.set_can()
call3.set_can()
@@ -112,7 +158,7 @@
self.executor = futures.ThreadPoolExecutor(max_workers=5)
def tearDown(self):
- self.executor.shutdown()
+ self.executor.shutdown(wait=True)
def test_threads_terminate(self):
self._start_some_futures()
@@ -144,15 +190,15 @@
self.executor = futures.ProcessPoolExecutor(max_workers=5)
def tearDown(self):
- self.executor.shutdown()
+ self.executor.shutdown(wait=True)
def test_processes_terminate(self):
self._start_some_futures()
self.assertEqual(len(self.executor._processes), 5)
+ processes = self.executor._processes
self.executor.shutdown()
- self.executor._queue_management_thread.join()
- for p in self.executor._processes:
+ for p in processes:
p.join()
def test_context_manager_shutdown(self):
@@ -161,8 +207,7 @@
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
- executor._queue_management_thread.join()
- for p in executor._processes:
+ for p in self.executor._processes:
p.join()
def test_del_shutdown(self):
@@ -176,312 +221,316 @@
for p in processes:
p.join()
-class WaitsTest(unittest.TestCase):
- def test_concurrent_waits(self):
- def wait_for_ALL_COMPLETED():
- fs.wait(return_when=futures.ALL_COMPLETED)
- self.assertTrue(f1.done())
- self.assertTrue(f2.done())
- self.assertTrue(f3.done())
- self.assertTrue(f4.done())
- all_completed.release()
+class WaitTests(unittest.TestCase):
+ def test_first_completed(self):
+ def wait_test():
+ while not future1._waiters:
+ pass
+ call1.set_can()
- def wait_for_FIRST_COMPLETED():
- fs.wait(return_when=futures.FIRST_COMPLETED)
- self.assertTrue(f1.done())
- self.assertFalse(f2.done())
- self.assertFalse(f3.done())
- self.assertFalse(f4.done())
- first_completed.release()
- def wait_for_FIRST_EXCEPTION():
- fs.wait(return_when=futures.FIRST_EXCEPTION)
- self.assertTrue(f1.done())
- self.assertTrue(f2.done())
- self.assertFalse(f3.done())
- self.assertFalse(f4.done())
- first_exception.release()
+ call1 = Call(manual_finish=True)
+ call2 = Call(manual_finish=True)
+ try:
+ future1 = self.executor.submit(call1)
+ future2 = self.executor.submit(call2)
- all_completed = threading.Semaphore(0)
- first_completed = threading.Semaphore(0)
- first_exception = threading.Semaphore(0)
+ t = threading.Thread(target=wait_test)
+ t.start()
+ done, not_done = futures.wait(
+ [CANCELLED_FUTURE, future1, future2],
+ return_when=futures.FIRST_COMPLETED)
+
+ self.assertEquals(set([future1]), done)
+ self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done)
+ finally:
+ call1.close()
+ call2.close()
+
+ def test_first_completed_one_already_completed(self):
+ call1 = Call(manual_finish=True)
+ try:
+ future1 = self.executor.submit(call1)
+
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE, future1],
+ return_when=futures.FIRST_COMPLETED)
+
+ self.assertEquals(set([SUCCESSFUL_FUTURE]), finished)
+ self.assertEquals(set([future1]), pending)
+ finally:
+ call1.close()
+
+ def test_first_exception(self):
+ def wait_test():
+ while not future1._waiters:
+ pass
+ call1.set_can()
+ call2.set_can()
call1 = Call(manual_finish=True)
call2 = ExceptionCall(manual_finish=True)
call3 = Call(manual_finish=True)
- call4 = Call()
-
try:
- fs = self.executor.run_to_futures(
- [call1, call2, call3, call4],
- return_when=futures.RETURN_IMMEDIATELY)
- f1, f2, f3, f4 = fs
-
- threads = []
- for wait_test in [wait_for_ALL_COMPLETED,
- wait_for_FIRST_COMPLETED,
- wait_for_FIRST_EXCEPTION]:
- t = threading.Thread(target=wait_test)
- t.start()
- threads.append(t)
-
- time.sleep(1) # give threads enough time to execute wait
+ future1 = self.executor.submit(call1)
+ future2 = self.executor.submit(call2)
+ future3 = self.executor.submit(call3)
- call1.set_can()
- first_completed.acquire()
- call2.set_can()
- first_exception.acquire()
- call3.set_can()
- all_completed.acquire()
-
- self.executor.shutdown()
- finally:
- call1.close()
- call2.close()
- call3.close()
- call4.close()
-
-class ThreadPoolWaitTests(WaitsTest):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown()
-
-class ProcessPoolWaitTests(WaitsTest):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown()
-
-class CancelTests(unittest.TestCase):
- def test_cancel_states(self):
- call1 = Call(manual_finish=True)
- call2 = Call()
- call3 = Call()
- call4 = Call()
-
- try:
- fs = self.executor.run_to_futures(
- [call1, call2, call3, call4],
- return_when=futures.RETURN_IMMEDIATELY)
- f1, f2, f3, f4 = fs
-
- call1.wait_on_called()
- self.assertEqual(f1.cancel(), False)
- self.assertEqual(f2.cancel(), True)
- self.assertEqual(f4.cancel(), True)
- self.assertEqual(f1.cancelled(), False)
- self.assertEqual(f2.cancelled(), True)
- self.assertEqual(f3.cancelled(), False)
- self.assertEqual(f4.cancelled(), True)
- self.assertEqual(f1.done(), False)
- self.assertEqual(f2.done(), True)
- self.assertEqual(f3.done(), False)
- self.assertEqual(f4.done(), True)
-
- call1.set_can()
- fs.wait(return_when=futures.ALL_COMPLETED)
- self.assertEqual(f1.result(), 42)
- self.assertRaises(futures.CancelledError, f2.result)
- self.assertRaises(futures.CancelledError, f2.exception)
- self.assertEqual(f3.result(), 42)
- self.assertRaises(futures.CancelledError, f4.result)
- self.assertRaises(futures.CancelledError, f4.exception)
-
- self.assertEqual(call2.called(), False)
- self.assertEqual(call4.called(), False)
- finally:
- call1.close()
- call2.close()
- call3.close()
- call4.close()
-
- def test_wait_for_individual_cancel_while_waiting(self):
- def end_call():
- # Wait until the main thread is waiting on the results of the
- # future.
- time.sleep(1)
- f2.cancel()
- call1.set_can()
-
- call1 = Call(manual_finish=True)
- call2 = Call()
-
- try:
- fs = self.executor.run_to_futures(
- [call1, call2],
- return_when=futures.RETURN_IMMEDIATELY)
- f1, f2 = fs
-
- call1.wait_on_called()
- t = threading.Thread(target=end_call)
+ t = threading.Thread(target=wait_test)
t.start()
- self.assertRaises(futures.CancelledError, f2.result)
- self.assertRaises(futures.CancelledError, f2.exception)
- t.join()
+ finished, pending = futures.wait(
+ [future1, future2, future3],
+ return_when=futures.FIRST_EXCEPTION)
+
+ self.assertEquals(set([future1, future2]), finished)
+ self.assertEquals(set([future3]), pending)
+ finally:
+ call1.close()
+ call2.close()
+ call3.close()
+
+ def test_first_exception_some_already_complete(self):
+ def wait_test():
+ while not future1._waiters:
+ pass
+ call1.set_can()
+
+ call1 = ExceptionCall(manual_finish=True)
+ call2 = Call(manual_finish=True)
+ try:
+ future1 = self.executor.submit(call1)
+ future2 = self.executor.submit(call2)
+
+ t = threading.Thread(target=wait_test)
+ t.start()
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1, future2],
+ return_when=futures.FIRST_EXCEPTION)
+
+ self.assertEquals(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1]), finished)
+ self.assertEquals(set([CANCELLED_FUTURE, future2]), pending)
+
+
finally:
call1.close()
call2.close()
- def test_wait_with_already_cancelled_futures(self):
+ def test_first_exception_one_already_failed(self):
call1 = Call(manual_finish=True)
- call2 = Call()
- call3 = Call()
- call4 = Call()
-
try:
- fs = self.executor.run_to_futures(
- [call1, call2, call3, call4],
- return_when=futures.RETURN_IMMEDIATELY)
- f1, f2, f3, f4 = fs
-
- call1.wait_on_called()
- self.assertTrue(f2.cancel())
- self.assertTrue(f3.cancel())
+ future1 = self.executor.submit(call1)
+
+ finished, pending = futures.wait(
+ [EXCEPTION_FUTURE, future1],
+ return_when=futures.FIRST_EXCEPTION)
+
+ self.assertEquals(set([EXCEPTION_FUTURE]), finished)
+ self.assertEquals(set([future1]), pending)
+ finally:
+ call1.close()
+
+ def test_all_completed(self):
+ def wait_test():
+ while not future1._waiters:
+ pass
call1.set_can()
-
- fs.wait(return_when=futures.ALL_COMPLETED)
+ call2.set_can()
+
+ call1 = Call(manual_finish=True)
+ call2 = Call(manual_finish=True)
+ try:
+ future1 = self.executor.submit(call1)
+ future2 = self.executor.submit(call2)
+
+ t = threading.Thread(target=wait_test)
+ t.start()
+ finished, pending = futures.wait(
+ [future1, future2],
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEquals(set([future1, future2]), finished)
+ self.assertEquals(set(), pending)
+
+
+ finally:
+ call1.close()
+ call2.close()
+
+ def test_all_completed_some_already_completed(self):
+ def wait_test():
+ while not future1._waiters:
+ pass
+
+ future4.cancel()
+ call1.set_can()
+ call2.set_can()
+ call3.set_can()
+
+ self.assertTrue(
+ futures.process.EXTRA_QUEUED_CALLS <= 1,
+ 'this test assumes that future4 will be cancelled before it is '
+ 'queued to run - which might not be the case if '
+ 'ProcessPoolExecutor is too aggresive in scheduling futures')
+ call1 = Call(manual_finish=True)
+ call2 = Call(manual_finish=True)
+ call3 = Call(manual_finish=True)
+ call4 = Call(manual_finish=True)
+ try:
+ future1 = self.executor.submit(call1)
+ future2 = self.executor.submit(call2)
+ future3 = self.executor.submit(call3)
+ future4 = self.executor.submit(call4)
+
+ t = threading.Thread(target=wait_test)
+ t.start()
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1, future2, future3, future4],
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEquals(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1, future2, future3, future4]),
+ finished)
+ self.assertEquals(set(), pending)
finally:
call1.close()
call2.close()
call3.close()
call4.close()
- def test_cancel_all(self):
- call1 = Call(manual_finish=True)
- call2 = Call()
- call3 = Call()
- call4 = Call()
-
- try:
- fs = self.executor.run_to_futures(
- [call1, call2, call3, call4],
- return_when=futures.RETURN_IMMEDIATELY)
- f1, f2, f3, f4 = fs
-
- call1.wait_on_called()
- self.assertRaises(futures.TimeoutError, fs.cancel, timeout=0)
+ def test_timeout(self):
+ def wait_test():
+ while not future1._waiters:
+ pass
call1.set_can()
- fs.cancel()
-
- self.assertFalse(f1.cancelled())
- self.assertTrue(f2.cancelled())
- self.assertTrue(f3.cancelled())
- self.assertTrue(f4.cancelled())
+
+ call1 = Call(manual_finish=True)
+ call2 = Call(manual_finish=True)
+ try:
+ future1 = self.executor.submit(call1)
+ future2 = self.executor.submit(call2)
+
+ t = threading.Thread(target=wait_test)
+ t.start()
+ finished, pending = futures.wait(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2],
+ timeout=1,
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1]), finished)
+ self.assertEquals(set([future2]), pending)
+
+
finally:
call1.close()
call2.close()
- call3.close()
- call4.close()
-class ThreadPoolCancelTests(CancelTests):
+
+class ThreadPoolWaitTests(WaitTests):
def setUp(self):
self.executor = futures.ThreadPoolExecutor(max_workers=1)
def tearDown(self):
- self.executor.shutdown()
+ self.executor.shutdown(wait=True)
-class ProcessPoolCancelTests(WaitsTest):
+class ProcessPoolWaitTests(WaitTests):
def setUp(self):
self.executor = futures.ProcessPoolExecutor(max_workers=1)
def tearDown(self):
- self.executor.shutdown()
+ self.executor.shutdown(wait=True)
+
+class AsCompletedTests(unittest.TestCase):
+ # TODO([email protected]): Should have a test with a non-zero timeout.
+ def test_no_timeout(self):
+ def wait_test():
+ while not future1._waiters:
+ pass
+ call1.set_can()
+ call2.set_can()
+
+ call1 = Call(manual_finish=True)
+ call2 = Call(manual_finish=True)
+ try:
+ future1 = self.executor.submit(call1)
+ future2 = self.executor.submit(call2)
+
+ t = threading.Thread(target=wait_test)
+ t.start()
+ completed = set(futures.as_completed(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2]))
+ self.assertEquals(set(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2]),
+ completed)
+ finally:
+ call1.close()
+ call2.close()
+
+ def test_zero_timeout(self):
+ call1 = Call(manual_finish=True)
+ try:
+ future1 = self.executor.submit(call1)
+ completed_futures = set()
+ try:
+ for future in futures.as_completed(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1],
+ timeout=0):
+ completed_futures.add(future)
+ except futures.TimeoutError:
+ pass
+
+ self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE]),
+ completed_futures)
+ finally:
+ call1.close()
+
+class ThreadPoolAsCompletedTests(AsCompletedTests):
+ def setUp(self):
+ self.executor = futures.ThreadPoolExecutor(max_workers=1)
+
+ def tearDown(self):
+ self.executor.shutdown(wait=True)
+
+class ProcessPoolAsCompletedTests(AsCompletedTests):
+ def setUp(self):
+ self.executor = futures.ProcessPoolExecutor(max_workers=1)
+
+ def tearDown(self):
+ self.executor.shutdown(wait=True)
class ExecutorTest(unittest.TestCase):
# Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest.
- def test_run_to_futures(self):
- call1 = Call(result=1)
- call2 = Call(result=2)
- call3 = Call(manual_finish=True)
- call4 = Call()
- call5 = Call()
+ def test_submit(self):
+ future = self.executor.submit(pow, 2, 8)
+ self.assertEquals(256, future.result())
- try:
- f1, f2, f3, f4, f5 = self.executor.run_to_futures(
- [call1, call2, call3, call4, call5],
- return_when=futures.RETURN_IMMEDIATELY)
-
- call3.wait_on_called()
-
- # ProcessPoolExecutor uses a thread to propogate results into the
- # future. Calling result() ensures that the thread has done its work
- # before doing the next set of checks.
- f1.result()
- f2.result()
-
- self.assertTrue(f1.done())
- self.assertFalse(f1.running())
- self.assertEqual(f1.index, 0)
-
- self.assertTrue(f2.done())
- self.assertFalse(f2.running())
- self.assertEqual(f2.index, 1)
-
- self.assertFalse(f3.done())
- self.assertTrue(f3.running())
- self.assertEqual(f3.index, 2)
-
- # ProcessPoolExecutor may mark some futures as running before they
- # actually are so don't check these ones.
- self.assertFalse(f4.done())
- self.assertEqual(f4.index, 3)
-
- self.assertFalse(f5.done())
- self.assertEqual(f5.index, 4)
- finally:
- call3.set_can() # Let the call finish executing.
- call1.close()
- call2.close()
- call3.close()
- call4.close()
- call5.close()
-
- def test_run_to_results(self):
- call1 = Call(result=1)
- call2 = Call(result=2)
- call3 = Call(result=3)
- try:
- self.assertEqual(
- list(self.executor.run_to_results([call1, call2, call3])),
- [1, 2, 3])
- finally:
- call1.close()
- call2.close()
- call3.close()
-
- def test_run_to_results_exception(self):
- call1 = Call(result=1)
- call2 = Call(result=2)
- call3 = ExceptionCall()
- try:
- i = self.executor.run_to_results([call1, call2, call3])
-
- self.assertEqual(i.next(), 1)
- self.assertEqual(i.next(), 2)
- self.assertRaises(ZeroDivisionError, i.next)
- finally:
- call1.close()
- call2.close()
- call3.close()
-
- def test_run_to_results_timeout(self):
- call1 = Call(result=1)
- call2 = Call(result=2)
- call3 = Call(manual_finish=True)
-
- try:
- i = self.executor.run_to_results([call1, call2, call3], timeout=1)
- self.assertEqual(i.next(), 1)
- self.assertEqual(i.next(), 2)
- self.assertRaises(futures.TimeoutError, i.next)
- call3.set_can()
- finally:
- call1.close()
- call2.close()
- call3.close()
+ def test_submit_keyword(self):
+ future = self.executor.submit(mul, 2, y=8)
+ self.assertEquals(16, future.result())
def test_map(self):
self.assertEqual(
@@ -494,36 +543,142 @@
self.assertEqual(i.next(), (0, 1))
self.assertRaises(ZeroDivisionError, i.next)
+ def test_map_timeout(self):
+ results = []
+ timeout_call = MapCall()
+ try:
+ try:
+ for i in self.executor.map(timeout_call,
+ [False, False, True],
+ timeout=1):
+ results.append(i)
+ except futures.TimeoutError:
+ pass
+ else:
+ self.fail('expected TimeoutError')
+ finally:
+ timeout_call.close()
+
+ self.assertEquals([42, 42], results)
+
class ThreadPoolExecutorTest(ExecutorTest):
def setUp(self):
self.executor = futures.ThreadPoolExecutor(max_workers=1)
def tearDown(self):
- self.executor.shutdown()
+ self.executor.shutdown(wait=True)
class ProcessPoolExecutorTest(ExecutorTest):
def setUp(self):
self.executor = futures.ProcessPoolExecutor(max_workers=1)
def tearDown(self):
- self.executor.shutdown()
+ self.executor.shutdown(wait=True)
class FutureTests(unittest.TestCase):
- # Future.index() is tested by ExecutorTest
- # Future.cancel() is further tested by CancelTests.
+ def test_done_callback_with_result(self):
+ self.callback_result = None
+ def fn(callback_future):
+ self.callback_result = callback_future.result()
+
+ f = Future()
+ f.add_done_callback(fn)
+ f.set_result(5)
+ self.assertEquals(5, self.callback_result)
+
+ def test_done_callback_with_exception(self):
+ self.callback_exception = None
+ def fn(callback_future):
+ self.callback_exception = callback_future.exception()
+
+ f = Future()
+ f.add_done_callback(fn)
+ f.set_exception(Exception('test'))
+ self.assertEquals(('test',), self.callback_exception.args)
+
+ def test_done_callback_with_cancel(self):
+ self.was_cancelled = None
+ def fn(callback_future):
+ self.was_cancelled = callback_future.cancelled()
+
+ f = Future()
+ f.add_done_callback(fn)
+ self.assertTrue(f.cancel())
+ self.assertTrue(self.was_cancelled)
+
+ def test_done_callback_raises(self):
+ LOGGER.removeHandler(STDERR_HANDLER)
+ logging_stream = StringIO.StringIO()
+ handler = logging.StreamHandler(logging_stream)
+ LOGGER.addHandler(handler)
+ try:
+ self.raising_was_called = False
+ self.fn_was_called = False
+
+ def raising_fn(callback_future):
+ self.raising_was_called = True
+ raise Exception('doh!')
+
+ def fn(callback_future):
+ self.fn_was_called = True
+
+ f = Future()
+ f.add_done_callback(raising_fn)
+ f.add_done_callback(fn)
+ f.set_result(5)
+ self.assertTrue(self.raising_was_called)
+ self.assertTrue(self.fn_was_called)
+ self.assertTrue('Exception: doh!' in logging_stream.getvalue())
+ finally:
+ LOGGER.removeHandler(handler)
+ LOGGER.addHandler(STDERR_HANDLER)
+
+ def test_done_callback_already_successful(self):
+ self.callback_result = None
+ def fn(callback_future):
+ self.callback_result = callback_future.result()
+
+ f = Future()
+ f.set_result(5)
+ f.add_done_callback(fn)
+ self.assertEquals(5, self.callback_result)
+
+ def test_done_callback_already_failed(self):
+ self.callback_exception = None
+ def fn(callback_future):
+ self.callback_exception = callback_future.exception()
+
+ f = Future()
+ f.set_exception(Exception('test'))
+ f.add_done_callback(fn)
+ self.assertEquals(('test',), self.callback_exception.args)
+
+ def test_done_callback_already_cancelled(self):
+ self.was_cancelled = None
+ def fn(callback_future):
+ self.was_cancelled = callback_future.cancelled()
+
+ f = Future()
+ self.assertTrue(f.cancel())
+ f.add_done_callback(fn)
+ self.assertTrue(self.was_cancelled)
def test_repr(self):
- self.assertEqual(repr(PENDING_FUTURE), '<Future state=pending>')
- self.assertEqual(repr(RUNNING_FUTURE), '<Future state=running>')
- self.assertEqual(repr(CANCELLED_FUTURE), '<Future state=cancelled>')
- self.assertEqual(repr(CANCELLED_AND_NOTIFIED_FUTURE),
- '<Future state=cancelled>')
- self.assertEqual(repr(EXCEPTION_FUTURE),
- '<Future state=finished raised IOError>')
- self.assertEqual(repr(SUCCESSFUL_FUTURE),
- '<Future state=finished returned int>')
+ self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=pending>',
+ repr(PENDING_FUTURE)))
+ self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=running>',
+ repr(RUNNING_FUTURE)))
+ self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=cancelled>',
+ repr(CANCELLED_FUTURE)))
+ self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=cancelled>',
+ repr(CANCELLED_AND_NOTIFIED_FUTURE)))
+ self.assertTrue(re.match(
+ '<Future at 0x[0-9a-f]+L? state=finished raised IOError>',
+ repr(EXCEPTION_FUTURE)))
+ self.assertTrue(re.match(
+ '<Future at 0x[0-9a-f]+L? state=finished returned int>',
+ repr(SUCCESSFUL_FUTURE)))
- create_future
def test_cancel(self):
f1 = create_future(state=PENDING)
@@ -588,13 +743,11 @@
self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
def test_result_with_success(self):
+ # TODO([email protected]): This test is timing dependant.
def notification():
# Wait until the main thread is waiting for the result.
time.sleep(1)
- with f1._condition:
- f1._state = FINISHED
- f1._result = 42
- f1._condition.notify_all()
+ f1.set_result(42)
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
@@ -603,12 +756,11 @@
self.assertEquals(f1.result(timeout=5), 42)
def test_result_with_cancel(self):
+ # TODO([email protected]): This test is timing dependant.
def notification():
# Wait until the main thread is waiting for the result.
time.sleep(1)
- with f1._condition:
- f1._state = CANCELLED
- f1._condition.notify_all()
+ f1.cancel()
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
@@ -644,186 +796,16 @@
self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
-class FutureListTests(unittest.TestCase):
- # FutureList.wait() is further tested by WaitsTest.
- # FutureList.cancel() is tested by CancelTests.
- def test_wait_RETURN_IMMEDIATELY(self):
- f = futures.FutureList(futures=None, event_sink=None)
- f.wait(return_when=futures.RETURN_IMMEDIATELY)
-
- def test_wait_timeout(self):
- f = futures.FutureList([PENDING_FUTURE],
- futures._base.ThreadEventSink())
-
- for t in [futures.FIRST_COMPLETED,
- futures.FIRST_EXCEPTION,
- futures.ALL_COMPLETED]:
- f.wait(timeout=0.1, return_when=t)
- self.assertFalse(PENDING_FUTURE.done())
-
- def test_wait_all_done(self):
- f = futures.FutureList([CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- SUCCESSFUL_FUTURE,
- EXCEPTION_FUTURE],
- futures._base.ThreadEventSink())
-
- f.wait(return_when=futures.ALL_COMPLETED)
-
- def test_filters(self):
- fs = [PENDING_FUTURE,
- RUNNING_FUTURE,
- CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE]
- f = futures.FutureList(fs, None)
-
- self.assertEqual(list(f.running_futures()), [RUNNING_FUTURE])
- self.assertEqual(list(f.cancelled_futures()),
- [CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE])
- self.assertEqual(list(f.done_futures()),
- [CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE])
- self.assertEqual(list(f.successful_futures()),
- [SUCCESSFUL_FUTURE])
- self.assertEqual(list(f.exception_futures()),
- [EXCEPTION_FUTURE])
-
- def test_has_running_futures(self):
- self.assertFalse(
- futures.FutureList([PENDING_FUTURE,
- CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- SUCCESSFUL_FUTURE,
- EXCEPTION_FUTURE],
- None).has_running_futures())
- self.assertTrue(
- futures.FutureList([RUNNING_FUTURE],
- None).has_running_futures())
-
- def test_has_cancelled_futures(self):
- self.assertFalse(
- futures.FutureList([PENDING_FUTURE,
- RUNNING_FUTURE,
- SUCCESSFUL_FUTURE,
- EXCEPTION_FUTURE],
- None).has_cancelled_futures())
- self.assertTrue(
- futures.FutureList([CANCELLED_FUTURE],
- None).has_cancelled_futures())
-
- self.assertTrue(
- futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE],
- None).has_cancelled_futures())
-
- def test_has_done_futures(self):
- self.assertFalse(
- futures.FutureList([PENDING_FUTURE,
- RUNNING_FUTURE],
- None).has_done_futures())
- self.assertTrue(
- futures.FutureList([CANCELLED_FUTURE],
- None).has_done_futures())
-
- self.assertTrue(
- futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE],
- None).has_done_futures())
-
- self.assertTrue(
- futures.FutureList([EXCEPTION_FUTURE],
- None).has_done_futures())
-
- self.assertTrue(
- futures.FutureList([SUCCESSFUL_FUTURE],
- None).has_done_futures())
-
- def test_has_successful_futures(self):
- self.assertFalse(
- futures.FutureList([PENDING_FUTURE,
- RUNNING_FUTURE,
- CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE],
- None).has_successful_futures())
-
- self.assertTrue(
- futures.FutureList([SUCCESSFUL_FUTURE],
- None).has_successful_futures())
-
- def test_has_exception_futures(self):
- self.assertFalse(
- futures.FutureList([PENDING_FUTURE,
- RUNNING_FUTURE,
- CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- SUCCESSFUL_FUTURE],
- None).has_exception_futures())
-
- self.assertTrue(
- futures.FutureList([EXCEPTION_FUTURE],
- None).has_exception_futures())
-
- def test_get_item(self):
- fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE]
- f = futures.FutureList(fs, None)
- self.assertEqual(f[0], PENDING_FUTURE)
- self.assertEqual(f[1], RUNNING_FUTURE)
- self.assertEqual(f[2], CANCELLED_FUTURE)
- self.assertRaises(IndexError, f.__getitem__, 3)
-
- def test_len(self):
- f = futures.FutureList([PENDING_FUTURE,
- RUNNING_FUTURE,
- CANCELLED_FUTURE],
- None)
- self.assertEqual(len(f), 3)
-
- def test_iter(self):
- fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE]
- f = futures.FutureList(fs, None)
- self.assertEqual(list(iter(f)), fs)
-
- def test_contains(self):
- f = futures.FutureList([PENDING_FUTURE,
- RUNNING_FUTURE],
- None)
- self.assertTrue(PENDING_FUTURE in f)
- self.assertTrue(RUNNING_FUTURE in f)
- self.assertFalse(CANCELLED_FUTURE in f)
-
- def test_repr(self):
- pending = create_future(state=PENDING)
- cancelled = create_future(state=CANCELLED)
- cancelled2 = create_future(state=CANCELLED_AND_NOTIFIED)
- running = create_future(state=RUNNING)
- finished = create_future(state=FINISHED)
-
- f = futures.FutureList(
- [PENDING_FUTURE] * 4 + [CANCELLED_FUTURE] * 2 +
- [CANCELLED_AND_NOTIFIED_FUTURE] +
- [RUNNING_FUTURE] * 2 +
- [SUCCESSFUL_FUTURE, EXCEPTION_FUTURE] * 3,
- None)
-
- self.assertEqual(repr(f),
- '<FutureList #futures=15 '
- '[#pending=4 #cancelled=3 #running=2 #finished=6]>')
-
def test_main():
- test_support.run_unittest(ProcessPoolCancelTests,
- ThreadPoolCancelTests,
- ProcessPoolExecutorTest,
+ test_support.run_unittest(ProcessPoolExecutorTest,
ThreadPoolExecutorTest,
ProcessPoolWaitTests,
ThreadPoolWaitTests,
+ ProcessPoolAsCompletedTests,
+ ThreadPoolAsCompletedTests,
FutureTests,
- FutureListTests,
ProcessPoolShutdownTest,
ThreadPoolShutdownTest)
if __name__ == "__main__":
- test_main()
\ No newline at end of file
+ test_main()