Openembedded Core Discussions
 help / color / mirror / Atom feed
* [PATCH 0/2] oe/utils.py: ThreadPool fixes and improvements
@ 2015-06-23 16:49 Aníbal Limón
  2015-06-23 16:49 ` [PATCH 1/2] oe/utils.py: Fix thread leakage in ThreadPool Aníbal Limón
  2015-06-23 16:49 ` [PATCH 2/2] oe/utils.py: Add support for init/end helper functions in ThreadWorker Aníbal Limón
  0 siblings, 2 replies; 3+ messages in thread
From: Aníbal Limón @ 2015-06-23 16:49 UTC (permalink / raw)
  To: openembedded-core

The following changes since commit 6f4304e36df7e416643dd4a7ee3de096f21f1020:

  bitbake: runqueue: Sanity check BB_NUMBER_THREADS (2015-06-23 11:57:58 +0100)

are available in the git repository at:

  git://git.yoctoproject.org/poky-contrib alimon/oe-utils
  http://git.yoctoproject.org/cgit.cgi/poky-contrib/log/?h=alimon/oe-utils

Aníbal Limón (2):
  oe/utils.py: Fix thread leakage in ThreadPool
  oe/utils.py: Add support for init/end helper functions in
    ThreadWorker.

 meta/classes/sstate.bbclass |  5 +++--
 meta/lib/oe/utils.py        | 39 +++++++++++++++++++++++++++++++--------
 2 files changed, 34 insertions(+), 10 deletions(-)

-- 
1.9.1



^ permalink raw reply	[flat|nested] 3+ messages in thread

* [PATCH 1/2] oe/utils.py: Fix thread leakage in ThreadPool
  2015-06-23 16:49 [PATCH 0/2] oe/utils.py: ThreadPool fixes and improvements Aníbal Limón
@ 2015-06-23 16:49 ` Aníbal Limón
  2015-06-23 16:49 ` [PATCH 2/2] oe/utils.py: Add support for init/end helper functions in ThreadWorker Aníbal Limón
  1 sibling, 0 replies; 3+ messages in thread
From: Aníbal Limón @ 2015-06-23 16:49 UTC (permalink / raw)
  To: openembedded-core

In order to fix Thread leakage caused by not call join() in Threads,

Pass num_tasks in ThreadPool for add all the tasks into a Queue this
enable catch of Queue.Empty exception and exit the threads.

classes/sstate.bbclass: Change checkstatus function to match new
ThreadPool operation.

Signed-off-by: Aníbal Limón <anibal.limon@linux.intel.com>
---
 meta/classes/sstate.bbclass |  3 ++-
 meta/lib/oe/utils.py        | 26 ++++++++++++++++++++------
 2 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index 1e5e98a..a80d1ce 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -771,9 +771,10 @@ def sstate_checkhashes(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=False):
             bb.note("Checking sstate mirror object availability (for %s objects)" % len(tasklist))
             import multiprocessing
             nproc = min(multiprocessing.cpu_count(), len(tasklist))
-            pool = oe.utils.ThreadedPool(nproc)
+            pool = oe.utils.ThreadedPool(nproc, len(tasklist))
             for t in tasklist:
                 pool.add_task(checkstatus, t)
+            pool.start()
             pool.wait_completion()
 
     inheritlist = d.getVar("INHERIT", True)
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py
index 0de8800..f0d3c14 100644
--- a/meta/lib/oe/utils.py
+++ b/meta/lib/oe/utils.py
@@ -222,11 +222,16 @@ class ThreadedWorker(Thread):
         Thread.__init__(self)
         self.tasks = tasks
         self.daemon = True
-        self.start()
 
     def run(self):
+        from Queue import Empty
+
         while True:
-            func, args, kargs = self.tasks.get()
+            try:
+                func, args, kargs = self.tasks.get(block=False)
+            except Empty:
+                break
+
             try:
                 func(*args, **kargs)
             except Exception, e:
@@ -236,9 +241,17 @@ class ThreadedWorker(Thread):
 
 class ThreadedPool:
     """Pool of threads consuming tasks from a queue"""
