* [PATCH 1/2] oe/utils.py: Fix thread leakage in ThreadPool
2015-06-23 16:49 [PATCH 0/2] oe/utils.py: ThreadPool fixes and improvements Aníbal Limón
@ 2015-06-23 16:49 ` Aníbal Limón
2015-06-23 16:49 ` [PATCH 2/2] oe/utils.py: Add support for init/end helper functions in ThreadWorker Aníbal Limón
1 sibling, 0 replies; 3+ messages in thread
From: Aníbal Limón @ 2015-06-23 16:49 UTC (permalink / raw)
To: openembedded-core
In order to fix Thread leakage caused by not call join() in Threads,
Pass num_tasks in ThreadPool for add all the tasks into a Queue this
enable catch of Queue.Empty exception and exit the threads.
classes/sstate.bbclass: Change checkstatus function to match new
ThreadPool operation.
Signed-off-by: Aníbal Limón <anibal.limon@linux.intel.com>
---
meta/classes/sstate.bbclass | 3 ++-
meta/lib/oe/utils.py | 26 ++++++++++++++++++++------
2 files changed, 22 insertions(+), 7 deletions(-)
diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index 1e5e98a..a80d1ce 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -771,9 +771,10 @@ def sstate_checkhashes(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=False):
bb.note("Checking sstate mirror object availability (for %s objects)" % len(tasklist))
import multiprocessing
nproc = min(multiprocessing.cpu_count(), len(tasklist))
- pool = oe.utils.ThreadedPool(nproc)
+ pool = oe.utils.ThreadedPool(nproc, len(tasklist))
for t in tasklist:
pool.add_task(checkstatus, t)
+ pool.start()
pool.wait_completion()
inheritlist = d.getVar("INHERIT", True)
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py
index 0de8800..f0d3c14 100644
--- a/meta/lib/oe/utils.py
+++ b/meta/lib/oe/utils.py
@@ -222,11 +222,16 @@ class ThreadedWorker(Thread):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
- self.start()
def run(self):
+ from Queue import Empty
+
while True:
- func, args, kargs = self.tasks.get()
+ try:
+ func, args, kargs = self.tasks.get(block=False)
+ except Empty:
+ break
+
try:
func(*args, **kargs)
except Exception, e:
@@ -236,9 +241,17 @@ class ThreadedWorker(Thread):
class ThreadedPool:
"""Pool of threads consuming tasks from a queue"""
- def __init__(self, num_threads):
- self.tasks = Queue(num_threads)
- for _ in range(num_threads): ThreadedWorker(self.tasks)
+ def __init__(self, num_workers, num_tasks):
+ self.tasks = Queue(num_tasks)
+ self.workers = []
+
+ for _ in range(num_workers):
+ worker = ThreadedWorker(self.tasks)
+ self.workers.append(worker)
+
+ def start(self):
+ for worker in self.workers:
+ worker.start()
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
@@ -247,4 +260,5 @@ class ThreadedPool:
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
-
+ for worker in self.workers:
+ worker.join()
--
1.9.1
^ permalink raw reply related [flat|nested] 3+ messages in thread* [PATCH 2/2] oe/utils.py: Add support for init/end helper functions in ThreadWorker.
2015-06-23 16:49 [PATCH 0/2] oe/utils.py: ThreadPool fixes and improvements Aníbal Limón
2015-06-23 16:49 ` [PATCH 1/2] oe/utils.py: Fix thread leakage in ThreadPool Aníbal Limón
@ 2015-06-23 16:49 ` Aníbal Limón
1 sibling, 0 replies; 3+ messages in thread
From: Aníbal Limón @ 2015-06-23 16:49 UTC (permalink / raw)
To: openembedded-core
Add init/end helper functions for ThreadWorker also pass ThreadWorker
as first argument to init/end/func functions this enables per-thread
storage handling.
classes/sstate.bbclass: Add thread_worker argument to checkstatus
function.
Signed-off-by: Aníbal Limón <anibal.limon@linux.intel.com>
---
meta/classes/sstate.bbclass | 2 +-
meta/lib/oe/utils.py | 17 +++++++++++++----
2 files changed, 14 insertions(+), 5 deletions(-)
diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index a80d1ce..1e2d4f6 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -739,7 +739,7 @@ def sstate_checkhashes(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=False):
if localdata.getVar('BB_NO_NETWORK', True) == "1" and localdata.getVar('SSTATE_MIRROR_ALLOW_NETWORK', True) == "1":
localdata.delVar('BB_NO_NETWORK')
- def checkstatus(arg):
+ def checkstatus(thread_worker, arg):
(task, sstatefile) = arg
localdata2 = bb.data.createCopy(localdata)
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py
index f0d3c14..cee087f 100644
--- a/meta/lib/oe/utils.py
+++ b/meta/lib/oe/utils.py
@@ -218,22 +218,30 @@ from threading import Thread
class ThreadedWorker(Thread):
"""Thread executing tasks from a given tasks queue"""
- def __init__(self, tasks):
+ def __init__(self, tasks, worker_init, worker_end):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
+ self.worker_init = worker_init
+ self.worker_end = worker_end
+
def run(self):
from Queue import Empty
+ if self.worker_init is not None:
+ self.worker_init(self)
+
while True:
try:
func, args, kargs = self.tasks.get(block=False)
except Empty:
+ if self.worker_end is not None:
+ self.worker_end(self)
break
try:
- func(*args, **kargs)
+ func(self, *args, **kargs)
except Exception, e:
print e
finally:
@@ -241,12 +249,13 @@ class ThreadedWorker(Thread):
class ThreadedPool:
"""Pool of threads consuming tasks from a queue"""
- def __init__(self, num_workers, num_tasks):
+ def __init__(self, num_workers, num_tasks, worker_init=None,
+ worker_end=None):
self.tasks = Queue(num_tasks)
self.workers = []
for _ in range(num_workers):
- worker = ThreadedWorker(self.tasks)
+ worker = ThreadedWorker(self.tasks, worker_init, worker_end)
self.workers.append(worker)
def start(self):
--
1.9.1
^ permalink raw reply related [flat|nested] 3+ messages in thread