All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH 0/2] oe/utils.py: ThreadPool fixes and improvements
@ 2015-06-23 16:49 Aníbal Limón
  2015-06-23 16:49 ` [PATCH 1/2] oe/utils.py: Fix thread leakage in ThreadPool Aníbal Limón
  2015-06-23 16:49 ` [PATCH 2/2] oe/utils.py: Add support for init/end helper functions in ThreadWorker Aníbal Limón
  0 siblings, 2 replies; 3+ messages in thread
From: Aníbal Limón @ 2015-06-23 16:49 UTC (permalink / raw)
  To: openembedded-core

The following changes since commit 6f4304e36df7e416643dd4a7ee3de096f21f1020:

  bitbake: runqueue: Sanity check BB_NUMBER_THREADS (2015-06-23 11:57:58 +0100)

are available in the git repository at:

  git://git.yoctoproject.org/poky-contrib alimon/oe-utils
  http://git.yoctoproject.org/cgit.cgi/poky-contrib/log/?h=alimon/oe-utils

Aníbal Limón (2):
  oe/utils.py: Fix thread leakage in ThreadPool
  oe/utils.py: Add support for init/end helper functions in
    ThreadWorker.

 meta/classes/sstate.bbclass |  5 +++--
 meta/lib/oe/utils.py        | 39 +++++++++++++++++++++++++++++++--------
 2 files changed, 34 insertions(+), 10 deletions(-)

-- 
1.9.1



^ permalink raw reply	[flat|nested] 3+ messages in thread

* [PATCH 1/2] oe/utils.py: Fix thread leakage in ThreadPool
  2015-06-23 16:49 [PATCH 0/2] oe/utils.py: ThreadPool fixes and improvements Aníbal Limón
@ 2015-06-23 16:49 ` Aníbal Limón
  2015-06-23 16:49 ` [PATCH 2/2] oe/utils.py: Add support for init/end helper functions in ThreadWorker Aníbal Limón
  1 sibling, 0 replies; 3+ messages in thread
From: Aníbal Limón @ 2015-06-23 16:49 UTC (permalink / raw)
  To: openembedded-core

In order to fix Thread leakage caused by not call join() in Threads,

Pass num_tasks in ThreadPool for add all the tasks into a Queue this
enable catch of Queue.Empty exception and exit the threads.

classes/sstate.bbclass: Change checkstatus function to match new
ThreadPool operation.

Signed-off-by: Aníbal Limón <anibal.limon@linux.intel.com>
---
 meta/classes/sstate.bbclass |  3 ++-
 meta/lib/oe/utils.py        | 26 ++++++++++++++++++++------
 2 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index 1e5e98a..a80d1ce 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -771,9 +771,10 @@ def sstate_checkhashes(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=False):
             bb.note("Checking sstate mirror object availability (for %s objects)" % len(tasklist))
             import multiprocessing
             nproc = min(multiprocessing.cpu_count(), len(tasklist))
-            pool = oe.utils.ThreadedPool(nproc)
+            pool = oe.utils.ThreadedPool(nproc, len(tasklist))
             for t in tasklist:
                 pool.add_task(checkstatus, t)
+            pool.start()
             pool.wait_completion()
 
     inheritlist = d.getVar("INHERIT", True)
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py
index 0de8800..f0d3c14 100644
--- a/meta/lib/oe/utils.py
+++ b/meta/lib/oe/utils.py
@@ -222,11 +222,16 @@ class ThreadedWorker(Thread):
         Thread.__init__(self)
         self.tasks = tasks
         self.daemon = True
-        self.start()
 
     def run(self):
+        from Queue import Empty
+
         while True:
-            func, args, kargs = self.tasks.get()
+            try:
+                func, args, kargs = self.tasks.get(block=False)
+            except Empty:
+                break
+
             try:
                 func(*args, **kargs)
             except Exception, e:
@@ -236,9 +241,17 @@ class ThreadedWorker(Thread):
 
 class ThreadedPool:
     """Pool of threads consuming tasks from a queue"""
-    def __init__(self, num_threads):
-        self.tasks = Queue(num_threads)
-        for _ in range(num_threads): ThreadedWorker(self.tasks)
+    def __init__(self, num_workers, num_tasks):
+        self.tasks = Queue(num_tasks)
+        self.workers = []
+
+        for _ in range(num_workers):
+            worker = ThreadedWorker(self.tasks)
+            self.workers.append(worker)
+
+    def start(self):
+        for worker in self.workers:
+            worker.start()
 
     def add_task(self, func, *args, **kargs):
         """Add a task to the queue"""
@@ -247,4 +260,5 @@ class ThreadedPool:
     def wait_completion(self):
         """Wait for completion of all the tasks in the queue"""
         self.tasks.join()
-
+        for worker in self.workers:
+            worker.join()
-- 
1.9.1



^ permalink raw reply related	[flat|nested] 3+ messages in thread

* [PATCH 2/2] oe/utils.py: Add support for init/end helper functions in ThreadWorker.
  2015-06-23 16:49 [PATCH 0/2] oe/utils.py: ThreadPool fixes and improvements Aníbal Limón
  2015-06-23 16:49 ` [PATCH 1/2] oe/utils.py: Fix thread leakage in ThreadPool Aníbal Limón
@ 2015-06-23 16:49 ` Aníbal Limón
  1 sibling, 0 replies; 3+ messages in thread
From: Aníbal Limón @ 2015-06-23 16:49 UTC (permalink / raw)
  To: openembedded-core

Add init/end helper functions for ThreadWorker also pass ThreadWorker
as first argument to init/end/func functions this enables per-thread
storage handling.

classes/sstate.bbclass: Add thread_worker argument to checkstatus
function.

Signed-off-by: Aníbal Limón <anibal.limon@linux.intel.com>
---
 meta/classes/sstate.bbclass |  2 +-
 meta/lib/oe/utils.py        | 17 +++++++++++++----
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index a80d1ce..1e2d4f6 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -739,7 +739,7 @@ def sstate_checkhashes(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=False):
         if localdata.getVar('BB_NO_NETWORK', True) == "1" and localdata.getVar('SSTATE_MIRROR_ALLOW_NETWORK', True) == "1":
             localdata.delVar('BB_NO_NETWORK')
 
-        def checkstatus(arg):
+        def checkstatus(thread_worker, arg):
             (task, sstatefile) = arg
 
             localdata2 = bb.data.createCopy(localdata)
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py
index f0d3c14..cee087f 100644
--- a/meta/lib/oe/utils.py
+++ b/meta/lib/oe/utils.py
@@ -218,22 +218,30 @@ from threading import Thread
 
 class ThreadedWorker(Thread):
     """Thread executing tasks from a given tasks queue"""
-    def __init__(self, tasks):
+    def __init__(self, tasks, worker_init, worker_end):
         Thread.__init__(self)
         self.tasks = tasks
         self.daemon = True
 
+        self.worker_init = worker_init
+        self.worker_end = worker_end
+
     def run(self):
         from Queue import Empty
 
+        if self.worker_init is not None:
+            self.worker_init(self)
+
         while True:
             try:
                 func, args, kargs = self.tasks.get(block=False)
             except Empty:
+                if self.worker_end is not None:
+                    self.worker_end(self)
                 break
 
             try:
-                func(*args, **kargs)
+                func(self, *args, **kargs)
             except Exception, e:
                 print e
             finally:
@@ -241,12 +249,13 @@ class ThreadedWorker(Thread):
 
 class ThreadedPool:
     """Pool of threads consuming tasks from a queue"""
-    def __init__(self, num_workers, num_tasks):
+    def __init__(self, num_workers, num_tasks, worker_init=None,
+            worker_end=None):
         self.tasks = Queue(num_tasks)
         self.workers = []
 
         for _ in range(num_workers):
-            worker = ThreadedWorker(self.tasks)
+            worker = ThreadedWorker(self.tasks, worker_init, worker_end)
             self.workers.append(worker)
 
     def start(self):
-- 
1.9.1



^ permalink raw reply related	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2015-06-23 16:48 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2015-06-23 16:49 [PATCH 0/2] oe/utils.py: ThreadPool fixes and improvements Aníbal Limón
2015-06-23 16:49 ` [PATCH 1/2] oe/utils.py: Fix thread leakage in ThreadPool Aníbal Limón
2015-06-23 16:49 ` [PATCH 2/2] oe/utils.py: Add support for init/end helper functions in ThreadWorker Aníbal Limón

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.