-    def __init__(self, num_threads):
-        self.tasks = Queue(num_threads)
-        for _ in range(num_threads): ThreadedWorker(self.tasks)
+    def __init__(self, num_workers, num_tasks):
+        self.tasks = Queue(num_tasks)
+        self.workers = []
+
+        for _ in range(num_workers):
+            worker = ThreadedWorker(self.tasks)
+            self.workers.append(worker)
+
+    def start(self):
+        for worker in self.workers:
+            worker.start()
 
     def add_task(self, func, *args, **kargs):
         """Add a task to the queue"""
@@ -247,4 +260,5 @@ class ThreadedPool:
     def wait_completion(self):
         """Wait for completion of all the tasks in the queue"""
         self.tasks.join()
-
+        for worker in self.workers:
+            worker.join()
-- 
1.9.1



^ permalink raw reply related	[flat|nested] 3+ messages in thread

* [PATCH 2/2] oe/utils.py: Add support for init/end helper functions in ThreadWorker.
  2015-06-23 16:49 [PATCH 0/2] oe/utils.py: ThreadPool fixes and improvements Aníbal Limón
  2015-06-23 16:49 ` [PATCH 1/2] oe/utils.py: Fix thread leakage in ThreadPool Aníbal Limón
@ 2015-06-23 16:49 ` Aníbal Limón
  1 sibling, 0 replies; 3+ messages in thread
From: Aníbal Limón @ 2015-06-23 16:49 UTC (permalink / raw)
  To: openembedded-core

Add init/end helper functions for ThreadWorker also pass ThreadWorker
as first argument to init/end/func functions this enables per-thread
storage handling.

classes/sstate.bbclass: Add thread_worker argument to checkstatus
function.

Signed-off-by: Aníbal Limón <anibal.limon@linux.intel.com>
---
 meta/classes/sstate.bbclass |  2 +-
 meta/lib/oe/utils.py        | 17 +++++++++++++----
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index a80d1ce..1e2d4f6 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -739,7 +739,7 @@ def sstate_checkhashes(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=False):
         if localdata.getVar('BB_NO_NETWORK', True) == "1" and localdata.getVar('SSTATE_MIRROR_ALLOW_NETWORK', True) == "1":
             localdata.delVar('BB_NO_NETWORK')
 
-        def checkstatus(arg):
+        def checkstatus(thread_worker, arg):
             (task, sstatefile) = arg
 
             localdata2 = bb.data.createCopy(localdata)
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py
index f0d3c14..cee087f 100644
--- a/meta/lib/oe/utils.py
+++ b/meta/lib/oe/utils.py
@@ -218,22 +218,30 @@ from threading import Thread
 
 class ThreadedWorker(Thread):
     """Thread executing tasks from a given tasks queue"""
-    def __init__(self, tasks):
+    def __init__(self, tasks, worker_init, worker_end):
         Thread.__init__(self)
         self.tasks = tasks
         self.daemon = True
 
+        self.worker_init = worker_init
+        self.worker_end = worker_end
+
     def run(self):
         from Queue import Empty
 
+        if self.worker_init is not None:
+            self.worker_init(self)
+
         while True:
             try:
                 func, args, kargs = self.tasks.get(block=False)
             except Empty:
+                if self.worker_end is not None:
+                    self.worker_end(self)
                 break
 
             try:
-                func(*args, **kargs)
+                func(self, *args, **kargs)
             except Exception, e:
                 print e
             finally:
@@ -241,12 +249,13 @@ class ThreadedWorker(Thread):
 
 class ThreadedPool:
     """Pool of threads consuming tasks from a queue"""
-    def __init__(self, num_workers, num_tasks):
+    def __init__(self, num_workers, num_tasks, worker_init=None,
+            worker_end=None):
         self.tasks = Queue(num_tasks)
         self.workers = []
 
         for _ in range(num_workers):
-            worker = ThreadedWorker(self.tasks)
+            worker = ThreadedWorker(self.tasks, worker_init, worker_end)
             self.workers.append(worker)
 
     def start(self):
-- 
1.9.1



^ permalink raw reply related	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2015-06-23 16:48 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2015-06-23 16:49 [PATCH 0/2] oe/utils.py: ThreadPool fixes and improvements Aníbal Limón
2015-06-23 16:49 ` [PATCH 1/2] oe/utils.py: Fix thread leakage in ThreadPool Aníbal Limón
2015-06-23 16:49 ` [PATCH 2/2] oe/utils.py: Add support for init/end helper functions in ThreadWorker Aníbal Limón

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox