diff options
Diffstat (limited to 'lib-python/3/multiprocessing/pool.py')
-rw-r--r-- | lib-python/3/multiprocessing/pool.py | 142 |
1 files changed, 71 insertions, 71 deletions
diff --git a/lib-python/3/multiprocessing/pool.py b/lib-python/3/multiprocessing/pool.py index 0c29e644ff..0f2dab48eb 100644 --- a/lib-python/3/multiprocessing/pool.py +++ b/lib-python/3/multiprocessing/pool.py @@ -4,32 +4,7 @@ # multiprocessing/pool.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # __all__ = ['Pool'] @@ -64,6 +39,9 @@ job_counter = itertools.count() def mapstar(args): return list(map(*args)) +def starmapstar(args): + return list(itertools.starmap(args[0], args[1])) + # # Code run by worker processes # @@ -169,7 +147,8 @@ class Pool(object): self._task_handler = threading.Thread( target=Pool._handle_tasks, - args=(self._taskqueue, self._quick_put, self._outqueue, self._pool) + args=(self._taskqueue, self._quick_put, self._outqueue, + self._pool, self._cache) ) self._task_handler.daemon = True self._task_handler._state = RUN @@ -247,14 +226,30 @@ class Pool(object): Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ''' - assert self._state == RUN - return self.map_async(func, iterable, chunksize).get() + return self._map_async(func, iterable, mapstar, chunksize).get() + + def starmap(self, func, iterable, chunksize=None): + ''' + Like `map()` method but the elements of the `iterable` are expected to + be iterables as well and will be unpacked as arguments. Hence + `func` and (a, b) becomes func(a, b). + ''' + return self._map_async(func, iterable, starmapstar, chunksize).get() + + def starmap_async(self, func, iterable, chunksize=None, callback=None, + error_callback=None): + ''' + Asynchronous version of `starmap()` method. + ''' + return self._map_async(func, iterable, starmapstar, chunksize, + callback, error_callback) def imap(self, func, iterable, chunksize=1): ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("Pool not running") if chunksize == 1: result = IMapIterator(self._cache) self._taskqueue.put((((result._job, i, func, (x,), {}) @@ -272,7 +267,8 @@ class Pool(object): ''' Like `imap()` method but ordering of results is arbitrary. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("Pool not running") if chunksize == 1: result = IMapUnorderedIterator(self._cache) self._taskqueue.put((((result._job, i, func, (x,), {}) @@ -291,7 +287,8 @@ class Pool(object): ''' Asynchronous version of `apply()` method. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("Pool not running") result = ApplyResult(self._cache, callback, error_callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result @@ -301,7 +298,16 @@ class Pool(object): ''' Asynchronous version of `map()` method. ''' - assert self._state == RUN + return self._map_async(func, iterable, mapstar, chunksize, callback, + error_callback) + + def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, + error_callback=None): + ''' + Helper function to implement map, starmap and their async counterparts. + ''' + if self._state != RUN: + raise ValueError("Pool not running") if not hasattr(iterable, '__len__'): iterable = list(iterable) @@ -315,7 +321,7 @@ class Pool(object): task_batches = Pool._get_tasks(func, iterable, chunksize) result = MapResult(self._cache, chunksize, len(iterable), callback, error_callback=error_callback) - self._taskqueue.put((((result._job, i, mapstar, (x,), {}) + self._taskqueue.put((((result._job, i, mapper, (x,), {}) for i, x in enumerate(task_batches)), None)) return result @@ -333,7 +339,7 @@ class Pool(object): debug('worker handler exiting') @staticmethod - def _handle_tasks(taskqueue, put, outqueue, pool): + def _handle_tasks(taskqueue, put, outqueue, pool, cache): thread = threading.current_thread() for taskseq, set_length in iter(taskqueue.get, None): @@ -344,9 +350,12 @@ class Pool(object): break try: put(task) - except IOError: - debug('could not put task on queue') - break + except Exception as e: + job, ind = task[:2] + try: + cache[job]._set(ind, (False, e)) + except KeyError: + pass else: if set_length: debug('doing set_length()') @@ -493,7 +502,8 @@ class Pool(object): # We must wait for the worker handler to exit before terminating # workers because we don't want workers to be restarted behind our back. debug('joining worker handler') - worker_handler.join() + if threading.current_thread() is not worker_handler: + worker_handler.join() # Terminate workers which haven't already finished. if pool and hasattr(pool[0], 'terminate'): @@ -503,10 +513,12 @@ class Pool(object): p.terminate() debug('joining task handler') - task_handler.join() + if threading.current_thread() is not task_handler: + task_handler.join() debug('joining result handler') - result_handler.join() + if threading.current_thread() is not result_handler: + result_handler.join() if pool and hasattr(pool[0], 'terminate'): debug('joining pool workers') @@ -516,6 +528,12 @@ class Pool(object): debug('cleaning up worker %d' % p.pid) p.join() + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.terminate() + # # Class whose instances are returned by `Pool.apply_async()` # @@ -523,32 +541,26 @@ class Pool(object): class ApplyResult(object): def __init__(self, cache, callback, error_callback): - self._cond = threading.Condition(threading.Lock()) + self._event = threading.Event() self._job = next(job_counter) self._cache = cache - self._ready = False self._callback = callback self._error_callback = error_callback cache[self._job] = self def ready(self): - return self._ready + return self._event.is_set() def successful(self): - assert self._ready + assert self.ready() return self._success def wait(self, timeout=None): - self._cond.acquire() - try: - if not self._ready: - self._cond.wait(timeout) - finally: - self._cond.release() + self._event.wait(timeout) def get(self, timeout=None): self.wait(timeout) - if not self._ready: + if not self.ready(): raise TimeoutError if self._success: return self._value @@ -561,14 +573,11 @@ class ApplyResult(object): self._callback(self._value) if self._error_callback and not self._success: self._error_callback(self._value) - self._cond.acquire() - try: - self._ready = True - self._cond.notify() - finally: - self._cond.release() + self._event.set() del self._cache[self._job] +AsyncResult = ApplyResult # create alias -- see #17805 + # # Class whose instances are returned by `Pool.map_async()` # @@ -583,7 +592,8 @@ class MapResult(ApplyResult): self._chunksize = chunksize if chunksize <= 0: self._number_left = 0 - self._ready = True + self._event.set() + del cache[self._job] else: self._number_left = length//chunksize + bool(length % chunksize) @@ -596,24 +606,14 @@ class MapResult(ApplyResult): if self._callback: self._callback(self._value) del self._cache[self._job] - self._cond.acquire() - try: - self._ready = True - self._cond.notify() - finally: - self._cond.release() + self._event.set() else: self._success = False self._value = result if self._error_callback: self._error_callback(self._value) del self._cache[self._job] - self._cond.acquire() - try: - self._ready = True - self._cond.notify() - finally: - self._cond.release() + self._event.set() # # Class whose instances are returned by `Pool.imap()` |