aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib-python/3/multiprocessing/pool.py')
-rw-r--r--lib-python/3/multiprocessing/pool.py142
1 files changed, 71 insertions, 71 deletions
diff --git a/lib-python/3/multiprocessing/pool.py b/lib-python/3/multiprocessing/pool.py
index 0c29e644ff..0f2dab48eb 100644
--- a/lib-python/3/multiprocessing/pool.py
+++ b/lib-python/3/multiprocessing/pool.py
@@ -4,32 +4,7 @@
# multiprocessing/pool.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
__all__ = ['Pool']
@@ -64,6 +39,9 @@ job_counter = itertools.count()
def mapstar(args):
return list(map(*args))
+def starmapstar(args):
+ return list(itertools.starmap(args[0], args[1]))
+
#
# Code run by worker processes
#
@@ -169,7 +147,8 @@ class Pool(object):
self._task_handler = threading.Thread(
target=Pool._handle_tasks,
- args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
+ args=(self._taskqueue, self._quick_put, self._outqueue,
+ self._pool, self._cache)
)
self._task_handler.daemon = True
self._task_handler._state = RUN
@@ -247,14 +226,30 @@ class Pool(object):
Apply `func` to each element in `iterable`, collecting the results
in a list that is returned.
'''
- assert self._state == RUN
- return self.map_async(func, iterable, chunksize).get()
+ return self._map_async(func, iterable, mapstar, chunksize).get()
+
+ def starmap(self, func, iterable, chunksize=None):
+ '''
+ Like `map()` method but the elements of the `iterable` are expected to
+ be iterables as well and will be unpacked as arguments. Hence
+ `func` and (a, b) becomes func(a, b).
+ '''
+ return self._map_async(func, iterable, starmapstar, chunksize).get()
+
+ def starmap_async(self, func, iterable, chunksize=None, callback=None,
+ error_callback=None):
+ '''
+ Asynchronous version of `starmap()` method.
+ '''
+ return self._map_async(func, iterable, starmapstar, chunksize,
+ callback, error_callback)
def imap(self, func, iterable, chunksize=1):
'''
Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
'''
- assert self._state == RUN
+ if self._state != RUN:
+ raise ValueError("Pool not running")
if chunksize == 1:
result = IMapIterator(self._cache)
self._taskqueue.put((((result._job, i, func, (x,), {})
@@ -272,7 +267,8 @@ class Pool(object):
'''
Like `imap()` method but ordering of results is arbitrary.
'''
- assert self._state == RUN
+ if self._state != RUN:
+ raise ValueError("Pool not running")
if chunksize == 1:
result = IMapUnorderedIterator(self._cache)
self._taskqueue.put((((result._job, i, func, (x,), {})
@@ -291,7 +287,8 @@ class Pool(object):
'''
Asynchronous version of `apply()` method.
'''
- assert self._state == RUN
+ if self._state != RUN:
+ raise ValueError("Pool not running")
result = ApplyResult(self._cache, callback, error_callback)
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
return result
@@ -301,7 +298,16 @@ class Pool(object):
'''
Asynchronous version of `map()` method.
'''
- assert self._state == RUN
+ return self._map_async(func, iterable, mapstar, chunksize, callback,
+ error_callback)
+
+ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
+ error_callback=None):
+ '''
+ Helper function to implement map, starmap and their async counterparts.
+ '''
+ if self._state != RUN:
+ raise ValueError("Pool not running")
if not hasattr(iterable, '__len__'):
iterable = list(iterable)
@@ -315,7 +321,7 @@ class Pool(object):
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback,
error_callback=error_callback)
- self._taskqueue.put((((result._job, i, mapstar, (x,), {})
+ self._taskqueue.put((((result._job, i, mapper, (x,), {})
for i, x in enumerate(task_batches)), None))
return result
@@ -333,7 +339,7 @@ class Pool(object):
debug('worker handler exiting')
@staticmethod
- def _handle_tasks(taskqueue, put, outqueue, pool):
+ def _handle_tasks(taskqueue, put, outqueue, pool, cache):
thread = threading.current_thread()
for taskseq, set_length in iter(taskqueue.get, None):
@@ -344,9 +350,12 @@ class Pool(object):
break
try:
put(task)
- except IOError:
- debug('could not put task on queue')
- break
+ except Exception as e:
+ job, ind = task[:2]
+ try:
+ cache[job]._set(ind, (False, e))
+ except KeyError:
+ pass
else:
if set_length:
debug('doing set_length()')
@@ -493,7 +502,8 @@ class Pool(object):
# We must wait for the worker handler to exit before terminating
# workers because we don't want workers to be restarted behind our back.
debug('joining worker handler')
- worker_handler.join()
+ if threading.current_thread() is not worker_handler:
+ worker_handler.join()
# Terminate workers which haven't already finished.
if pool and hasattr(pool[0], 'terminate'):
@@ -503,10 +513,12 @@ class Pool(object):
p.terminate()
debug('joining task handler')
- task_handler.join()
+ if threading.current_thread() is not task_handler:
+ task_handler.join()
debug('joining result handler')
- result_handler.join()
+ if threading.current_thread() is not result_handler:
+ result_handler.join()
if pool and hasattr(pool[0], 'terminate'):
debug('joining pool workers')
@@ -516,6 +528,12 @@ class Pool(object):
debug('cleaning up worker %d' % p.pid)
p.join()
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.terminate()
+
#
# Class whose instances are returned by `Pool.apply_async()`
#
@@ -523,32 +541,26 @@ class Pool(object):
class ApplyResult(object):
def __init__(self, cache, callback, error_callback):
- self._cond = threading.Condition(threading.Lock())
+ self._event = threading.Event()
self._job = next(job_counter)
self._cache = cache
- self._ready = False
self._callback = callback
self._error_callback = error_callback
cache[self._job] = self
def ready(self):
- return self._ready
+ return self._event.is_set()
def successful(self):
- assert self._ready
+ assert self.ready()
return self._success
def wait(self, timeout=None):
- self._cond.acquire()
- try:
- if not self._ready:
- self._cond.wait(timeout)
- finally:
- self._cond.release()
+ self._event.wait(timeout)
def get(self, timeout=None):
self.wait(timeout)
- if not self._ready:
+ if not self.ready():
raise TimeoutError
if self._success:
return self._value
@@ -561,14 +573,11 @@ class ApplyResult(object):
self._callback(self._value)
if self._error_callback and not self._success:
self._error_callback(self._value)
- self._cond.acquire()
- try:
- self._ready = True
- self._cond.notify()
- finally:
- self._cond.release()
+ self._event.set()
del self._cache[self._job]
+AsyncResult = ApplyResult # create alias -- see #17805
+
#
# Class whose instances are returned by `Pool.map_async()`
#
@@ -583,7 +592,8 @@ class MapResult(ApplyResult):
self._chunksize = chunksize
if chunksize <= 0:
self._number_left = 0
- self._ready = True
+ self._event.set()
+ del cache[self._job]
else:
self._number_left = length//chunksize + bool(length % chunksize)
@@ -596,24 +606,14 @@ class MapResult(ApplyResult):
if self._callback:
self._callback(self._value)
del self._cache[self._job]
- self._cond.acquire()
- try:
- self._ready = True
- self._cond.notify()
- finally:
- self._cond.release()
+ self._event.set()
else:
self._success = False
self._value = result
if self._error_callback:
self._error_callback(self._value)
del self._cache[self._job]
- self._cond.acquire()
- try:
- self._ready = True
- self._cond.notify()
- finally:
- self._cond.release()
+ self._event.set()
#
# Class whose instances are returned by `Pool.imap()`