aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndré Erdmann <dywi@mailerd.de>2013-07-31 23:07:36 +0200
committerAndré Erdmann <dywi@mailerd.de>2013-07-31 23:07:36 +0200
commit86d046b36e0a8a882a4f5d01a8e1eee82bc335b5 (patch)
tree7d06eb3ba31bb137e84005eb1ff88b856d7e9fc0 /roverlay/util
parentstats collection: count dropped packages (diff)
downloadR_overlay-86d046b36e0a8a882a4f5d01a8e1eee82bc335b5.tar.gz
R_overlay-86d046b36e0a8a882a4f5d01a8e1eee82bc335b5.tar.bz2
R_overlay-86d046b36e0a8a882a4f5d01a8e1eee82bc335b5.zip
incremental ov-creation: postpone revbump check
The revbump check needs hashes for comparing. Postponing the hash calculation and executing it in a thread pool may have speed advantages (needs some testing). Additionally, it allows to explicitly disable revbump checks when doing incremental overlay creation.
Diffstat (limited to 'roverlay/util')
-rw-r--r--roverlay/util/hashpool.py66
1 files changed, 66 insertions, 0 deletions
diff --git a/roverlay/util/hashpool.py b/roverlay/util/hashpool.py
new file mode 100644
index 0000000..921a968
--- /dev/null
+++ b/roverlay/util/hashpool.py
@@ -0,0 +1,66 @@
+# R overlay --
+# -*- coding: utf-8 -*-
+# Copyright (C) 2013 André Erdmann <dywi@mailerd.de>
+# Distributed under the terms of the GNU General Public License;
+# either version 2 of the License, or (at your option) any later version.
+
+try:
+ import concurrent.futures
+except ImportError:
+ sys.stderr.write (
+ '!!! concurrent.futures is not available.\n'
+ ' Falling back to single-threaded variants.\n\n'
+ )
+ HAVE_CONCURRENT_FUTURES = False
+else:
+ HAVE_CONCURRENT_FUTURES = True
+
+
+import roverlay.digest
+
+def _calculate_hashes ( hash_job, hashes ):
+ hash_job.hashdict.update (
+ roverlay.digest.multihash_file ( hash_job.filepath, hashes )
+ )
+# --- end of _calculate_hashes (...) ---
+
+class Hashjob ( object ):
+ def __init__ ( self, filepath, hashdict=None ):
+ self.filepath = filepath
+ self.hashdict = dict() if hashdict is None else hashdict
+ # --- end of __init__ (...) ---
+
+
+class HashPool ( object ):
+ def __init__ ( self, hashes, max_workers ):
+ super ( HashPool, self ).__init__()
+ self.hashes = frozenset ( hashes )
+ self._jobs = dict()
+ self.max_workers = int ( max_workers )
+ # --- end of __init__ (...) ---
+
+ def add ( self, backref, filepath, hashdict=None ):
+ self._jobs [backref] = Hashjob ( filepath, hashdict )
+ # --- end of add (...) ---
+
+ def run ( self ):
+ #with concurrent.futures.ProcessPoolExecutor ( self.max_workers ) as exe:
+ with concurrent.futures.ThreadPoolExecutor ( self.max_workers ) as exe:
+ running_jobs = frozenset (
+ exe.submit ( _calculate_hashes, job, self.hashes )
+ for job in self._jobs.values()
+ )
+
+ # wait
+ for finished_job in concurrent.futures.as_completed ( running_jobs ):
+ if finished_job.exception() is not None:
+ raise finished_job.exception()
+ # --- end of run (...) ---
+
+ def reset ( self ):
+ self._jobs.clear()
+ # --- end of reset (...) ---
+
+ def get ( self, backref ):
+ return self._jobs [backref].hashdict
+ # --- end of get (...) ---