* [bitbake][scarthgap][2.8][PATCH 0/8] Patch review
@ 2024-06-01 12:27 Steve Sakoman
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 1/8] runqueue: Add timing warnings around slow loops Steve Sakoman
` (7 more replies)
0 siblings, 8 replies; 9+ messages in thread
From: Steve Sakoman @ 2024-06-01 12:27 UTC (permalink / raw)
To: bitbake-devel
Please review this set of changes for 2.8/scarthgap and have comments back by
end of day Tuesday, June 4
Passed a-full on autobuilder:
https://autobuilder.yoctoproject.org/typhoon/#/builders/83/builds/6993
with two exceptions, the first a known reproducibility issue also present
on master:
https://bugzilla.yoctoproject.org/show_bug.cgi?id=15491
and the second is a failure on meta-agl-core, which will require an update
to the ptest-runner override in meta-agl once "ptest-runner: Bump to 2.4.4 (95f528c)"
merges.
The following changes since commit b5159c0373e2e7d403aed16e096ad655f38b1fa7:
fetch2/gcp: Add missing runfetchcmd import (2024-05-28 15:27:08 +0100)
are available in the Git repository at:
https://git.openembedded.org/bitbake-contrib stable/2.8-nut
https://git.openembedded.org/bitbake-contrib/log/?h=stable/2.8-nut
Joshua Watt (3):
bb: Use namedtuple for Task data
hashserv: client: Add batch stream API
siggen: Enable batching of unihash queries
Richard Purdie (4):
runqueue: Add timing warnings around slow loops
runqueue: Allow rehash loop to exit in case of interrupts
runqueue: Process unihashes in parallel at init
runqueue: Improve rehash get_unihash parallelism
joshua Watt (1):
siggen/runqueue: Report which dependencies affect the taskhash
lib/bb/__init__.py | 12 +++++
lib/bb/cooker.py | 1 -
lib/bb/runqueue.py | 99 +++++++++++++++++++++++++++-----------
lib/bb/siggen.py | 11 +++--
lib/hashserv/client.py | 106 +++++++++++++++++++++++++++++++++++++----
lib/hashserv/tests.py | 75 +++++++++++++++++++++++++++++
6 files changed, 263 insertions(+), 41 deletions(-)
--
2.34.1
* [bitbake][scarthgap][2.8][PATCH 1/8] runqueue: Add timing warnings around slow loops
2024-06-01 12:27 [bitbake][scarthgap][2.8][PATCH 0/8] Patch review Steve Sakoman
@ 2024-06-01 12:27 ` Steve Sakoman
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 2/8] runqueue: Allow rehash loop to exit in case of interrupts Steve Sakoman
` (6 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Steve Sakoman @ 2024-06-01 12:27 UTC (permalink / raw)
To: bitbake-devel
From: Richard Purdie <richard.purdie@linuxfoundation.org>
With hashserve enabled, there are two slow paths/loops: one during initial runqueue
generation and one during the rehash process when new outhashes are found.
Add timing information at the hashserve log level: report progress every 30s while
these loops run, and report the total time if a loop took longer than 60s overall.
This leaves evidence in the logs when things are running particularly slowly.
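The progress/summary pattern described above can be sketched as follows. This is a minimal, hypothetical illustration, not the runqueue code. The names process_with_timing, handle and clock are invented. It uses time.monotonic and an injectable clock (the patch itself calls time.time()) so the behaviour can be tested. It also checks the clock once per item, whereas the patch checks once per outer loop pass.

```python
import logging
import time

logger = logging.getLogger("slow-loop-sketch")  # hypothetical logger name


def process_with_timing(items, handle, clock=time.monotonic,
                        progress_interval=30, report_threshold=60):
    """Call handle() on each item, logging progress at most once per
    progress_interval seconds and a summary if the whole loop ran longer
    than report_threshold seconds. Returns the logged messages."""
    messages = []
    starttime = clock()
    lasttime = starttime
    remaining = len(items)
    for item in items:
        handle(item)
        remaining -= 1
        now = clock()
        # Periodic progress report while the loop is still running
        if now > lasttime + progress_interval:
            lasttime = now
            msg = "Loop progress: %d remaining after %.1fs" % (remaining, lasttime - starttime)
            logger.info(msg)
            messages.append(msg)
    endtime = clock()
    # One summary line, only when the loop was slow overall
    if endtime - starttime > report_threshold:
        msg = "Loop took: %.1fs" % (endtime - starttime)
        logger.info(msg)
        messages.append(msg)
    return messages
```

With a fake clock that advances 20s per call, three items produce one progress line (at 40s) and one summary line (80s total).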
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
---
lib/bb/runqueue.py | 22 ++++++++++++++++++++++
1 file changed, 22 insertions(+)
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index bc7e18175..beec1e046 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -1273,6 +1273,9 @@ class RunQueueData:
bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids)
+ starttime = time.time()
+ lasttime = starttime
+
# Iterate over the task list and call into the siggen code
dealtwith = set()
todeal = set(self.runtaskentries)
@@ -1284,6 +1287,14 @@ class RunQueueData:
self.prepare_task_hash(tid)
bb.event.check_for_interrupts(self.cooker.data)
+ if time.time() > (lasttime + 30):
+ lasttime = time.time()
+ hashequiv_logger.verbose("Initial setup loop progress: %s of %s in %s" % (len(todeal), len(self.runtaskentries), lasttime - starttime))
+
+ endtime = time.time()
+ if (endtime-starttime > 60):
+ hashequiv_logger.verbose("Initial setup loop took: %s" % (endtime-starttime))
+
bb.parse.siggen.writeout_file_checksum_cache()
#self.dump_data()
@@ -2556,6 +2567,9 @@ class RunQueueExecute:
elif self.rqdata.runtaskentries[p].depends.isdisjoint(total):
next.add(p)
+ starttime = time.time()
+ lasttime = starttime
+
# When an item doesn't have dependencies in total, we can process it. Drop items from total when handled
while next:
current = next.copy()
@@ -2588,6 +2602,14 @@ class RunQueueExecute:
total.remove(tid)
next.intersection_update(total)
+ if time.time() > (lasttime + 30):
+ lasttime = time.time()
+ hashequiv_logger.verbose("Rehash loop slow progress: %s in %s" % (len(total), lasttime - starttime))
+
+ endtime = time.time()
+ if (endtime-starttime > 60):
+ hashequiv_logger.verbose("Rehash loop took more than 60s: %s" % (endtime-starttime))
+
if changed:
for mc in self.rq.worker:
RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
--
2.34.1
* [bitbake][scarthgap][2.8][PATCH 2/8] runqueue: Allow rehash loop to exit in case of interrupts
2024-06-01 12:27 [bitbake][scarthgap][2.8][PATCH 0/8] Patch review Steve Sakoman
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 1/8] runqueue: Add timing warnings around slow loops Steve Sakoman
@ 2024-06-01 12:27 ` Steve Sakoman
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 3/8] siggen/runqueue: Report which dependencies affect the taskhash Steve Sakoman
` (5 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Steve Sakoman @ 2024-06-01 12:27 UTC (permalink / raw)
To: bitbake-devel
From: Richard Purdie <richard.purdie@linuxfoundation.org>
The initial hashserve loop exits if interrupts are pending, but it probably
checks a bit too often. Check less often, and also allow the slow rehash
loop to break on interrupt, improving bitbake's Ctrl+C response.
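As a rough illustration of checking for interrupts once per pass instead of once per item, here is a hypothetical sketch. The check_for_interrupts function is an invented stand-in for bb.event.check_for_interrupts(), which is assumed here to raise when an interrupt is pending. The sketch uses a threading.Event as the interrupt flag. None of these names come from bitbake.

```python
import threading


class Interrupted(Exception):
    """Hypothetical exception raised when an interrupt is pending."""


def check_for_interrupts(stop_event):
    # Hypothetical stand-in for bb.event.check_for_interrupts()
    if stop_event.is_set():
        raise Interrupted()


def process_in_waves(waves, handle, stop_event):
    """Handle each wave of ready items, checking for interrupts once per
    wave rather than after every single item."""
    done = []
    for wave in waves:
        for item in wave:
            handle(item)
            done.append(item)
        # One check per pass keeps overhead low but still lets Ctrl+C
        # stop the loop between passes
        check_for_interrupts(stop_event)
    return done
```

If an interrupt arrives part-way through a wave, the rest of that wave still completes and the loop stops before the next wave starts.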
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
---
lib/bb/runqueue.py | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index beec1e046..85c3581f9 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -1285,7 +1285,8 @@ class RunQueueData:
dealtwith.add(tid)
todeal.remove(tid)
self.prepare_task_hash(tid)
- bb.event.check_for_interrupts(self.cooker.data)
+
+ bb.event.check_for_interrupts(self.cooker.data)
if time.time() > (lasttime + 30):
lasttime = time.time()
@@ -2601,6 +2602,7 @@ class RunQueueExecute:
next |= self.rqdata.runtaskentries[tid].revdeps
total.remove(tid)
next.intersection_update(total)
+ bb.event.check_for_interrupts(self.cooker.data)
if time.time() > (lasttime + 30):
lasttime = time.time()
--
2.34.1
* [bitbake][scarthgap][2.8][PATCH 3/8] siggen/runqueue: Report which dependencies affect the taskhash
2024-06-01 12:27 [bitbake][scarthgap][2.8][PATCH 0/8] Patch review Steve Sakoman
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 1/8] runqueue: Add timing warnings around slow loops Steve Sakoman
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 2/8] runqueue: Allow rehash loop to exit in case of interrupts Steve Sakoman
@ 2024-06-01 12:27 ` Steve Sakoman
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 4/8] runqueue: Process unihashes in parallel at init Steve Sakoman
` (4 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Steve Sakoman @ 2024-06-01 12:27 UTC (permalink / raw)
To: bitbake-devel
From: joshua Watt <JPEWhacker@gmail.com>
Report which task dependencies in BB_TASKDEPDATA are included in the
taskhash. This allows tasks to identify which task dependencies may
change without the task re-running. Knowing this information is
important for tasks that want to transfer information from dependencies
(such as SPDX).
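A task consuming this information might split its dependencies into "covered by my taskhash" and "can change without re-running me". The sketch below is hypothetical. It assumes, as the patched prep_taskhash() implies with its `for _, dep in` loop, that each runtaskdeps entry is a pair whose second element is the dependency tid. It also assumes the full dependency set can be larger than the hashed set. The function names and example tids are invented.

```python
def taskhash_deps_from(runtaskdeps_entry):
    # Mirrors the patched prep_taskhash() return value: keep the second
    # element of each (key, dep) pair
    return set(dep for _, dep in runtaskdeps_entry)


def split_deps(all_deps, taskhash_deps):
    """Split a task's dependencies into those covered by its taskhash and
    those that may change without forcing the task to re-run."""
    hashed = sorted(d for d in all_deps if d in taskhash_deps)
    unhashed = sorted(d for d in all_deps if d not in taskhash_deps)
    return hashed, unhashed
```

A task such as an SPDX generator could then treat data from the "unhashed" dependencies as potentially stale.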
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
---
lib/bb/runqueue.py | 8 +++++---
lib/bb/siggen.py | 2 +-
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index 85c3581f9..84a6f4172 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -1302,7 +1302,7 @@ class RunQueueData:
return len(self.runtaskentries)
def prepare_task_hash(self, tid):
- bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
+ self.runtaskentries[tid].taskhash_deps = bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid)
@@ -2457,7 +2457,8 @@ class RunQueueExecute:
unihash = self.rqdata.runtaskentries[task].unihash
deps = self.filtermcdeps(task, mc, deps)
hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn]
- taskdepdata_cache[task] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn]
+ taskhash_deps = self.rqdata.runtaskentries[task].taskhash_deps
+ taskdepdata_cache[task] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn, taskhash_deps]
self.taskdepdata_cache = taskdepdata_cache
@@ -2836,7 +2837,8 @@ class RunQueueExecute:
taskhash = self.rqdata.runtaskentries[revdep].hash
unihash = self.rqdata.runtaskentries[revdep].unihash
hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn]
- taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn]
+ taskhash_deps = self.rqdata.runtaskentries[revdep].taskhash_deps
+ taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn, taskhash_deps]
for revdep2 in deps:
if revdep2 not in taskdepdata:
additional.append(revdep2)
diff --git a/lib/bb/siggen.py b/lib/bb/siggen.py
index 8ab08ec96..03dfda6f3 100644
--- a/lib/bb/siggen.py
+++ b/lib/bb/siggen.py
@@ -381,7 +381,7 @@ class SignatureGeneratorBasic(SignatureGenerator):
self.taints[tid] = taint
logger.warning("%s is tainted from a forced run" % tid)
- return
+ return set(dep for _, dep in self.runtaskdeps[tid])
def get_taskhash(self, tid, deps, dataCaches):
--
2.34.1
* [bitbake][scarthgap][2.8][PATCH 4/8] runqueue: Process unihashes in parallel at init
2024-06-01 12:27 [bitbake][scarthgap][2.8][PATCH 0/8] Patch review Steve Sakoman
` (2 preceding siblings ...)
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 3/8] siggen/runqueue: Report which dependencies affect the taskhash Steve Sakoman
@ 2024-06-01 12:27 ` Steve Sakoman
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 5/8] runqueue: Improve rehash get_unihash parallelism Steve Sakoman
` (3 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Steve Sakoman @ 2024-06-01 12:27 UTC (permalink / raw)
To: bitbake-devel
From: Richard Purdie <richard.purdie@linuxfoundation.org>
Improve the runqueue init code to call unihash queries in parallel since
this is faster and more efficient, particularly on slower links with longer
round trip times.
The call to prepare_task_hash() from cooker is unneeded: the cooker code
path already calls prepare(), so this work will already have been done.
Drop that obsolete call.
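The restructured init loop works in "waves". On each pass, it computes taskhashes for every task whose dependencies are already resolved, then issues one batched unihash query for that whole wave. A hypothetical, self-contained sketch of that idea follows. The callables taskhash_of and get_unihashes are invented stand-ins. In particular, this get_unihashes takes (tid, taskhash) pairs, unlike the real siggen method, which takes tids.

```python
def compute_in_waves(depends, taskhash_of, get_unihashes):
    """depends: dict tid -> set of dependency tids.
    taskhash_of(tid, done): hypothetical; computes a taskhash once all of
    tid's dependencies have unihashes in `done`.
    get_unihashes(pairs): hypothetical; one batched query taking a list of
    (tid, taskhash) and returning a dict tid -> unihash.
    Returns (dict tid -> unihash, number of batched queries made)."""
    done = {}
    todo = set(depends)
    calls = 0
    while todo:
        # Every task whose dependencies are all resolved forms one wave
        ready = [tid for tid in sorted(todo)
                 if all(d in done for d in depends[tid])]
        if not ready:
            raise ValueError("dependency cycle")
        # Taskhashes must be computed before the unihash query
        pairs = [(tid, taskhash_of(tid, done)) for tid in ready]
        unihashes = get_unihashes(pairs)  # one round trip per wave
        calls += 1
        for tid in ready:
            done[tid] = unihashes[tid]
            todo.remove(tid)
    return done, calls
```

For a diamond-shaped graph (a; b and c depend on a; d depends on b and c), this makes three batched queries instead of four individual ones. The saving grows with the width of each wave.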
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
---
lib/bb/cooker.py | 1 -
lib/bb/runqueue.py | 18 ++++++++++--------
2 files changed, 10 insertions(+), 9 deletions(-)
diff --git a/lib/bb/cooker.py b/lib/bb/cooker.py
index 939a99997..6318ef4a8 100644
--- a/lib/bb/cooker.py
+++ b/lib/bb/cooker.py
@@ -1459,7 +1459,6 @@ class BBCooker:
if t in task or getAllTaskSignatures:
try:
- rq.rqdata.prepare_task_hash(tid)
sig.append([pn, t, rq.rqdata.get_task_unihash(tid)])
except KeyError:
sig.append(self.getTaskSignatures(target, [t])[0])
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index 84a6f4172..999868dd7 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -1280,11 +1280,18 @@ class RunQueueData:
dealtwith = set()
todeal = set(self.runtaskentries)
while todeal:
+ ready = set()
for tid in todeal.copy():
if not (self.runtaskentries[tid].depends - dealtwith):
- dealtwith.add(tid)
- todeal.remove(tid)
- self.prepare_task_hash(tid)
+ self.runtaskentries[tid].taskhash_deps = bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
+ # get_taskhash for a given tid *must* be called before get_unihash* below
+ self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
+ ready.add(tid)
+ unihashes = bb.parse.siggen.get_unihashes(ready)
+ for tid in ready:
+ dealtwith.add(tid)
+ todeal.remove(tid)
+ self.runtaskentries[tid].unihash = unihashes[tid]
bb.event.check_for_interrupts(self.cooker.data)
@@ -1301,11 +1308,6 @@ class RunQueueData:
#self.dump_data()
return len(self.runtaskentries)
- def prepare_task_hash(self, tid):
- self.runtaskentries[tid].taskhash_deps = bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
- self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
- self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid)
-
def dump_data(self):
"""
Dump some debug information on the internal data structures
--
2.34.1
* [bitbake][scarthgap][2.8][PATCH 5/8] runqueue: Improve rehash get_unihash parallelism
2024-06-01 12:27 [bitbake][scarthgap][2.8][PATCH 0/8] Patch review Steve Sakoman
` (3 preceding siblings ...)
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 4/8] runqueue: Process unihashes in parallel at init Steve Sakoman
@ 2024-06-01 12:27 ` Steve Sakoman
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 6/8] bb: Use namedtuple for Task data Steve Sakoman
` (2 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Steve Sakoman @ 2024-06-01 12:27 UTC (permalink / raw)
To: bitbake-devel
From: Richard Purdie <richard.purdie@linuxfoundation.org>
Improve the rehash code to query unihashes in parallel since this is more
efficient on slower links.
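Why batching helps most on slow links can be seen with a deliberately simplified cost model. This model is an assumption for illustration and does not come from the patch. Serial queries pay the full round trip for every query. Pipelined queries pay the round trip roughly once, after which the server's per-query work dominates. Times are in milliseconds to keep the arithmetic exact.

```python
def serial_ms(n, rtt_ms, service_ms):
    # One request at a time: every query pays a full round trip
    return n * (rtt_ms + service_ms)


def pipelined_ms(n, rtt_ms, service_ms):
    # Simplified model: requests are streamed back-to-back, so the round
    # trip is paid about once and per-query server work dominates
    return rtt_ms + n * service_ms if n else 0
```

With 1000 queries, a 50 ms round trip and 1 ms of server work each, this model gives 51000 ms serially versus 1050 ms pipelined. Real servers and networks will not behave this ideally.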
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
---
lib/bb/runqueue.py | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index 999868dd7..47f48304e 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -2578,13 +2578,21 @@ class RunQueueExecute:
while next:
current = next.copy()
next = set()
+ ready = {}
for tid in current:
if self.rqdata.runtaskentries[p].depends and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
continue
+ # get_taskhash for a given tid *must* be called before get_unihash* below
+ ready[tid] = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, self.rqdata.dataCaches)
+
+ unihashes = bb.parse.siggen.get_unihashes(ready.keys())
+
+ for tid in ready:
orighash = self.rqdata.runtaskentries[tid].hash
- newhash = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, self.rqdata.dataCaches)
+ newhash = ready[tid]
origuni = self.rqdata.runtaskentries[tid].unihash
- newuni = bb.parse.siggen.get_unihash(tid)
+ newuni = unihashes[tid]
+
# FIXME, need to check it can come from sstate at all for determinism?
remapped = False
if newuni == origuni:
--
2.34.1
* [bitbake][scarthgap][2.8][PATCH 6/8] bb: Use namedtuple for Task data
2024-06-01 12:27 [bitbake][scarthgap][2.8][PATCH 0/8] Patch review Steve Sakoman
` (4 preceding siblings ...)
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 5/8] runqueue: Improve rehash get_unihash parallelism Steve Sakoman
@ 2024-06-01 12:27 ` Steve Sakoman
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 7/8] hashserv: client: Add batch stream API Steve Sakoman
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 8/8] siggen: Enable batching of unihash queries Steve Sakoman
7 siblings, 0 replies; 9+ messages in thread
From: Steve Sakoman @ 2024-06-01 12:27 UTC (permalink / raw)
To: bitbake-devel
From: Joshua Watt <JPEWhacker@gmail.com>
Task dependency data is becoming unwieldy with the number of indices it
contains. Convert it to use a named tuple instead, which allows members
to be accessed either by name or by index (index access retains
backward compatibility).
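The standard-library behaviour this relies on can be shown with a small sketch. The field list below copies the TaskData definition added to lib/bb/__init__.py. The example values are invented. Named tuples are immutable, which is why the runqueue code has to build an updated copy with _replace() instead of assigning to index 6.

```python
from collections import namedtuple

# Same field order as the TaskData definition in lib/bb/__init__.py
TaskData = namedtuple("TaskData", [
    "pn", "taskname", "fn", "deps", "provides",
    "taskhash", "unihash", "hashfn", "taskhash_deps",
])

# Hypothetical example values
entry = TaskData(
    pn="example", taskname="do_compile", fn="/path/example.bb",
    deps=set(), provides=["example"], taskhash="aaaa", unihash="bbbb",
    hashfn="example-hashfn", taskhash_deps=set(),
)

# Old-style positional access keeps working alongside named access
assert entry[6] == entry.unihash == "bbbb"

# Tuples are immutable, so an update produces a new instance
updated = entry._replace(unihash="cccc")
```

Existing consumers that index by position keep working, and new code can use the field names.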
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
---
lib/bb/__init__.py | 12 ++++++++++++
lib/bb/runqueue.py | 45 +++++++++++++++++++++++++++------------------
2 files changed, 39 insertions(+), 18 deletions(-)
diff --git a/lib/bb/__init__.py b/lib/bb/__init__.py
index eef45fe4e..cdec9e4d6 100644
--- a/lib/bb/__init__.py
+++ b/lib/bb/__init__.py
@@ -36,6 +36,7 @@ class BBHandledException(Exception):
import os
import logging
+from collections import namedtuple
class NullHandler(logging.Handler):
@@ -227,3 +228,14 @@ def deprecate_import(current, modulename, fromlist, renames = None):
setattr(sys.modules[current], newname, newobj)
+TaskData = namedtuple("TaskData", [
+ "pn",
+ "taskname",
+ "fn",
+ "deps",
+ "provides",
+ "taskhash",
+ "unihash",
+ "hashfn",
+ "taskhash_deps",
+])
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index 47f48304e..93079a977 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -2452,15 +2452,17 @@ class RunQueueExecute:
taskdepdata_cache = {}
for task in self.rqdata.runtaskentries:
(mc, fn, taskname, taskfn) = split_tid_mcfn(task)
- pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
- deps = self.rqdata.runtaskentries[task].depends
- provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
- taskhash = self.rqdata.runtaskentries[task].hash
- unihash = self.rqdata.runtaskentries[task].unihash
- deps = self.filtermcdeps(task, mc, deps)
- hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn]
- taskhash_deps = self.rqdata.runtaskentries[task].taskhash_deps
- taskdepdata_cache[task] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn, taskhash_deps]
+ taskdepdata_cache[task] = bb.TaskData(
+ pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn],
+ taskname = taskname,
+ fn = fn,
+ deps = self.filtermcdeps(task, mc, self.rqdata.runtaskentries[task].depends),
+ provides = self.rqdata.dataCaches[mc].fn_provides[taskfn],
+ taskhash = self.rqdata.runtaskentries[task].hash,
+ unihash = self.rqdata.runtaskentries[task].unihash,
+ hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn],
+ taskhash_deps = self.rqdata.runtaskentries[task].taskhash_deps,
+ )
self.taskdepdata_cache = taskdepdata_cache
@@ -2475,9 +2477,11 @@ class RunQueueExecute:
while next:
additional = []
for revdep in next:
- self.taskdepdata_cache[revdep][6] = self.rqdata.runtaskentries[revdep].unihash
+ self.taskdepdata_cache[revdep] = self.taskdepdata_cache[revdep]._replace(
+ unihash=self.rqdata.runtaskentries[revdep].unihash
+ )
taskdepdata[revdep] = self.taskdepdata_cache[revdep]
- for revdep2 in self.taskdepdata_cache[revdep][3]:
+ for revdep2 in self.taskdepdata_cache[revdep].deps:
if revdep2 not in taskdepdata:
additional.append(revdep2)
next = additional
@@ -2841,14 +2845,19 @@ class RunQueueExecute:
additional = []
for revdep in next:
(mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
- pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
deps = getsetscenedeps(revdep)
- provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
- taskhash = self.rqdata.runtaskentries[revdep].hash
- unihash = self.rqdata.runtaskentries[revdep].unihash
- hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn]
- taskhash_deps = self.rqdata.runtaskentries[revdep].taskhash_deps
- taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn, taskhash_deps]
+
+ taskdepdata[revdep] = bb.TaskData(
+ pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn],
+ taskname = taskname,
+ fn = fn,
+ deps = deps,
+ provides = self.rqdata.dataCaches[mc].fn_provides[taskfn],
+ taskhash = self.rqdata.runtaskentries[revdep].hash,
+ unihash = self.rqdata.runtaskentries[revdep].unihash,
+ hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn],
+ taskhash_deps = self.rqdata.runtaskentries[revdep].taskhash_deps,
+ )
for revdep2 in deps:
if revdep2 not in taskdepdata:
additional.append(revdep2)
--
2.34.1
* [bitbake][scarthgap][2.8][PATCH 7/8] hashserv: client: Add batch stream API
2024-06-01 12:27 [bitbake][scarthgap][2.8][PATCH 0/8] Patch review Steve Sakoman
` (5 preceding siblings ...)
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 6/8] bb: Use namedtuple for Task data Steve Sakoman
@ 2024-06-01 12:27 ` Steve Sakoman
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 8/8] siggen: Enable batching of unihash queries Steve Sakoman
7 siblings, 0 replies; 9+ messages in thread
From: Steve Sakoman @ 2024-06-01 12:27 UTC (permalink / raw)
To: bitbake-devel
From: Joshua Watt <JPEWhacker@gmail.com>
Change the stream mode to do "batch" processing. Messages are sent and
received simultaneously, so queries can be sent as fast as possible
without waiting for each reply. This allows multiple messages to be in
flight at once, reducing the effect of round-trip latency to the server.
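The pipelining idea can be sketched with plain asyncio. In the sketch, a sender pushes every message without waiting, while a concurrent receiver reads one reply per message sent, coordinated through an asyncio.Condition. As in the patch, the sent count is tracked so that the messages may come from a generator. This is a simplified, hypothetical illustration, not the hashserv Batch class. FakeConnection is an invented in-memory stand-in for the client socket, and it does not model reconnects or resends.

```python
import asyncio


class FakeConnection:
    """Hypothetical in-memory stand-in for the hashserv socket: each reply
    becomes available `latency` seconds after its request is sent, and
    replies are returned in request order."""

    def __init__(self, latency):
        self.latency = latency
        self.outstanding = asyncio.Queue()

    async def send(self, msg):
        async def reply():
            await asyncio.sleep(self.latency)
            return msg.upper()
        await self.outstanding.put(asyncio.ensure_future(reply()))

    async def recv(self):
        # Wait for the oldest outstanding request to be answered
        task = await self.outstanding.get()
        return await task


async def send_batch(conn, msgs):
    """Send every message without waiting for replies while a concurrent
    receiver collects them in order."""
    cond = asyncio.Condition()
    state = {"sent": 0, "done": False}
    results = []

    async def sender():
        try:
            for m in msgs:
                await conn.send(m)
                async with cond:
                    state["sent"] += 1
                    cond.notify()
        finally:
            async with cond:
                state["done"] = True
                cond.notify()

    async def receiver():
        while True:
            async with cond:
                await cond.wait_for(
                    lambda: len(results) < state["sent"] or state["done"])
                if len(results) >= state["sent"]:
                    return  # sender finished and every reply has arrived
            results.append(await conn.recv())

    await asyncio.gather(sender(), receiver())
    return results


async def demo(n=20, latency=0.05):
    conn = FakeConnection(latency)
    loop = asyncio.get_running_loop()
    start = loop.time()
    results = await send_batch(conn, (f"query {i}" for i in range(n)))
    return results, loop.time() - start
```

Running asyncio.run(demo()) returns all 20 replies in order in roughly one latency period (about 0.05s), rather than the roughly 1s that 20 strictly sequential round trips would take.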
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
---
lib/hashserv/client.py | 106 +++++++++++++++++++++++++++++++++++++----
lib/hashserv/tests.py | 75 +++++++++++++++++++++++++++++
2 files changed, 172 insertions(+), 9 deletions(-)
diff --git a/lib/hashserv/client.py b/lib/hashserv/client.py
index 0b254bedd..775faf935 100644
--- a/lib/hashserv/client.py
+++ b/lib/hashserv/client.py
@@ -5,6 +5,7 @@
import logging
import socket
+import asyncio
import bb.asyncrpc
import json
from . import create_async_client
@@ -13,6 +14,66 @@ from . import create_async_client
logger = logging.getLogger("hashserv.client")
+class Batch(object):
+ def __init__(self):
+ self.done = False
+ self.cond = asyncio.Condition()
+ self.pending = []
+ self.results = []
+ self.sent_count = 0
+
+ async def recv(self, socket):
+ while True:
+ async with self.cond:
+ await self.cond.wait_for(lambda: self.pending or self.done)
+
+ if not self.pending:
+ if self.done:
+ return
+ continue
+
+ r = await socket.recv()
+ self.results.append(r)
+
+ async with self.cond:
+ self.pending.pop(0)
+
+ async def send(self, socket, msgs):
+ try:
+ # In the event of a restart due to a reconnect, all in-flight
+ # messages need to be resent first to keep to result count in sync
+ for m in self.pending:
+ await socket.send(m)
+
+ for m in msgs:
+ # Add the message to the pending list before attempting to send
+ # it so that if the send fails it will be retried
+ async with self.cond:
+ self.pending.append(m)
+ self.cond.notify()
+ self.sent_count += 1
+
+ await socket.send(m)
+
+ finally:
+ async with self.cond:
+ self.done = True
+ self.cond.notify()
+
+ async def process(self, socket, msgs):
+ await asyncio.gather(
+ self.recv(socket),
+ self.send(socket, msgs),
+ )
+
+ if len(self.results) != self.sent_count:
+ raise ValueError(
+ f"Expected result count {len(self.results)}. Expected {self.sent_count}"
+ )
+
+ return self.results
+
+
class AsyncClient(bb.asyncrpc.AsyncClient):
MODE_NORMAL = 0
MODE_GET_STREAM = 1
@@ -36,11 +97,27 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
if become:
await self.become_user(become)
- async def send_stream(self, mode, msg):
+ async def send_stream_batch(self, mode, msgs):
+ """
+ Does a "batch" process of stream messages. This sends the query
+ messages as fast as possible, and simultaneously attempts to read the
+ messages back. This helps to mitigate the effects of latency to the
+ hash equivalence server by allowing multiple queries to be "in-flight"
+ at once
+
+ The implementation does more complicated tracking using a count of sent
+ messages so that `msgs` can be a generator function (i.e. its length is
+ unknown)
+
+ """
+
+ b = Batch()
+
async def proc():
+ nonlocal b
+
await self._set_mode(mode)
- await self.socket.send(msg)
- return await self.socket.recv()
+ return await b.process(self.socket, msgs)
return await self._send_wrapper(proc)
@@ -89,10 +166,15 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
self.mode = new_mode
async def get_unihash(self, method, taskhash):
- r = await self.send_stream(self.MODE_GET_STREAM, "%s %s" % (method, taskhash))
- if not r:
- return None
- return r
+ r = await self.get_unihash_batch([(method, taskhash)])
+ return r[0]
+
+ async def get_unihash_batch(self, args):
+ result = await self.send_stream_batch(
+ self.MODE_GET_STREAM,
+ (f"{method} {taskhash}" for method, taskhash in args),
+ )
+ return [r if r else None for r in result]
async def report_unihash(self, taskhash, method, outhash, unihash, extra={}):
m = extra.copy()
@@ -115,8 +197,12 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
)
async def unihash_exists(self, unihash):
- r = await self.send_stream(self.MODE_EXIST_STREAM, unihash)
- return r == "true"
+ r = await self.unihash_exists_batch([unihash])
+ return r[0]
+
+ async def unihash_exists_batch(self, unihashes):
+ result = await self.send_stream_batch(self.MODE_EXIST_STREAM, unihashes)
+ return [r == "true" for r in result]
async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
return await self.invoke(
@@ -237,10 +323,12 @@ class Client(bb.asyncrpc.Client):
"connect_tcp",
"connect_websocket",
"get_unihash",
+ "get_unihash_batch",
"report_unihash",
"report_unihash_equiv",
"get_taskhash",
"unihash_exists",
+ "unihash_exists_batch",
"get_outhash",
"get_stats",
"reset_stats",
diff --git a/lib/hashserv/tests.py b/lib/hashserv/tests.py
index 0809453cf..5349cd586 100644
--- a/lib/hashserv/tests.py
+++ b/lib/hashserv/tests.py
@@ -594,6 +594,43 @@ class HashEquivalenceCommonTests(object):
7: None,
})
+ def test_get_unihash_batch(self):
+ TEST_INPUT = (
+ # taskhash outhash unihash
+ ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'),
+ # Duplicated taskhash with multiple output hashes and unihashes.
+ ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'),
+ # Equivalent hash
+ ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"),
+ ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"),
+ ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'),
+ ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'),
+ ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'),
+ )
+ EXTRA_QUERIES = (
+ "6b6be7a84ab179b4240c4302518dc3f6",
+ )
+
+ for taskhash, outhash, unihash in TEST_INPUT:
+ self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
+
+
+ result = self.client.get_unihash_batch(
+ [(self.METHOD, data[0]) for data in TEST_INPUT] +
+ [(self.METHOD, e) for e in EXTRA_QUERIES]
+ )
+
+ self.assertListEqual(result, [
+ "218e57509998197d570e2c98512d0105985dffc9",
+ "218e57509998197d570e2c98512d0105985dffc9",
+ "218e57509998197d570e2c98512d0105985dffc9",
+ "3b5d3d83f07f259e9086fcb422c855286e18a57d",
+ "f46d3fbb439bd9b921095da657a4de906510d2cd",
+ "f46d3fbb439bd9b921095da657a4de906510d2cd",
+ "05d2a63c81e32f0a36542ca677e8ad852365c538",
+ None,
+ ])
+
def test_client_pool_unihash_exists(self):
TEST_INPUT = (
# taskhash outhash unihash
@@ -636,6 +673,44 @@ class HashEquivalenceCommonTests(object):
result = client_pool.unihashes_exist(query)
self.assertDictEqual(result, expected)
+ def test_unihash_exists_batch(self):
+ TEST_INPUT = (
+ # taskhash outhash unihash
+ ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'),
+ # Duplicated taskhash with multiple output hashes and unihashes.
+ ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'),
+ # Equivalent hash
+ ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"),
+ ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"),
+ ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'),
+ ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'),
+ ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'),
+ )
+ EXTRA_QUERIES = (
+ "6b6be7a84ab179b4240c4302518dc3f6",
+ )
+
+ result_unihashes = set()
+
+
+ for taskhash, outhash, unihash in TEST_INPUT:
+ result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
+ result_unihashes.add(result["unihash"])
+
+ query = []
+ expected = []
+
+ for _, _, unihash in TEST_INPUT:
+ query.append(unihash)
+ expected.append(unihash in result_unihashes)
+
+
+ for unihash in EXTRA_QUERIES:
+ query.append(unihash)
+ expected.append(False)
+
+ result = self.client.unihash_exists_batch(query)
+ self.assertListEqual(result, expected)
def test_auth_read_perms(self):
admin_client = self.start_auth_server()
--
2.34.1
* [bitbake][scarthgap][2.8][PATCH 8/8] siggen: Enable batching of unihash queries
2024-06-01 12:27 [bitbake][scarthgap][2.8][PATCH 0/8] Patch review Steve Sakoman
` (6 preceding siblings ...)
2024-06-01 12:27 ` [bitbake][scarthgap][2.8][PATCH 7/8] hashserv: client: Add batch stream API Steve Sakoman
@ 2024-06-01 12:27 ` Steve Sakoman
7 siblings, 0 replies; 9+ messages in thread
From: Steve Sakoman @ 2024-06-01 12:27 UTC (permalink / raw)
To: bitbake-devel
From: Joshua Watt <JPEWhacker@gmail.com>
Use the batching API of the client to reduce the effect of latency when
making multiple queries to the server.
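The single-client path relies on the batch call returning results in the same order as the queries were generated, so the results can be paired back to their task IDs positionally. Below is a hypothetical sketch of that pairing. The query_unihashes function and the fake batch callable are invented. `dict(zip(...))` is equivalent to the enumerate() loop in the diff.

```python
def query_unihashes(queries, get_unihash_batch):
    """queries: dict tid -> (method, taskhash).
    get_unihash_batch: hypothetical callable taking an iterable of
    (method, taskhash) and returning a list of results in the same order."""
    keys = list(queries.keys())
    # Feed a generator so the whole query list is never materialised twice
    unihashes = get_unihash_batch(queries[k] for k in keys)
    # Results come back in request order, so pair them positionally
    return dict(zip(keys, unihashes))
```

Note that zip() silently stops at the shorter input. The client's Batch class guards against this case by raising if the number of results does not match the number of messages sent.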
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
---
lib/bb/siggen.py | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/lib/bb/siggen.py b/lib/bb/siggen.py
index 03dfda6f3..65ca0811d 100644
--- a/lib/bb/siggen.py
+++ b/lib/bb/siggen.py
@@ -726,10 +726,13 @@ class SignatureGeneratorUniHashMixIn(object):
return result
if self.max_parallel <= 1 or len(queries) <= 1:
- # No parallelism required. Make the query serially with the single client
+ # No parallelism required. Make the query using a single client
with self.client() as client:
- for tid, args in queries.items():
- query_result[tid] = client.get_unihash(*args)
+ keys = list(queries.keys())
+ unihashes = client.get_unihash_batch(queries[k] for k in keys)
+
+ for idx, k in enumerate(keys):
+ query_result[k] = unihashes[idx]
else:
with self.client_pool() as client_pool:
query_result = client_pool.get_unihashes(queries)
--
2.34.1