* [bitbake-devel][PATCH 0/5] Implement parallel Query API
@ 2024-02-18 20:07 Joshua Watt
2024-02-18 20:07 ` [bitbake-devel][PATCH 1/5] hashserv: sqlalchemy: Use _execute() helper Joshua Watt
` (5 more replies)
0 siblings, 6 replies; 15+ messages in thread
From: Joshua Watt @ 2024-02-18 20:07 UTC (permalink / raw)
To: bitbake-devel; +Cc: Joshua Watt
Implements API to allows querying if a hash exists in the server, and to
do so using multiple connections to the server in parallel. This will be
used to query if an sstate object exists before actually looking for it
in the sstate cache, which is important for e.g. on a CDN where negative
lookups are expensive
Joshua Watt (5):
hashserv: sqlalchemy: Use _execute() helper
hashserv: Add unihash-exists API
asyncrpc: Add Client Pool object
hashserv: Add Client Pool
siggen: Add parallel query API
bitbake/bin/bitbake-hashclient | 13 ++
bitbake/lib/bb/asyncrpc/__init__.py | 2 +-
bitbake/lib/bb/asyncrpc/client.py | 77 +++++++
bitbake/lib/bb/siggen.py | 121 +++++++----
bitbake/lib/hashserv/client.py | 124 ++++++++++-
bitbake/lib/hashserv/server.py | 61 +++---
bitbake/lib/hashserv/sqlalchemy.py | 306 ++++++++++++++--------------
bitbake/lib/hashserv/sqlite.py | 16 ++
bitbake/lib/hashserv/tests.py | 122 +++++++++++
9 files changed, 618 insertions(+), 224 deletions(-)
--
2.34.1
^ permalink raw reply [flat|nested] 15+ messages in thread* [bitbake-devel][PATCH 1/5] hashserv: sqlalchemy: Use _execute() helper 2024-02-18 20:07 [bitbake-devel][PATCH 0/5] Implement parallel Query API Joshua Watt @ 2024-02-18 20:07 ` Joshua Watt 2024-02-18 20:07 ` [bitbake-devel][PATCH 2/5] hashserv: Add unihash-exists API Joshua Watt ` (4 subsequent siblings) 5 siblings, 0 replies; 15+ messages in thread From: Joshua Watt @ 2024-02-18 20:07 UTC (permalink / raw) To: bitbake-devel; +Cc: Joshua Watt Use the _execute() helper to execute queries. This helper does the logging of the statement that was being done manually everywhere. Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> --- bitbake/lib/hashserv/sqlalchemy.py | 297 ++++++++++++++--------------- 1 file changed, 140 insertions(+), 157 deletions(-) diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index 89a6b86d9d8..873547809a0 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py @@ -233,124 +233,113 @@ class Database(object): return row.value async def get_unihash_by_taskhash_full(self, method, taskhash): - statement = ( - select( - OuthashesV2, - UnihashesV3.unihash.label("unihash"), - ) - .join( - UnihashesV3, - and_( - UnihashesV3.method == OuthashesV2.method, - UnihashesV3.taskhash == OuthashesV2.taskhash, - ), - ) - .where( - OuthashesV2.method == method, - OuthashesV2.taskhash == taskhash, - ) - .order_by( - OuthashesV2.created.asc(), - ) - .limit(1) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + OuthashesV2, + UnihashesV3.unihash.label("unihash"), + ) + .join( + UnihashesV3, + and_( + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, + ), + ) + .where( + OuthashesV2.method == method, + OuthashesV2.taskhash == taskhash, + ) + .order_by( + OuthashesV2.created.asc(), + ) + .limit(1) + ) return map_row(result.first()) async def get_unihash_by_outhash(self, method, outhash): - statement = ( - select(OuthashesV2, UnihashesV3.unihash.label("unihash")) - .join( - UnihashesV3, - and_( - UnihashesV3.method == OuthashesV2.method, - UnihashesV3.taskhash == OuthashesV2.taskhash, - ), - ) - .where( - OuthashesV2.method == method, - OuthashesV2.outhash == outhash, - ) - .order_by( - OuthashesV2.created.asc(), - ) - .limit(1) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select(OuthashesV2, UnihashesV3.unihash.label("unihash")) + .join( + UnihashesV3, + and_( + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, + ), + ) + .where( + OuthashesV2.method == method, + OuthashesV2.outhash == outhash, + ) + .order_by( + OuthashesV2.created.asc(), + ) + .limit(1) + ) return map_row(result.first()) async def get_outhash(self, method, outhash): - statement = ( - select(OuthashesV2) - .where( - OuthashesV2.method == method, - OuthashesV2.outhash == outhash, - ) - .order_by( - OuthashesV2.created.asc(), - ) - .limit(1) - ) - - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select(OuthashesV2) + .where( + OuthashesV2.method == method, + OuthashesV2.outhash == outhash, + ) + .order_by( + OuthashesV2.created.asc(), + ) + .limit(1) + ) return map_row(result.first()) async def get_equivalent_for_outhash(self, method, outhash, taskhash): - statement = ( - select( - OuthashesV2.taskhash.label("taskhash"), - UnihashesV3.unihash.label("unihash"), - ) - .join( - UnihashesV3, - and_( - UnihashesV3.method == OuthashesV2.method, - UnihashesV3.taskhash == OuthashesV2.taskhash, - ), - ) - .where( - OuthashesV2.method == method, - OuthashesV2.outhash == outhash, - OuthashesV2.taskhash != taskhash, - ) - .order_by( - OuthashesV2.created.asc(), - ) - .limit(1) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + OuthashesV2.taskhash.label("taskhash"), + UnihashesV3.unihash.label("unihash"), + ) + .join( + UnihashesV3, + and_( + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, + ), + ) + .where( + OuthashesV2.method == method, + OuthashesV2.outhash == outhash, + OuthashesV2.taskhash != taskhash, + ) + .order_by( + OuthashesV2.created.asc(), + ) + .limit(1) + ) return map_row(result.first()) async def get_equivalent(self, method, taskhash): - statement = select( - UnihashesV3.unihash, - UnihashesV3.method, - UnihashesV3.taskhash, - ).where( - UnihashesV3.method == method, - UnihashesV3.taskhash == taskhash, - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + UnihashesV3.unihash, + UnihashesV3.method, + UnihashesV3.taskhash, + ).where( + UnihashesV3.method == method, + UnihashesV3.taskhash == taskhash, + ) + ) return map_row(result.first()) async def remove(self, condition): async def do_remove(table): where = _make_condition_statement(table, condition) if where: - statement = delete(table).where(*where) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute(delete(table).where(*where)) return result.rowcount return 0 @@ -417,21 +406,21 @@ class Database(object): return result.rowcount async def clean_unused(self, oldest): - statement = delete(OuthashesV2).where( - OuthashesV2.created < oldest, - ~( - select(UnihashesV3.id) - .where( - UnihashesV3.method == OuthashesV2.method, - UnihashesV3.taskhash == OuthashesV2.taskhash, - ) - .limit(1) - .exists() - ), - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + delete(OuthashesV2).where( + OuthashesV2.created < oldest, + ~( + select(UnihashesV3.id) + .where( + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, + ) + .limit(1) + .exists() + ), + ) + ) return result.rowcount async def insert_unihash(self, method, taskhash, unihash): @@ -461,11 +450,9 @@ class Database(object): if "created" in data and not isinstance(data["created"], datetime): data["created"] = datetime.fromisoformat(data["created"]) - statement = insert(OuthashesV2).values(**data) - self.logger.debug("%s", statement) try: async with self.db.begin(): - await self.db.execute(statement) + await self._execute(insert(OuthashesV2).values(**data)) return True except IntegrityError: self.logger.debug( @@ -474,16 +461,16 @@ class Database(object): return False async def _get_user(self, username): - statement = select( - Users.username, - Users.permissions, - Users.token, - ).where( - Users.username == username, - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + Users.username, + Users.permissions, + Users.token, + ).where( + Users.username == username, + ) + ) return result.first() async def lookup_user_token(self, username): @@ -496,70 +483,66 @@ class Database(object): return map_user(await self._get_user(username)) async def set_user_token(self, username, token): - statement = ( - update(Users) - .where( - Users.username == username, - ) - .values( - token=token, - ) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + update(Users) + .where( + Users.username == username, + ) + .values( + token=token, + ) + ) return result.rowcount != 0 async def set_user_perms(self, username, permissions): - statement = ( - update(Users) - .where(Users.username == username) - .values(permissions=" ".join(permissions)) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + update(Users) + .where(Users.username == username) + .values(permissions=" ".join(permissions)) + ) return result.rowcount != 0 async def get_all_users(self): - statement = select( - Users.username, - Users.permissions, - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + Users.username, + Users.permissions, + ) + ) return [map_user(row) for row in result] async def new_user(self, username, permissions, token): - statement = insert(Users).values( - username=username, - permissions=" ".join(permissions), - token=token, - ) - self.logger.debug("%s", statement) try: async with self.db.begin(): - await self.db.execute(statement) + await self._execute( + insert(Users).values( + username=username, + permissions=" ".join(permissions), + token=token, + ) + ) return True except IntegrityError as e: self.logger.debug("Cannot create new user %s: %s", username, e) return False async def delete_user(self, username): - statement = delete(Users).where(Users.username == username) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + delete(Users).where(Users.username == username) + ) return result.rowcount != 0 async def get_usage(self): usage = {} async with self.db.begin() as session: for name, table in Base.metadata.tables.items(): - statement = select(func.count()).select_from(table) - self.logger.debug("%s", statement) - result = await self.db.execute(statement) + result = await self._execute( + statement=select(func.count()).select_from(table) + ) usage[name] = { "rows": result.scalar(), } -- 2.34.1 ^ permalink raw reply related [flat|nested] 15+ messages in thread
* [bitbake-devel][PATCH 2/5] hashserv: Add unihash-exists API 2024-02-18 20:07 [bitbake-devel][PATCH 0/5] Implement parallel Query API Joshua Watt 2024-02-18 20:07 ` [bitbake-devel][PATCH 1/5] hashserv: sqlalchemy: Use _execute() helper Joshua Watt @ 2024-02-18 20:07 ` Joshua Watt 2024-02-18 20:07 ` [bitbake-devel][PATCH 3/5] asyncrpc: Add Client Pool object Joshua Watt ` (3 subsequent siblings) 5 siblings, 0 replies; 15+ messages in thread From: Joshua Watt @ 2024-02-18 20:07 UTC (permalink / raw) To: bitbake-devel; +Cc: Joshua Watt Adds API to check if the server is aware of the existence of a given unihash. This can be used as an optimization for sstate where a client can query the hash equivalence server to check if a unihash exists before querying the sstate cache. If the hash server isn't aware of the existence of a unihash, then there is very likely not a matching sstate object, so this should be able to significantly cut down on the number of negative hits on the sstate cache. Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> --- bitbake/bin/bitbake-hashclient | 13 +++++++ bitbake/lib/hashserv/client.py | 44 ++++++++++++++++----- bitbake/lib/hashserv/server.py | 61 +++++++++++++++++++----------- bitbake/lib/hashserv/sqlalchemy.py | 11 ++++++ bitbake/lib/hashserv/sqlite.py | 16 ++++++++ bitbake/lib/hashserv/tests.py | 39 +++++++++++++++++++ 6 files changed, 151 insertions(+), 33 deletions(-) diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient index f71b87404ae..47dd27cd3c2 100755 --- a/bitbake/bin/bitbake-hashclient +++ b/bitbake/bin/bitbake-hashclient @@ -217,6 +217,14 @@ def main(): print("Removed %d rows" % result["count"]) return 0 + def handle_unihash_exists(args, client): + result = client.unihash_exists(args.unihash) + if args.quiet: + return 0 if result else 1 + + print("true" if result else "false") + return 0 + parser = argparse.ArgumentParser(description='Hash Equivalence Client') parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")') parser.add_argument('--log', default='WARNING', help='Set logging level') @@ -309,6 +317,11 @@ def main(): gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") gc_sweep_parser.set_defaults(func=handle_gc_sweep) + unihash_exists_parser = subparsers.add_parser('unihash-exists', help="Check if a unihash is known to the server") + unihash_exists_parser.add_argument("--quiet", action="store_true", help="Don't print status. Instead, exit with 0 if unihash exists and 1 if it does not") + unihash_exists_parser.add_argument("unihash", help="Unihash to check") + unihash_exists_parser.set_defaults(func=handle_unihash_exists) + args = parser.parse_args() logger = logging.getLogger('hashserv') diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index e6dc4179126..daf1e128423 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py @@ -16,6 +16,7 @@ logger = logging.getLogger("hashserv.client") class AsyncClient(bb.asyncrpc.AsyncClient): MODE_NORMAL = 0 MODE_GET_STREAM = 1 + MODE_EXIST_STREAM = 2 def __init__(self, username=None, password=None): super().__init__("OEHASHEQUIV", "1.1", logger) @@ -49,19 +50,36 @@ class AsyncClient(bb.asyncrpc.AsyncClient): await self.socket.send("END") return await self.socket.recv() - if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: + async def normal_to_stream(command): + r = await self.invoke({command: None}) + if r != "ok": + raise ConnectionError( + f"Unable to transition to stream mode: Bad response from server {r!r}" + ) + + self.logger.debug("Mode is now %s", command) + + if new_mode == self.mode: + return + + self.logger.debug("Transitioning mode %s -> %s", self.mode, new_mode) + + # Always transition to normal mode before switching to any other mode + if self.mode != self.MODE_NORMAL: r = await self._send_wrapper(stream_to_normal) if r != "ok": self.check_invoke_error(r) - raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r) - elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: - r = await self.invoke({"get-stream": None}) - if r != "ok": - raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r) - elif new_mode != self.mode: - raise Exception( - "Undefined mode transition %r -> %r" % (self.mode, new_mode) - ) + raise ConnectionError( + f"Unable to transition to normal mode: Bad response from server {r!r}" + ) + self.logger.debug("Mode is now normal") + + if new_mode == self.MODE_GET_STREAM: + await normal_to_stream("get-stream") + elif new_mode == self.MODE_EXIST_STREAM: + await normal_to_stream("exists-stream") + elif new_mode != self.MODE_NORMAL: + raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") self.mode = new_mode @@ -95,6 +113,11 @@ class AsyncClient(bb.asyncrpc.AsyncClient): {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} ) + async def unihash_exists(self, unihash): + await self._set_mode(self.MODE_EXIST_STREAM) + r = await self.send_stream(unihash) + return r == "true" + async def get_outhash(self, method, outhash, taskhash, with_unihash=True): await self._set_mode(self.MODE_NORMAL) return await self.invoke( @@ -236,6 +259,7 @@ class Client(bb.asyncrpc.Client): "report_unihash", "report_unihash_equiv", "get_taskhash", + "unihash_exists", "get_outhash", "get_stats", "reset_stats", diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 5ed852d1f30..68f64f983b2 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py @@ -234,6 +234,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): "get": self.handle_get, "get-outhash": self.handle_get_outhash, "get-stream": self.handle_get_stream, + "exists-stream": self.handle_exists_stream, "get-stats": self.handle_get_stats, "get-db-usage": self.handle_get_db_usage, "get-db-query-columns": self.handle_get_db_query_columns, @@ -377,8 +378,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) await self.db.insert_outhash(data) - @permissions(READ_PERM) - async def handle_get_stream(self, request): + async def _stream_handler(self, handler): await self.socket.send_message("ok") while True: @@ -400,35 +400,50 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): if l == "END": break - (method, taskhash) = l.split() - # self.logger.debug('Looking up %s %s' % (method, taskhash)) - row = await self.db.get_equivalent(method, taskhash) - - if row is not None: - msg = row["unihash"] - # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) - elif self.upstream_client is not None: - upstream = await self.upstream_client.get_unihash(method, taskhash) - if upstream: - msg = upstream - else: - msg = "" - else: - msg = "" - + msg = await handler(l) await self.socket.send(msg) finally: request_measure.end() self.request_sample.end() - # Post to the backfill queue after writing the result to minimize - # the turn around time on a request - if upstream is not None: - await self.server.backfill_queue.put((method, taskhash)) - await self.socket.send("ok") return self.NO_RESPONSE + @permissions(READ_PERM) + async def handle_get_stream(self, request): + async def handler(l): + (method, taskhash) = l.split() + # self.logger.debug('Looking up %s %s' % (method, taskhash)) + row = await self.db.get_equivalent(method, taskhash) + + if row is not None: + # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) + return row["unihash"] + + if self.upstream_client is not None: + upstream = await self.upstream_client.get_unihash(method, taskhash) + if upstream: + await self.server.backfill_queue.put((method, taskhash)) + return upstream + + return "" + + return await self._stream_handler(handler) + + @permissions(READ_PERM) + async def handle_exists_stream(self, request): + async def handler(l): + if await self.db.unihash_exists(l): + return "true" + + if self.upstream_client is not None: + if await self.upstream_client.unihash_exists(l): + return "true" + + return "false" + + return await self._stream_handler(handler) + async def report_readonly(self, data): method = data["method"] outhash = data["outhash"] diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index 873547809a0..0e28d738f5a 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py @@ -48,6 +48,7 @@ class UnihashesV3(Base): __table_args__ = ( UniqueConstraint("method", "taskhash"), Index("taskhash_lookup_v4", "method", "taskhash"), + Index("unihash_lookup_v1", "unihash"), ) @@ -279,6 +280,16 @@ class Database(object): ) return map_row(result.first()) + async def unihash_exists(self, unihash): + async with self.db.begin(): + result = await self._execute( + select(UnihashesV3) + .where(UnihashesV3.unihash == unihash) + .limit(1) + ) + + return result.first() is not None + async def get_outhash(self, method, outhash): async with self.db.begin(): result = await self._execute( diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py index 608490730d7..da2e844a031 100644 --- a/bitbake/lib/hashserv/sqlite.py +++ b/bitbake/lib/hashserv/sqlite.py @@ -144,6 +144,9 @@ class DatabaseEngine(object): cursor.execute( "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)" ) + cursor.execute( + "CREATE INDEX IF NOT EXISTS unihash_lookup_v1 ON unihashes_v3 (unihash)" + ) cursor.execute( "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" ) @@ -255,6 +258,19 @@ class Database(object): ) return cursor.fetchone() + async def unihash_exists(self, unihash): + with closing(self.db.cursor()) as cursor: + cursor.execute( + """ + SELECT * FROM unihashes_v3 WHERE unihash=:unihash + LIMIT 1 + """, + { + "unihash": unihash, + }, + ) + return cursor.fetchone() is not None + async def get_outhash(self, method, outhash): with closing(self.db.cursor()) as cursor: cursor.execute( diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index aeedab3575e..fbbe81512a0 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py @@ -442,6 +442,11 @@ class HashEquivalenceCommonTests(object): self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream') self.assertEqual(result['method'], self.METHOD) + def test_unihash_exsits(self): + taskhash, outhash, unihash = self.create_test_hash(self.client) + self.assertTrue(self.client.unihash_exists(unihash)) + self.assertFalse(self.client.unihash_exists('6662e699d6e3d894b24408ff9a4031ef9b038ee8')) + def test_ro_server(self): rw_server = self.start_server() rw_client = self.start_client(rw_server.address) @@ -1031,6 +1036,40 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): def test_stress(self): self.run_hashclient(["--address", self.server_address, "stress"], check=True) + def test_unihash_exsits(self): + taskhash, outhash, unihash = self.create_test_hash(self.client) + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", unihash, + ], check=True) + self.assertEqual(p.stdout.strip(), "true") + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8', + ], check=True) + self.assertEqual(p.stdout.strip(), "false") + + def test_unihash_exsits_quiet(self): + taskhash, outhash, unihash = self.create_test_hash(self.client) + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", unihash, + "--quiet", + ]) + self.assertEqual(p.returncode, 0) + self.assertEqual(p.stdout.strip(), "") + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8', + "--quiet", + ]) + self.assertEqual(p.returncode, 1) + self.assertEqual(p.stdout.strip(), "") + def test_remove_taskhash(self): taskhash, outhash, unihash = self.create_test_hash(self.client) self.run_hashclient([ -- 2.34.1 ^ permalink raw reply related [flat|nested] 15+ messages in thread
* [bitbake-devel][PATCH 3/5] asyncrpc: Add Client Pool object 2024-02-18 20:07 [bitbake-devel][PATCH 0/5] Implement parallel Query API Joshua Watt 2024-02-18 20:07 ` [bitbake-devel][PATCH 1/5] hashserv: sqlalchemy: Use _execute() helper Joshua Watt 2024-02-18 20:07 ` [bitbake-devel][PATCH 2/5] hashserv: Add unihash-exists API Joshua Watt @ 2024-02-18 20:07 ` Joshua Watt 2024-02-18 20:07 ` [bitbake-devel][PATCH 4/5] hashserv: Add Client Pool Joshua Watt ` (2 subsequent siblings) 5 siblings, 0 replies; 15+ messages in thread From: Joshua Watt @ 2024-02-18 20:07 UTC (permalink / raw) To: bitbake-devel; +Cc: Joshua Watt Adds an abstract base class that can be used to implement a pool of client connections. The class implements a thread that runs an async event loop, and allows derived classes to schedule work on the loop and wait for the work to be finished. Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> --- bitbake/lib/bb/asyncrpc/__init__.py | 2 +- bitbake/lib/bb/asyncrpc/client.py | 77 +++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/bitbake/lib/bb/asyncrpc/__init__.py b/bitbake/lib/bb/asyncrpc/__init__.py index a4371643d74..639e1607f8e 100644 --- a/bitbake/lib/bb/asyncrpc/__init__.py +++ b/bitbake/lib/bb/asyncrpc/__init__.py @@ -5,7 +5,7 @@ # -from .client import AsyncClient, Client +from .client import AsyncClient, Client, ClientPool from .serv import AsyncServer, AsyncServerConnection from .connection import DEFAULT_MAX_CHUNK from .exceptions import ( diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index 0d7cd85780d..a6228bb0ba0 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py @@ -10,6 +10,8 @@ import json import os import socket import sys +import contextlib +from threading import Thread from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK from .exceptions import ConnectionClosedError, InvokeError @@ -180,3 +182,78 @@ class Client(object): def __exit__(self, exc_type, exc_value, traceback): self.close() return False + + +class ClientPool(object): + def __init__(self, max_clients): + self.avail_clients = [] + self.num_clients = 0 + self.max_clients = max_clients + self.loop = None + self.client_condition = None + + @abc.abstractmethod + async def _new_client(self): + raise NotImplementedError("Must be implemented in derived class") + + def close(self): + if self.client_condition: + self.client_condition = None + + if self.loop: + self.loop.run_until_complete(self.__close_clients()) + self.loop.run_until_complete(self.loop.shutdown_asyncgens()) + self.loop.close() + self.loop = None + + def run_tasks(self, tasks): + if not self.loop: + self.loop = asyncio.new_event_loop() + + thread = Thread(target=self.__thread_main, args=(tasks,)) + thread.start() + thread.join() + + @contextlib.asynccontextmanager + async def get_client(self): + async with self.client_condition: + if self.avail_clients: + client = self.avail_clients.pop() + elif self.num_clients < self.max_clients: + self.num_clients += 1 + client = await self._new_client() + else: + while not self.avail_clients: + await self.client_condition.wait() + client = self.avail_clients.pop() + + try: + yield client + finally: + async with self.client_condition: + self.avail_clients.append(client) + self.client_condition.notify() + + def __thread_main(self, tasks): + async def process_task(task): + async with self.get_client() as client: + await task(client) + + asyncio.set_event_loop(self.loop) + if not self.client_condition: + self.client_condition = asyncio.Condition() + tasks = [process_task(t) for t in tasks] + self.loop.run_until_complete(asyncio.gather(*tasks)) + + async def __close_clients(self): + for c in self.avail_clients: + await c.close() + self.avail_clients = [] + self.num_clients = 0 + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + return False -- 2.34.1 ^ permalink raw reply related [flat|nested] 15+ messages in thread
* [bitbake-devel][PATCH 4/5] hashserv: Add Client Pool 2024-02-18 20:07 [bitbake-devel][PATCH 0/5] Implement parallel Query API Joshua Watt ` (2 preceding siblings ...) 2024-02-18 20:07 ` [bitbake-devel][PATCH 3/5] asyncrpc: Add Client Pool object Joshua Watt @ 2024-02-18 20:07 ` Joshua Watt 2024-02-18 20:07 ` [bitbake-devel][PATCH 5/5] siggen: Add parallel query API Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 0/8] Implement parallel Query API Joshua Watt 5 siblings, 0 replies; 15+ messages in thread From: Joshua Watt @ 2024-02-18 20:07 UTC (permalink / raw) To: bitbake-devel; +Cc: Joshua Watt Implements a Client Pool derived from the AsyncRPC client pool that allows querying for multiple equivalent hashes in parallel Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> --- bitbake/lib/hashserv/client.py | 80 ++++++++++++++++++++++++++++++++ bitbake/lib/hashserv/tests.py | 83 ++++++++++++++++++++++++++++++++++ 2 files changed, 163 insertions(+) diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index daf1e128423..b269879ecfd 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py @@ -283,3 +283,83 @@ class Client(bb.asyncrpc.Client): def _get_async_client(self): return AsyncClient(self.username, self.password) + + +class ClientPool(bb.asyncrpc.ClientPool): + def __init__( + self, + address, + max_clients, + *, + username=None, + password=None, + become=None, + ): + super().__init__(max_clients) + self.address = address + self.username = username + self.password = password + self.become = become + + async def _new_client(self): + client = await create_async_client( + self.address, + username=self.username, + password=self.password, + ) + if self.become: + await client.become_user(self.become) + return client + + def _run_key_tasks(self, queries, call): + results = {key: None for key in queries.keys()} + + def make_task(key, args): + async def task(client): + nonlocal results + unihash = await call(client, args) + results[key] = unihash + + return task + + def gen_tasks(): + for key, args in queries.items(): + yield make_task(key, args) + + self.run_tasks(gen_tasks()) + return results + + def get_unihashes(self, queries): + """ + Query multiple unihashes in parallel. + + The queries argument is a dictionary with arbitrary key. The values + must be a tuple of (method, taskhash). + + Returns a dictionary with a corresponding key for each input key, and + the value is the queried unihash (which might be none if the query + failed) + """ + + async def call(client, args): + method, taskhash = args + return await client.get_unihash(method, taskhash) + + return self._run_key_tasks(queries, call) + + def unihashes_exist(self, queries): + """ + Query multiple unihash existence checks in parallel. + + The queries argument is a dictionary with arbitrary key. The values + must be a unihash. + + Returns a dictionary with a corresponding key for each input key, and + the value is True or False if the unihash is known by the server (or + None if there was a failure) + """ + + async def call(client, unihash): + return await client.unihash_exists(unihash) + + return self._run_key_tasks(queries, call) diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index fbbe81512a0..0809453cf87 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py @@ -8,6 +8,7 @@ from . import create_server, create_client from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS from bb.asyncrpc import InvokeError +from .client import ClientPool import hashlib import logging import multiprocessing @@ -554,6 +555,88 @@ class HashEquivalenceCommonTests(object): # shares a taskhash with Task 2 self.assertClientGetHash(self.client, taskhash2, unihash2) + + def test_client_pool_get_unihashes(self): + TEST_INPUT = ( + # taskhash outhash unihash + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), + # Duplicated taskhash with multiple output hashes and unihashes. + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), + # Equivalent hash + ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), + ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), + ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), + ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), + ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), + ) + EXTRA_QUERIES = ( + "6b6be7a84ab179b4240c4302518dc3f6", + ) + + with ClientPool(self.server_address, 10) as client_pool: + for taskhash, outhash, unihash in TEST_INPUT: + self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + + query = {idx: (self.METHOD, data[0]) for idx, data in enumerate(TEST_INPUT)} + for idx, taskhash in enumerate(EXTRA_QUERIES): + query[idx + len(TEST_INPUT)] = (self.METHOD, taskhash) + + result = client_pool.get_unihashes(query) + + self.assertDictEqual(result, { + 0: "218e57509998197d570e2c98512d0105985dffc9", + 1: "218e57509998197d570e2c98512d0105985dffc9", + 2: "218e57509998197d570e2c98512d0105985dffc9", + 3: "3b5d3d83f07f259e9086fcb422c855286e18a57d", + 4: "f46d3fbb439bd9b921095da657a4de906510d2cd", + 5: "f46d3fbb439bd9b921095da657a4de906510d2cd", + 6: "05d2a63c81e32f0a36542ca677e8ad852365c538", + 7: None, + }) + + def test_client_pool_unihash_exists(self): + TEST_INPUT = ( + # taskhash outhash unihash + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), + # Duplicated taskhash with multiple output hashes and unihashes. + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), + # Equivalent hash + ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), + ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), + ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), + ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), + ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), + ) + EXTRA_QUERIES = ( + "6b6be7a84ab179b4240c4302518dc3f6", + ) + + result_unihashes = set() + + + with ClientPool(self.server_address, 10) as client_pool: + for taskhash, outhash, unihash in TEST_INPUT: + result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + result_unihashes.add(result["unihash"]) + + query = {} + expected = {} + + for _, _, unihash in TEST_INPUT: + idx = len(query) + query[idx] = unihash + expected[idx] = unihash in result_unihashes + + + for unihash in EXTRA_QUERIES: + idx = len(query) + query[idx] = unihash + expected[idx] = False + + result = client_pool.unihashes_exist(query) + self.assertDictEqual(result, expected) + + def test_auth_read_perms(self): admin_client = self.start_auth_server() -- 2.34.1 ^ permalink raw reply related [flat|nested] 15+ messages in thread
* [bitbake-devel][PATCH 5/5] siggen: Add parallel query API 2024-02-18 20:07 [bitbake-devel][PATCH 0/5] Implement parallel Query API Joshua Watt ` (3 preceding siblings ...) 2024-02-18 20:07 ` [bitbake-devel][PATCH 4/5] hashserv: Add Client Pool Joshua Watt @ 2024-02-18 20:07 ` Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 0/8] Implement parallel Query API Joshua Watt 5 siblings, 0 replies; 15+ messages in thread From: Joshua Watt @ 2024-02-18 20:07 UTC (permalink / raw) To: bitbake-devel; +Cc: Joshua Watt Implements a new API called get_unihashes() that allows for querying multiple unihashes in parallel. The API is also reworked to make it easier for derived classes to interface with the new API in a consistent manner. Instead of overriding get_unihash() to add custom handling for local hash calculating (e.g. caches) derived classes should now override get_cached_unihash(), and return the local unihash or None if there isn't one. Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> --- bitbake/lib/bb/siggen.py | 121 ++++++++++++++++++++++++++++----------- 1 file changed, 87 insertions(+), 34 deletions(-) diff --git a/bitbake/lib/bb/siggen.py b/bitbake/lib/bb/siggen.py index 58854aee76c..e1a4fa2cdd1 100644 --- a/bitbake/lib/bb/siggen.py +++ b/bitbake/lib/bb/siggen.py @@ -102,9 +102,18 @@ class SignatureGenerator(object): if flag: self.datacaches[mc].stamp_extrainfo[mcfn][t] = flag + def get_cached_unihash(self, tid): + return None + def get_unihash(self, tid): + unihash = self.get_cached_unihash(tid) + if unihash: + return unihash return self.taskhash[tid] + def get_unihashes(self, tids): + return {tid: self.get_unihash(tid) for tid in tids} + def prep_taskhash(self, tid, deps, dataCaches): return @@ -524,28 +533,37 @@ class SignatureGeneratorUniHashMixIn(object): super().__init__(data) def get_taskdata(self): - return (self.server, self.method, self.extramethod) + super().get_taskdata() + return (self.server, self.method, self.extramethod, self.max_parallel) + super().get_taskdata() def set_taskdata(self, data): - self.server, self.method, self.extramethod = data[:3] - super().set_taskdata(data[3:]) + self.server, self.method, self.extramethod, self.max_parallel = data[:4] + super().set_taskdata(data[4:]) def client(self): if getattr(self, '_client', None) is None: self._client = hashserv.create_client(self.server) return self._client + def client_pool(self): + if getattr(self, '_client_pool', None) is None: + self._client_pool = hashserv.client.ClientPool(self.server, self.max_parallel) + return self._client_pool + def reset(self, data): - if getattr(self, '_client', None) is not None: - self._client.close() - self._client = None + self.__close_clients() return super().reset(data) def exit(self): + self.__close_clients() + return super().exit() + + def __close_clients(self): if getattr(self, '_client', None) is not None: self._client.close() self._client = None - return super().exit() + if getattr(self, '_client_pool', None) is not None: + self._client_pool.close() + self._client_pool = None def get_stampfile_hash(self, tid): if tid in self.taskhash: @@ -578,7 +596,7 @@ class SignatureGeneratorUniHashMixIn(object): return None return unihash - def get_unihash(self, tid): + def get_cached_unihash(self, tid): taskhash = self.taskhash[tid] # If its not a setscene task we can return @@ -593,40 +611,74 @@ class SignatureGeneratorUniHashMixIn(object): self.unihash[tid] = unihash return unihash - # In the absence of being able to discover a unique hash from the - # server, make it be equivalent to the taskhash. The unique "hash" only - # really needs to be a unique string (not even necessarily a hash), but - # making it match the taskhash has a few advantages: - # - # 1) All of the sstate code that assumes hashes can be the same - # 2) It provides maximal compatibility with builders that don't use - # an equivalency server - # 3) The value is easy for multiple independent builders to derive the - # same unique hash from the same input. This means that if the - # independent builders find the same taskhash, but it isn't reported - # to the server, there is a better chance that they will agree on - # the unique hash. - unihash = taskhash + return None - try: - method = self.method - if tid in self.extramethod: - method = method + self.extramethod[tid] - data = self.client().get_unihash(method, self.taskhash[tid]) - if data: - unihash = data + def _get_method(self, tid): + method = self.method + if tid in self.extramethod: + method = method + self.extramethod[tid] + + return method + + def get_unihash(self, tid): + return self.get_unihashes([tid])[tid] + + def get_unihashes(self, tids): + """ + For a iterable of tids, returns a dictionary that maps each tid to a + unihash + """ + result = {} + queries = {} + query_result = {} + + for tid in tids: + unihash = self.get_cached_unihash(tid) + if unihash: + result[tid] = unihash + else: + queries[tid] = (self._get_method(tid), self.taskhash[tid]) + + if len(queries) == 0: + return result + + if self.max_parallel <= 1 or len(queries) <= 1: + # No parallelism required. Make the query serially with the single client + for tid, args in queries.items(): + query_result[tid] = self.client().get_unihash(*args) + else: + query_result = self.client_pool().get_unihashes(queries) + + for tid, unihash in query_result.items(): + # In the absence of being able to discover a unique hash from the + # server, make it be equivalent to the taskhash. The unique "hash" only + # really needs to be a unique string (not even necessarily a hash), but + # making it match the taskhash has a few advantages: + # + # 1) All of the sstate code that assumes hashes can be the same + # 2) It provides maximal compatibility with builders that don't use + # an equivalency server + # 3) The value is easy for multiple independent builders to derive the + # same unique hash from the same input. This means that if the + # independent builders find the same taskhash, but it isn't reported + # to the server, there is a better chance that they will agree on + # the unique hash. + taskhash = self.taskhash[tid] + if unihash: # A unique hash equal to the taskhash is not very interesting, # so it is reported it at debug level 2. If they differ, that # is much more interesting, so it is reported at debug level 1 hashequiv_logger.bbdebug((1, 2)[unihash == taskhash], 'Found unihash %s in place of %s for %s from %s' % (unihash, taskhash, tid, self.server)) else: hashequiv_logger.debug2('No reported unihash for %s:%s from %s' % (tid, taskhash, self.server)) - except ConnectionError as e: - bb.warn('Error contacting Hash Equivalence Server %s: %s' % (self.server, str(e))) + unihash = taskhash - self.set_unihash(tid, unihash) - self.unihash[tid] = unihash - return unihash + + self.set_unihash(tid, unihash) + self.unihash[tid] = unihash + result[tid] = unihash + + return result def report_unihash(self, path, task, d): import importlib @@ -754,6 +806,7 @@ class SignatureGeneratorTestEquivHash(SignatureGeneratorUniHashMixIn, SignatureG super().init_rundepcheck(data) self.server = data.getVar('BB_HASHSERVE') self.method = "sstate_output_hash" + self.max_parallel = 1 def clean_checksum_file_path(file_checksum_tuple): f, cs = file_checksum_tuple -- 2.34.1 ^ permalink raw reply related [flat|nested] 15+ messages in thread
* [bitbake-devel][PATCH v2 0/8] Implement parallel Query API 2024-02-18 20:07 [bitbake-devel][PATCH 0/5] Implement parallel Query API Joshua Watt ` (4 preceding siblings ...) 2024-02-18 20:07 ` [bitbake-devel][PATCH 5/5] siggen: Add parallel query API Joshua Watt @ 2024-02-18 22:59 ` Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 1/8] hashserv: Add Unihash Garbage Collection Joshua Watt ` (7 more replies) 5 siblings, 8 replies; 15+ messages in thread From: Joshua Watt @ 2024-02-18 22:59 UTC (permalink / raw) To: bitbake-devel; +Cc: Joshua Watt Implements API to allows querying if a hash exists in the server, and to do so using multiple connections to the server in parallel. This will be used to query if an sstate object exists before actually looking for it in the sstate cache, which is important for e.g. on a CDN where negative lookups are expensive V2: Added some patches that were missed in the first series, and also rebased Tobias patch on top of my changes, as it was easier than rebasing the other way around. Joshua Watt (7): hashserv: Add Unihash Garbage Collection hashserv: sqlalchemy: Use _execute() helper hashserv: Add unihash-exists API asyncrpc: Add Client Pool object hashserv: Add Client Pool siggen: Add parallel query API siggen: Add parallel unihash exist API Tobias Hagelborn (1): bitbake: hashserv: Postgres adaptations for ignoring duplicate inserts bitbake/bin/bitbake-hashclient | 48 +++ bitbake/lib/bb/asyncrpc/__init__.py | 2 +- bitbake/lib/bb/asyncrpc/client.py | 77 +++++ bitbake/lib/bb/siggen.py | 153 +++++++-- bitbake/lib/hashserv/client.py | 155 ++++++++- bitbake/lib/hashserv/server.py | 164 +++++---- bitbake/lib/hashserv/sqlalchemy.py | 515 ++++++++++++++++++---------- bitbake/lib/hashserv/sqlite.py | 221 ++++++++++-- bitbake/lib/hashserv/tests.py | 320 +++++++++++++++++ 9 files changed, 1336 insertions(+), 319 deletions(-) -- 2.34.1 ^ permalink raw reply [flat|nested] 15+ messages in thread
* [bitbake-devel][PATCH v2 1/8] hashserv: Add Unihash Garbage Collection 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 0/8] Implement parallel Query API Joshua Watt @ 2024-02-18 22:59 ` Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 2/8] hashserv: sqlalchemy: Use _execute() helper Joshua Watt ` (6 subsequent siblings) 7 siblings, 0 replies; 15+ messages in thread From: Joshua Watt @ 2024-02-18 22:59 UTC (permalink / raw) To: bitbake-devel; +Cc: Joshua Watt Adds support for removing unused unihashes from the database. This is done using a "mark and sweep" style of garbage collection where a collection is started by marking which unihashes should be kept in the database, then performing a sweep to remove any unmarked hashes. Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> --- bitbake/bin/bitbake-hashclient | 35 +++++ bitbake/lib/hashserv/client.py | 31 ++++ bitbake/lib/hashserv/server.py | 105 ++++++++------ bitbake/lib/hashserv/sqlalchemy.py | 226 ++++++++++++++++++++++++----- bitbake/lib/hashserv/sqlite.py | 205 +++++++++++++++++++++----- bitbake/lib/hashserv/tests.py | 198 +++++++++++++++++++++++++ 6 files changed, 684 insertions(+), 116 deletions(-) diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient index 2cb63386662..f71b87404ae 100755 --- a/bitbake/bin/bitbake-hashclient +++ b/bitbake/bin/bitbake-hashclient @@ -195,6 +195,28 @@ def main(): columns = client.get_db_query_columns() print("\n".join(sorted(columns))) + def handle_gc_status(args, client): + result = client.gc_status() + if not result["mark"]: + print("No Garbage collection in progress") + return 0 + + print("Current Mark: %s" % result["mark"]) + print("Total hashes to keep: %d" % result["keep"]) + print("Total hashes to remove: %s" % result["remove"]) + return 0 + + def handle_gc_mark(args, client): + where = {k: v for k, v in args.where} + result = client.gc_mark(args.mark, where) + print("New hashes marked: %d" % result["count"]) + return 0 + + def handle_gc_sweep(args, client): + result = client.gc_sweep(args.mark) + print("Removed %d rows" % result["count"]) + return 0 + parser = argparse.ArgumentParser(description='Hash Equivalence Client') parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")') parser.add_argument('--log', default='WARNING', help='Set logging level') @@ -274,6 +296,19 @@ def main(): db_query_columns_parser = subparsers.add_parser('get-db-query-columns', help="Show columns that can be used in database queries") db_query_columns_parser.set_defaults(func=handle_get_db_query_columns) + gc_status_parser = subparsers.add_parser("gc-status", help="Show garbage collection status") + gc_status_parser.set_defaults(func=handle_gc_status) + + gc_mark_parser = subparsers.add_parser('gc-mark', help="Mark hashes to be kept for garbage collection") + gc_mark_parser.add_argument("mark", help="Mark for this garbage collection operation") + gc_mark_parser.add_argument("--where", "-w", metavar="KEY VALUE", nargs=2, action="append", default=[], + help="Keep entries in table where KEY == VALUE") + gc_mark_parser.set_defaults(func=handle_gc_mark) + + gc_sweep_parser = subparsers.add_parser('gc-sweep', help="Perform garbage collection and delete any entries that are not marked") + gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") + gc_sweep_parser.set_defaults(func=handle_gc_sweep) + args = parser.parse_args() logger = logging.getLogger('hashserv') diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index 35a97687fbe..e6dc4179126 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py @@ -194,6 +194,34 @@ class AsyncClient(bb.asyncrpc.AsyncClient): await self._set_mode(self.MODE_NORMAL) return (await self.invoke({"get-db-query-columns": {}}))["columns"] + async def gc_status(self): + await self._set_mode(self.MODE_NORMAL) + return await self.invoke({"gc-status": {}}) + + async def gc_mark(self, mark, where): + """ + Starts a new garbage collection operation identified by "mark". If + garbage collection is already in progress with "mark", the collection + is continued. + + All unihash entries that match the "where" clause are marked to be + kept. In addition, any new entries added to the database after this + command will be automatically marked with "mark" + """ + await self._set_mode(self.MODE_NORMAL) + return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) + + async def gc_sweep(self, mark): + """ + Finishes garbage collection for "mark". All unihash entries that have + not been marked will be deleted. + + It is recommended to clean unused outhash entries after running this to + cleanup any dangling outhashes + """ + await self._set_mode(self.MODE_NORMAL) + return await self.invoke({"gc-sweep": {"mark": mark}}) + class Client(bb.asyncrpc.Client): def __init__(self, username=None, password=None): @@ -224,6 +252,9 @@ class Client(bb.asyncrpc.Client): "become_user", "get_db_usage", "get_db_query_columns", + "gc_status", + "gc_mark", + "gc_sweep", ) def _get_async_client(self): diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index a86507830e8..5ed852d1f30 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py @@ -199,7 +199,7 @@ def permissions(*permissions, allow_anon=True, allow_self_service=False): if not self.user_has_permissions(*permissions, allow_anon=allow_anon): if not self.user: username = "Anonymous user" - user_perms = self.anon_perms + user_perms = self.server.anon_perms else: username = self.user.username user_perms = self.user.permissions @@ -223,25 +223,11 @@ def permissions(*permissions, allow_anon=True, allow_self_service=False): class ServerClient(bb.asyncrpc.AsyncServerConnection): - def __init__( - self, - socket, - db_engine, - request_stats, - backfill_queue, - upstream, - read_only, - anon_perms, - ): - super().__init__(socket, "OEHASHEQUIV", logger) - self.db_engine = db_engine - self.request_stats = request_stats + def __init__(self, socket, server): + super().__init__(socket, "OEHASHEQUIV", server.logger) + self.server = server self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK - self.backfill_queue = backfill_queue - self.upstream = upstream - self.read_only = read_only self.user = None - self.anon_perms = anon_perms self.handlers.update( { @@ -261,13 +247,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): } ) - if not read_only: + if not self.server.read_only: self.handlers.update( { "report-equiv": self.handle_equivreport, "reset-stats": self.handle_reset_stats, "backfill-wait": self.handle_backfill_wait, "remove": self.handle_remove, + "gc-mark": self.handle_gc_mark, + "gc-sweep": self.handle_gc_sweep, + "gc-status": self.handle_gc_status, "clean-unused": self.handle_clean_unused, "refresh-token": self.handle_refresh_token, "set-user-perms": self.handle_set_perms, @@ -282,10 +271,10 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): def user_has_permissions(self, *permissions, allow_anon=True): permissions = set(permissions) if allow_anon: - if ALL_PERM in self.anon_perms: + if ALL_PERM in self.server.anon_perms: return True - if not permissions - self.anon_perms: + if not permissions - self.server.anon_perms: return True if self.user is None: @@ -303,10 +292,10 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): return self.proto_version > (1, 0) and self.proto_version <= (1, 1) async def process_requests(self): - async with self.db_engine.connect(self.logger) as db: + async with self.server.db_engine.connect(self.logger) as db: self.db = db - if self.upstream is not None: - self.upstream_client = await create_async_client(self.upstream) + if self.server.upstream is not None: + self.upstream_client = await create_async_client(self.server.upstream) else: self.upstream_client = None @@ -323,7 +312,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): if "stream" in k: return await self.handlers[k](msg[k]) else: - with self.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): + with self.server.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): return await self.handlers[k](msg[k]) raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) @@ -404,7 +393,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): # possible (which is why the request sample is handled manually # instead of using 'with', and also why logging statements are # commented out. - self.request_sample = self.request_stats.start_sample() + self.request_sample = self.server.request_stats.start_sample() request_measure = self.request_sample.measure() request_measure.start() @@ -435,7 +424,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): # Post to the backfill queue after writing the result to minimize # the turn around time on a request if upstream is not None: - await self.backfill_queue.put((method, taskhash)) + await self.server.backfill_queue.put((method, taskhash)) await self.socket.send("ok") return self.NO_RESPONSE @@ -461,7 +450,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): # report is made inside the function @permissions(READ_PERM) async def handle_report(self, data): - if self.read_only or not self.user_has_permissions(REPORT_PERM): + if self.server.read_only or not self.user_has_permissions(REPORT_PERM): return await self.report_readonly(data) outhash_data = { @@ -538,24 +527,24 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): @permissions(READ_PERM) async def handle_get_stats(self, request): return { - "requests": self.request_stats.todict(), + "requests": self.server.request_stats.todict(), } @permissions(DB_ADMIN_PERM) async def handle_reset_stats(self, request): d = { - "requests": self.request_stats.todict(), + "requests": self.server.request_stats.todict(), } - self.request_stats.reset() + self.server.request_stats.reset() return d @permissions(READ_PERM) async def handle_backfill_wait(self, request): d = { - "tasks": self.backfill_queue.qsize(), + "tasks": self.server.backfill_queue.qsize(), } - await self.backfill_queue.join() + await self.server.backfill_queue.join() return d @permissions(DB_ADMIN_PERM) @@ -566,6 +555,46 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): return {"count": await self.db.remove(condition)} + @permissions(DB_ADMIN_PERM) + async def handle_gc_mark(self, request): + condition = request["where"] + mark = request["mark"] + + if not isinstance(condition, dict): + raise TypeError("Bad condition type %s" % type(condition)) + + if not isinstance(mark, str): + raise TypeError("Bad mark type %s" % type(mark)) + + return {"count": await self.db.gc_mark(mark, condition)} + + @permissions(DB_ADMIN_PERM) + async def handle_gc_sweep(self, request): + mark = request["mark"] + + if not isinstance(mark, str): + raise TypeError("Bad mark type %s" % type(mark)) + + current_mark = await self.db.get_current_gc_mark() + + if not current_mark or mark != current_mark: + raise bb.asyncrpc.InvokeError( + f"'{mark}' is not the current mark. Refusing to sweep" + ) + + count = await self.db.gc_sweep() + + return {"count": count} + + @permissions(DB_ADMIN_PERM) + async def handle_gc_status(self, request): + (keep_rows, remove_rows, current_mark) = await self.db.gc_status() + return { + "keep": keep_rows, + "remove": remove_rows, + "mark": current_mark, + } + @permissions(DB_ADMIN_PERM) async def handle_clean_unused(self, request): max_age = request["max_age_seconds"] @@ -779,15 +808,7 @@ class Server(bb.asyncrpc.AsyncServer): ) def accept_client(self, socket): - return ServerClient( - socket, - self.db_engine, - self.request_stats, - self.backfill_queue, - self.upstream, - self.read_only, - self.anon_perms, - ) + return ServerClient(socket, self) async def create_admin_user(self): admin_permissions = (ALL_PERM,) diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index cee04bffb03..89a6b86d9d8 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py @@ -28,6 +28,7 @@ from sqlalchemy import ( delete, update, func, + inspect, ) import sqlalchemy.engine from sqlalchemy.orm import declarative_base @@ -36,16 +37,17 @@ from sqlalchemy.exc import IntegrityError Base = declarative_base() -class UnihashesV2(Base): - __tablename__ = "unihashes_v2" +class UnihashesV3(Base): + __tablename__ = "unihashes_v3" id = Column(Integer, primary_key=True, autoincrement=True) method = Column(Text, nullable=False) taskhash = Column(Text, nullable=False) unihash = Column(Text, nullable=False) + gc_mark = Column(Text, nullable=False) __table_args__ = ( UniqueConstraint("method", "taskhash"), - Index("taskhash_lookup_v3", "method", "taskhash"), + Index("taskhash_lookup_v4", "method", "taskhash"), ) @@ -79,6 +81,36 @@ class Users(Base): __table_args__ = (UniqueConstraint("username"),) +class Config(Base): + __tablename__ = "config" + id = Column(Integer, primary_key=True, autoincrement=True) + name = Column(Text, nullable=False) + value = Column(Text) + __table_args__ = ( + UniqueConstraint("name"), + Index("config_lookup", "name"), + ) + + +# +# Old table versions +# +DeprecatedBase = declarative_base() + + +class UnihashesV2(DeprecatedBase): + __tablename__ = "unihashes_v2" + id = Column(Integer, primary_key=True, autoincrement=True) + method = Column(Text, nullable=False) + taskhash = Column(Text, nullable=False) + unihash = Column(Text, nullable=False) + + __table_args__ = ( + UniqueConstraint("method", "taskhash"), + Index("taskhash_lookup_v3", "method", "taskhash"), + ) + + class DatabaseEngine(object): def __init__(self, url, username=None, password=None): self.logger = logging.getLogger("hashserv.sqlalchemy") @@ -91,6 +123,9 @@ class DatabaseEngine(object): self.url = self.url.set(password=password) async def create(self): + def check_table_exists(conn, name): + return inspect(conn).has_table(name) + self.logger.info("Using database %s", self.url) self.engine = create_async_engine(self.url, poolclass=NullPool) @@ -99,6 +134,24 @@ class DatabaseEngine(object): self.logger.info("Creating tables...") await conn.run_sync(Base.metadata.create_all) + if await conn.run_sync(check_table_exists, UnihashesV2.__tablename__): + self.logger.info("Upgrading Unihashes V2 -> V3...") + statement = insert(UnihashesV3).from_select( + ["id", "method", "unihash", "taskhash", "gc_mark"], + select( + UnihashesV2.id, + UnihashesV2.method, + UnihashesV2.unihash, + UnihashesV2.taskhash, + literal("").label("gc_mark"), + ), + ) + self.logger.debug("%s", statement) + await conn.execute(statement) + + await conn.run_sync(Base.metadata.drop_all, [UnihashesV2.__table__]) + self.logger.info("Upgrade complete") + def connect(self, logger): return Database(self.engine, logger) @@ -118,6 +171,15 @@ def map_user(row): ) +def _make_condition_statement(table, condition): + where = {} + for c in table.__table__.columns: + if c.key in condition and condition[c.key] is not None: + where[c] = condition[c.key] + + return [(k == v) for k, v in where.items()] + + class Database(object): def __init__(self, engine, logger): self.engine = engine @@ -135,17 +197,52 @@ class Database(object): await self.db.close() self.db = None + async def _execute(self, statement): + self.logger.debug("%s", statement) + return await self.db.execute(statement) + + async def _set_config(self, name, value): + while True: + result = await self._execute( + update(Config).where(Config.name == name).values(value=value) + ) + + if result.rowcount == 0: + self.logger.debug("Config '%s' not found. Adding it", name) + try: + await self._execute(insert(Config).values(name=name, value=value)) + except IntegrityError: + # Race. Try again + continue + + break + + def _get_config_subquery(self, name, default=None): + if default is not None: + return func.coalesce( + select(Config.value).where(Config.name == name).scalar_subquery(), + default, + ) + return select(Config.value).where(Config.name == name).scalar_subquery() + + async def _get_config(self, name): + result = await self._execute(select(Config.value).where(Config.name == name)) + row = result.first() + if row is None: + return None + return row.value + async def get_unihash_by_taskhash_full(self, method, taskhash): statement = ( select( OuthashesV2, - UnihashesV2.unihash.label("unihash"), + UnihashesV3.unihash.label("unihash"), ) .join( - UnihashesV2, + UnihashesV3, and_( - UnihashesV2.method == OuthashesV2.method, - UnihashesV2.taskhash == OuthashesV2.taskhash, + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, ), ) .where( @@ -164,12 +261,12 @@ class Database(object): async def get_unihash_by_outhash(self, method, outhash): statement = ( - select(OuthashesV2, UnihashesV2.unihash.label("unihash")) + select(OuthashesV2, UnihashesV3.unihash.label("unihash")) .join( - UnihashesV2, + UnihashesV3, and_( - UnihashesV2.method == OuthashesV2.method, - UnihashesV2.taskhash == OuthashesV2.taskhash, + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, ), ) .where( @@ -208,13 +305,13 @@ class Database(object): statement = ( select( OuthashesV2.taskhash.label("taskhash"), - UnihashesV2.unihash.label("unihash"), + UnihashesV3.unihash.label("unihash"), ) .join( - UnihashesV2, + UnihashesV3, and_( - UnihashesV2.method == OuthashesV2.method, - UnihashesV2.taskhash == OuthashesV2.taskhash, + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, ), ) .where( @@ -234,12 +331,12 @@ class Database(object): async def get_equivalent(self, method, taskhash): statement = select( - UnihashesV2.unihash, - UnihashesV2.method, - UnihashesV2.taskhash, + UnihashesV3.unihash, + UnihashesV3.method, + UnihashesV3.taskhash, ).where( - UnihashesV2.method == method, - UnihashesV2.taskhash == taskhash, + UnihashesV3.method == method, + UnihashesV3.taskhash == taskhash, ) self.logger.debug("%s", statement) async with self.db.begin(): @@ -248,13 +345,9 @@ class Database(object): async def remove(self, condition): async def do_remove(table): - where = {} - for c in table.__table__.columns: - if c.key in condition and condition[c.key] is not None: - where[c] = condition[c.key] - + where = _make_condition_statement(table, condition) if where: - statement = delete(table).where(*[(k == v) for k, v in where.items()]) + statement = delete(table).where(*where) self.logger.debug("%s", statement) async with self.db.begin(): result = await self.db.execute(statement) @@ -263,19 +356,74 @@ class Database(object): return 0 count = 0 - count += await do_remove(UnihashesV2) + count += await do_remove(UnihashesV3) count += await do_remove(OuthashesV2) return count + async def get_current_gc_mark(self): + async with self.db.begin(): + return await self._get_config("gc-mark") + + async def gc_status(self): + async with self.db.begin(): + gc_mark_subquery = self._get_config_subquery("gc-mark", "") + + result = await self._execute( + select(func.count()) + .select_from(UnihashesV3) + .where(UnihashesV3.gc_mark == gc_mark_subquery) + ) + keep_rows = result.scalar() + + result = await self._execute( + select(func.count()) + .select_from(UnihashesV3) + .where(UnihashesV3.gc_mark != gc_mark_subquery) + ) + remove_rows = result.scalar() + + return (keep_rows, remove_rows, await self._get_config("gc-mark")) + + async def gc_mark(self, mark, condition): + async with self.db.begin(): + await self._set_config("gc-mark", mark) + + where = _make_condition_statement(UnihashesV3, condition) + if not where: + return 0 + + result = await self._execute( + update(UnihashesV3) + .values(gc_mark=self._get_config_subquery("gc-mark", "")) + .where(*where) + ) + return result.rowcount + + async def gc_sweep(self): + async with self.db.begin(): + result = await self._execute( + delete(UnihashesV3).where( + # A sneaky conditional that provides some errant use + # protection: If the config mark is NULL, this will not + # match any rows because No default is specified in the + # select statement + UnihashesV3.gc_mark + != self._get_config_subquery("gc-mark") + ) + ) + await self._set_config("gc-mark", None) + + return result.rowcount + async def clean_unused(self, oldest): statement = delete(OuthashesV2).where( OuthashesV2.created < oldest, ~( - select(UnihashesV2.id) + select(UnihashesV3.id) .where( - UnihashesV2.method == OuthashesV2.method, - UnihashesV2.taskhash == OuthashesV2.taskhash, + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, ) .limit(1) .exists() @@ -287,15 +435,17 @@ class Database(object): return result.rowcount async def insert_unihash(self, method, taskhash, unihash): - statement = insert(UnihashesV2).values( - method=method, - taskhash=taskhash, - unihash=unihash, - ) - self.logger.debug("%s", statement) try: async with self.db.begin(): - await self.db.execute(statement) + await self._execute( + insert(UnihashesV3).values( + method=method, + taskhash=taskhash, + unihash=unihash, + gc_mark=self._get_config_subquery("gc-mark", ""), + ) + ) + return True except IntegrityError: self.logger.debug( @@ -418,7 +568,7 @@ class Database(object): async def get_query_columns(self): columns = set() - for table in (UnihashesV2, OuthashesV2): + for table in (UnihashesV3, OuthashesV2): for c in table.__table__.columns: if not isinstance(c.type, Text): continue diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py index f93cb2c1dd9..608490730d7 100644 --- a/bitbake/lib/hashserv/sqlite.py +++ b/bitbake/lib/hashserv/sqlite.py @@ -15,6 +15,7 @@ UNIHASH_TABLE_DEFINITION = ( ("method", "TEXT NOT NULL", "UNIQUE"), ("taskhash", "TEXT NOT NULL", "UNIQUE"), ("unihash", "TEXT NOT NULL", ""), + ("gc_mark", "TEXT NOT NULL", ""), ) UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION) @@ -44,6 +45,14 @@ USERS_TABLE_DEFINITION = ( USERS_TABLE_COLUMNS = tuple(name for name, _, _ in USERS_TABLE_DEFINITION) +CONFIG_TABLE_DEFINITION = ( + ("name", "TEXT NOT NULL", "UNIQUE"), + ("value", "TEXT", ""), +) + +CONFIG_TABLE_COLUMNS = tuple(name for name, _, _ in CONFIG_TABLE_DEFINITION) + + def _make_table(cursor, name, definition): cursor.execute( """ @@ -71,6 +80,35 @@ def map_user(row): ) +def _make_condition_statement(columns, condition): + where = {} + for c in columns: + if c in condition and condition[c] is not None: + where[c] = condition[c] + + return where, " AND ".join("%s=:%s" % (k, k) for k in where.keys()) + + +def _get_sqlite_version(cursor): + cursor.execute("SELECT sqlite_version()") + + version = [] + for v in cursor.fetchone()[0].split("."): + try: + version.append(int(v)) + except ValueError: + version.append(v) + + return tuple(version) + + +def _schema_table_name(version): + if version >= (3, 33): + return "sqlite_schema" + + return "sqlite_master" + + class DatabaseEngine(object): def __init__(self, dbname, sync): self.dbname = dbname @@ -82,9 +120,10 @@ class DatabaseEngine(object): db.row_factory = sqlite3.Row with closing(db.cursor()) as cursor: - _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION) + _make_table(cursor, "unihashes_v3", UNIHASH_TABLE_DEFINITION) _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION) _make_table(cursor, "users", USERS_TABLE_DEFINITION) + _make_table(cursor, "config", CONFIG_TABLE_DEFINITION) cursor.execute("PRAGMA journal_mode = WAL") cursor.execute( @@ -96,17 +135,38 @@ class DatabaseEngine(object): cursor.execute("DROP INDEX IF EXISTS outhash_lookup") cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v2") cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2") + cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v3") # TODO: Upgrade from tasks_v2? cursor.execute("DROP TABLE IF EXISTS tasks_v2") # Create new indexes cursor.execute( - "CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)" + "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)" ) cursor.execute( "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" ) + cursor.execute("CREATE INDEX IF NOT EXISTS config_lookup ON config (name)") + + sqlite_version = _get_sqlite_version(cursor) + + cursor.execute( + f""" + SELECT name FROM {_schema_table_name(sqlite_version)} WHERE type = 'table' AND name = 'unihashes_v2' + """ + ) + if cursor.fetchone(): + self.logger.info("Upgrading Unihashes V2 -> V3...") + cursor.execute( + """ + INSERT INTO unihashes_v3 (id, method, unihash, taskhash, gc_mark) + SELECT id, method, unihash, taskhash, '' FROM unihashes_v2 + """ + ) + cursor.execute("DROP TABLE unihashes_v2") + db.commit() + self.logger.info("Upgrade complete") def connect(self, logger): return Database(logger, self.dbname, self.sync) @@ -126,16 +186,7 @@ class Database(object): "PRAGMA synchronous = %s" % ("NORMAL" if sync else "OFF") ) - cursor.execute("SELECT sqlite_version()") - - version = [] - for v in cursor.fetchone()[0].split("."): - try: - version.append(int(v)) - except ValueError: - version.append(v) - - self.sqlite_version = tuple(version) + self.sqlite_version = _get_sqlite_version(cursor) async def __aenter__(self): return self @@ -143,6 +194,30 @@ class Database(object): async def __aexit__(self, exc_type, exc_value, traceback): await self.close() + async def _set_config(self, cursor, name, value): + cursor.execute( + """ + INSERT OR REPLACE INTO config (id, name, value) VALUES + ((SELECT id FROM config WHERE name=:name), :name, :value) + """, + { + "name": name, + "value": value, + }, + ) + + async def _get_config(self, cursor, name): + cursor.execute( + "SELECT value FROM config WHERE name=:name", + { + "name": name, + }, + ) + row = cursor.fetchone() + if row is None: + return None + return row["value"] + async def close(self): self.db.close() @@ -150,8 +225,8 @@ class Database(object): with closing(self.db.cursor()) as cursor: cursor.execute( """ - SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 - INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash + SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2 + INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash ORDER BY outhashes_v2.created ASC LIMIT 1 @@ -167,8 +242,8 @@ class Database(object): with closing(self.db.cursor()) as cursor: cursor.execute( """ - SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 - INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash + SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2 + INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash ORDER BY outhashes_v2.created ASC LIMIT 1 @@ -200,8 +275,8 @@ class Database(object): with closing(self.db.cursor()) as cursor: cursor.execute( """ - SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2 - INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash + SELECT outhashes_v2.taskhash AS taskhash, unihashes_v3.unihash AS unihash FROM outhashes_v2 + INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash -- Select any matching output hash except the one we just inserted WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash -- Pick the oldest hash @@ -219,7 +294,7 @@ class Database(object): async def get_equivalent(self, method, taskhash): with closing(self.db.cursor()) as cursor: cursor.execute( - "SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash", + "SELECT taskhash, method, unihash FROM unihashes_v3 WHERE method=:method AND taskhash=:taskhash", { "method": method, "taskhash": taskhash, @@ -229,15 +304,9 @@ class Database(object): async def remove(self, condition): def do_remove(columns, table_name, cursor): - where = {} - for c in columns: - if c in condition and condition[c] is not None: - where[c] = condition[c] - + where, clause = _make_condition_statement(columns, condition) if where: - query = ("DELETE FROM %s WHERE " % table_name) + " AND ".join( - "%s=:%s" % (k, k) for k in where.keys() - ) + query = f"DELETE FROM {table_name} WHERE {clause}" cursor.execute(query, where) return cursor.rowcount @@ -246,17 +315,80 @@ class Database(object): count = 0 with closing(self.db.cursor()) as cursor: count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor) - count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) + count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v3", cursor) self.db.commit() return count + async def get_current_gc_mark(self): + with closing(self.db.cursor()) as cursor: + return await self._get_config(cursor, "gc-mark") + + async def gc_status(self): + with closing(self.db.cursor()) as cursor: + cursor.execute( + """ + SELECT COUNT() FROM unihashes_v3 WHERE + gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '') + """ + ) + keep_rows = cursor.fetchone()[0] + + cursor.execute( + """ + SELECT COUNT() FROM unihashes_v3 WHERE + gc_mark!=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '') + """ + ) + remove_rows = cursor.fetchone()[0] + + current_mark = await self._get_config(cursor, "gc-mark") + + return (keep_rows, remove_rows, current_mark) + + async def gc_mark(self, mark, condition): + with closing(self.db.cursor()) as cursor: + await self._set_config(cursor, "gc-mark", mark) + + where, clause = _make_condition_statement(UNIHASH_TABLE_COLUMNS, condition) + + new_rows = 0 + if where: + cursor.execute( + f""" + UPDATE unihashes_v3 SET + gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '') + WHERE {clause} + """, + where, + ) + new_rows = cursor.rowcount + + self.db.commit() + return new_rows + + async def gc_sweep(self): + with closing(self.db.cursor()) as cursor: + # NOTE: COALESCE is not used in this query so that if the current + # mark is NULL, nothing will happen + cursor.execute( + """ + DELETE FROM unihashes_v3 WHERE + gc_mark!=(SELECT value FROM config WHERE name='gc-mark') + """ + ) + count = cursor.rowcount + await self._set_config(cursor, "gc-mark", None) + + self.db.commit() + return count + async def clean_unused(self, oldest): with closing(self.db.cursor()) as cursor: cursor.execute( """ DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS ( - SELECT unihashes_v2.id FROM unihashes_v2 WHERE unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash LIMIT 1 + SELECT unihashes_v3.id FROM unihashes_v3 WHERE unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash LIMIT 1 ) """, { @@ -271,7 +403,13 @@ class Database(object): prevrowid = cursor.lastrowid cursor.execute( """ - INSERT OR IGNORE INTO unihashes_v2 (method, taskhash, unihash) VALUES(:method, :taskhash, :unihash) + INSERT OR IGNORE INTO unihashes_v3 (method, taskhash, unihash, gc_mark) VALUES + ( + :method, + :taskhash, + :unihash, + COALESCE((SELECT value FROM config WHERE name='gc-mark'), '') + ) """, { "method": method, @@ -383,14 +521,9 @@ class Database(object): async def get_usage(self): usage = {} with closing(self.db.cursor()) as cursor: - if self.sqlite_version >= (3, 33): - table_name = "sqlite_schema" - else: - table_name = "sqlite_master" - cursor.execute( f""" - SELECT name FROM {table_name} WHERE type = 'table' AND name NOT LIKE 'sqlite_%' + SELECT name FROM {_schema_table_name(self.sqlite_version)} WHERE type = 'table' AND name NOT LIKE 'sqlite_%' """ ) for row in cursor.fetchall(): diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index 869f7636c53..aeedab3575e 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py @@ -810,6 +810,27 @@ class HashEquivalenceCommonTests(object): with self.auth_perms("@user-admin") as client: become = client.become_user(client.username) + def test_auth_gc(self): + admin_client = self.start_auth_server() + + with self.auth_perms() as client, self.assertRaises(InvokeError): + client.gc_mark("ABC", {"unihash": "123"}) + + with self.auth_perms() as client, self.assertRaises(InvokeError): + client.gc_status() + + with self.auth_perms() as client, self.assertRaises(InvokeError): + client.gc_sweep("ABC") + + with self.auth_perms("@db-admin") as client: + client.gc_mark("ABC", {"unihash": "123"}) + + with self.auth_perms("@db-admin") as client: + client.gc_status() + + with self.auth_perms("@db-admin") as client: + client.gc_sweep("ABC") + def test_get_db_usage(self): usage = self.client.get_db_usage() @@ -837,6 +858,147 @@ class HashEquivalenceCommonTests(object): data = client.get_taskhash(self.METHOD, taskhash, True) self.assertEqual(data["owner"], user["username"]) + def test_gc(self): + taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' + outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' + unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' + + result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') + + taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' + outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' + unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' + + result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) + self.assertClientGetHash(self.client, taskhash2, unihash2) + + # Mark the first unihash to be kept + ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD}) + self.assertEqual(ret, {"count": 1}) + + ret = self.client.gc_status() + self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 1}) + + # Second hash is still there; mark doesn't delete hashes + self.assertClientGetHash(self.client, taskhash2, unihash2) + + ret = self.client.gc_sweep("ABC") + self.assertEqual(ret, {"count": 1}) + + # Hash is gone. Taskhash is returned for second hash + self.assertClientGetHash(self.client, taskhash2, None) + # First hash is still present + self.assertClientGetHash(self.client, taskhash, unihash) + + def test_gc_switch_mark(self): + taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' + outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' + unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' + + result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') + + taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' + outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' + unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' + + result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) + self.assertClientGetHash(self.client, taskhash2, unihash2) + + # Mark the first unihash to be kept + ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD}) + self.assertEqual(ret, {"count": 1}) + + ret = self.client.gc_status() + self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 1}) + + # Second hash is still there; mark doesn't delete hashes + self.assertClientGetHash(self.client, taskhash2, unihash2) + + # Switch to a different mark and mark the second hash. This will start + # a new collection cycle + ret = self.client.gc_mark("DEF", {"unihash": unihash2, "method": self.METHOD}) + self.assertEqual(ret, {"count": 1}) + + ret = self.client.gc_status() + self.assertEqual(ret, {"mark": "DEF", "keep": 1, "remove": 1}) + + # Both hashes are still present + self.assertClientGetHash(self.client, taskhash2, unihash2) + self.assertClientGetHash(self.client, taskhash, unihash) + + # Sweep with the new mark + ret = self.client.gc_sweep("DEF") + self.assertEqual(ret, {"count": 1}) + + # First hash is gone, second is kept + self.assertClientGetHash(self.client, taskhash2, unihash2) + self.assertClientGetHash(self.client, taskhash, None) + + def test_gc_switch_sweep_mark(self): + taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' + outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' + unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' + + result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') + + taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' + outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' + unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' + + result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) + self.assertClientGetHash(self.client, taskhash2, unihash2) + + # Mark the first unihash to be kept + ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD}) + self.assertEqual(ret, {"count": 1}) + + ret = self.client.gc_status() + self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 1}) + + # Sweeping with a different mark raises an error + with self.assertRaises(InvokeError): + self.client.gc_sweep("DEF") + + # Both hashes are present + self.assertClientGetHash(self.client, taskhash2, unihash2) + self.assertClientGetHash(self.client, taskhash, unihash) + + def test_gc_new_hashes(self): + taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' + outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' + unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' + + result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') + + # Start a new garbage collection + ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD}) + self.assertEqual(ret, {"count": 1}) + + ret = self.client.gc_status() + self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 0}) + + # Add second hash. It should inherit the mark from the current garbage + # collection operation + + taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' + outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' + unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' + + result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) + self.assertClientGetHash(self.client, taskhash2, unihash2) + + # Sweep should remove nothing + ret = self.client.gc_sweep("ABC") + self.assertEqual(ret, {"count": 0}) + + # Both hashes are present + self.assertClientGetHash(self.client, taskhash2, unihash2) + self.assertClientGetHash(self.client, taskhash, unihash) + class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): def get_server_addr(self, server_idx): @@ -1086,6 +1248,42 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): "get-db-query-columns", ], check=True) + def test_gc(self): + taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' + outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' + unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' + + result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') + + taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' + outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' + unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' + + result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) + self.assertClientGetHash(self.client, taskhash2, unihash2) + + # Mark the first unihash to be kept + self.run_hashclient([ + "--address", self.server_address, + "gc-mark", "ABC", + "--where", "unihash", unihash, + "--where", "method", self.METHOD + ], check=True) + + # Second hash is still there; mark doesn't delete hashes + self.assertClientGetHash(self.client, taskhash2, unihash2) + + self.run_hashclient([ + "--address", self.server_address, + "gc-sweep", "ABC", + ], check=True) + + # Hash is gone. Taskhash is returned for second hash + self.assertClientGetHash(self.client, taskhash2, None) + # First hash is still present + self.assertClientGetHash(self.client, taskhash, unihash) + class TestHashEquivalenceUnixServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): def get_server_addr(self, server_idx): -- 2.34.1 ^ permalink raw reply related [flat|nested] 15+ messages in thread
* [bitbake-devel][PATCH v2 2/8] hashserv: sqlalchemy: Use _execute() helper 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 0/8] Implement parallel Query API Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 1/8] hashserv: Add Unihash Garbage Collection Joshua Watt @ 2024-02-18 22:59 ` Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 3/8] hashserv: Add unihash-exists API Joshua Watt ` (5 subsequent siblings) 7 siblings, 0 replies; 15+ messages in thread From: Joshua Watt @ 2024-02-18 22:59 UTC (permalink / raw) To: bitbake-devel; +Cc: Joshua Watt Use the _execute() helper to execute queries. This helper does the logging of the statement that was being done manually everywhere. Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> --- bitbake/lib/hashserv/sqlalchemy.py | 297 ++++++++++++++--------------- 1 file changed, 140 insertions(+), 157 deletions(-) diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index 89a6b86d9d8..873547809a0 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py @@ -233,124 +233,113 @@ class Database(object): return row.value async def get_unihash_by_taskhash_full(self, method, taskhash): - statement = ( - select( - OuthashesV2, - UnihashesV3.unihash.label("unihash"), - ) - .join( - UnihashesV3, - and_( - UnihashesV3.method == OuthashesV2.method, - UnihashesV3.taskhash == OuthashesV2.taskhash, - ), - ) - .where( - OuthashesV2.method == method, - OuthashesV2.taskhash == taskhash, - ) - .order_by( - OuthashesV2.created.asc(), - ) - .limit(1) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + OuthashesV2, + UnihashesV3.unihash.label("unihash"), + ) + .join( + UnihashesV3, + and_( + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, + ), + ) + .where( + OuthashesV2.method == method, + OuthashesV2.taskhash == taskhash, + ) + .order_by( + OuthashesV2.created.asc(), + ) + .limit(1) + ) return map_row(result.first()) async def get_unihash_by_outhash(self, method, outhash): - statement = ( - select(OuthashesV2, UnihashesV3.unihash.label("unihash")) - .join( - UnihashesV3, - and_( - UnihashesV3.method == OuthashesV2.method, - UnihashesV3.taskhash == OuthashesV2.taskhash, - ), - ) - .where( - OuthashesV2.method == method, - OuthashesV2.outhash == outhash, - ) - .order_by( - OuthashesV2.created.asc(), - ) - .limit(1) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select(OuthashesV2, UnihashesV3.unihash.label("unihash")) + .join( + UnihashesV3, + and_( + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, + ), + ) + .where( + OuthashesV2.method == method, + OuthashesV2.outhash == outhash, + ) + .order_by( + OuthashesV2.created.asc(), + ) + .limit(1) + ) return map_row(result.first()) async def get_outhash(self, method, outhash): - statement = ( - select(OuthashesV2) - .where( - OuthashesV2.method == method, - OuthashesV2.outhash == outhash, - ) - .order_by( - OuthashesV2.created.asc(), - ) - .limit(1) - ) - - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select(OuthashesV2) + .where( + OuthashesV2.method == method, + OuthashesV2.outhash == outhash, + ) + .order_by( + OuthashesV2.created.asc(), + ) + .limit(1) + ) return map_row(result.first()) async def get_equivalent_for_outhash(self, method, outhash, taskhash): - statement = ( - select( - OuthashesV2.taskhash.label("taskhash"), - UnihashesV3.unihash.label("unihash"), - ) - .join( - UnihashesV3, - and_( - UnihashesV3.method == OuthashesV2.method, - UnihashesV3.taskhash == OuthashesV2.taskhash, - ), - ) - .where( - OuthashesV2.method == method, - OuthashesV2.outhash == outhash, - OuthashesV2.taskhash != taskhash, - ) - .order_by( - OuthashesV2.created.asc(), - ) - .limit(1) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + OuthashesV2.taskhash.label("taskhash"), + UnihashesV3.unihash.label("unihash"), + ) + .join( + UnihashesV3, + and_( + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, + ), + ) + .where( + OuthashesV2.method == method, + OuthashesV2.outhash == outhash, + OuthashesV2.taskhash != taskhash, + ) + .order_by( + OuthashesV2.created.asc(), + ) + .limit(1) + ) return map_row(result.first()) async def get_equivalent(self, method, taskhash): - statement = select( - UnihashesV3.unihash, - UnihashesV3.method, - UnihashesV3.taskhash, - ).where( - UnihashesV3.method == method, - UnihashesV3.taskhash == taskhash, - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + UnihashesV3.unihash, + UnihashesV3.method, + UnihashesV3.taskhash, + ).where( + UnihashesV3.method == method, + UnihashesV3.taskhash == taskhash, + ) + ) return map_row(result.first()) async def remove(self, condition): async def do_remove(table): where = _make_condition_statement(table, condition) if where: - statement = delete(table).where(*where) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute(delete(table).where(*where)) return result.rowcount return 0 @@ -417,21 +406,21 @@ class Database(object): return result.rowcount async def clean_unused(self, oldest): - statement = delete(OuthashesV2).where( - OuthashesV2.created < oldest, - ~( - select(UnihashesV3.id) - .where( - UnihashesV3.method == OuthashesV2.method, - UnihashesV3.taskhash == OuthashesV2.taskhash, - ) - .limit(1) - .exists() - ), - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + delete(OuthashesV2).where( + OuthashesV2.created < oldest, + ~( + select(UnihashesV3.id) + .where( + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, + ) + .limit(1) + .exists() + ), + ) + ) return result.rowcount async def insert_unihash(self, method, taskhash, unihash): @@ -461,11 +450,9 @@ class Database(object): if "created" in data and not isinstance(data["created"], datetime): data["created"] = datetime.fromisoformat(data["created"]) - statement = insert(OuthashesV2).values(**data) - self.logger.debug("%s", statement) try: async with self.db.begin(): - await self.db.execute(statement) + await self._execute(insert(OuthashesV2).values(**data)) return True except IntegrityError: self.logger.debug( @@ -474,16 +461,16 @@ class Database(object): return False async def _get_user(self, username): - statement = select( - Users.username, - Users.permissions, - Users.token, - ).where( - Users.username == username, - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + Users.username, + Users.permissions, + Users.token, + ).where( + Users.username == username, + ) + ) return result.first() async def lookup_user_token(self, username): @@ -496,70 +483,66 @@ class Database(object): return map_user(await self._get_user(username)) async def set_user_token(self, username, token): - statement = ( - update(Users) - .where( - Users.username == username, - ) - .values( - token=token, - ) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + update(Users) + .where( + Users.username == username, + ) + .values( + token=token, + ) + ) return result.rowcount != 0 async def set_user_perms(self, username, permissions): - statement = ( - update(Users) - .where(Users.username == username) - .values(permissions=" ".join(permissions)) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + update(Users) + .where(Users.username == username) + .values(permissions=" ".join(permissions)) + ) return result.rowcount != 0 async def get_all_users(self): - statement = select( - Users.username, - Users.permissions, - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + Users.username, + Users.permissions, + ) + ) return [map_user(row) for row in result] async def new_user(self, username, permissions, token): - statement = insert(Users).values( - username=username, - permissions=" ".join(permissions), - token=token, - ) - self.logger.debug("%s", statement) try: async with self.db.begin(): - await self.db.execute(statement) + await self._execute( + insert(Users).values( + username=username, + permissions=" ".join(permissions), + token=token, + ) + ) return True except IntegrityError as e: self.logger.debug("Cannot create new user %s: %s", username, e) return False async def delete_user(self, username): - statement = delete(Users).where(Users.username == username) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + delete(Users).where(Users.username == username) + ) return result.rowcount != 0 async def get_usage(self): usage = {} async with self.db.begin() as session: for name, table in Base.metadata.tables.items(): - statement = select(func.count()).select_from(table) - self.logger.debug("%s", statement) - result = await self.db.execute(statement) + result = await self._execute( + statement=select(func.count()).select_from(table) + ) usage[name] = { "rows": result.scalar(), } -- 2.34.1 ^ permalink raw reply related [flat|nested] 15+ messages in thread
* [bitbake-devel][PATCH v2 3/8] hashserv: Add unihash-exists API 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 0/8] Implement parallel Query API Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 1/8] hashserv: Add Unihash Garbage Collection Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 2/8] hashserv: sqlalchemy: Use _execute() helper Joshua Watt @ 2024-02-18 22:59 ` Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 4/8] asyncrpc: Add Client Pool object Joshua Watt ` (4 subsequent siblings) 7 siblings, 0 replies; 15+ messages in thread From: Joshua Watt @ 2024-02-18 22:59 UTC (permalink / raw) To: bitbake-devel; +Cc: Joshua Watt Adds API to check if the server is aware of the existence of a given unihash. This can be used as an optimization for sstate where a client can query the hash equivalence server to check if a unihash exists before querying the sstate cache. If the hash server isn't aware of the existence of a unihash, then there is very likely not a matching sstate object, so this should be able to significantly cut down on the number of negative hits on the sstate cache. Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> --- bitbake/bin/bitbake-hashclient | 13 +++++++ bitbake/lib/hashserv/client.py | 44 ++++++++++++++++----- bitbake/lib/hashserv/server.py | 61 +++++++++++++++++++----------- bitbake/lib/hashserv/sqlalchemy.py | 11 ++++++ bitbake/lib/hashserv/sqlite.py | 16 ++++++++ bitbake/lib/hashserv/tests.py | 39 +++++++++++++++++++ 6 files changed, 151 insertions(+), 33 deletions(-) diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient index f71b87404ae..47dd27cd3c2 100755 --- a/bitbake/bin/bitbake-hashclient +++ b/bitbake/bin/bitbake-hashclient @@ -217,6 +217,14 @@ def main(): print("Removed %d rows" % result["count"]) return 0 + def handle_unihash_exists(args, client): + result = client.unihash_exists(args.unihash) + if args.quiet: + return 0 if result else 1 + + print("true" if result else "false") + return 0 + parser = argparse.ArgumentParser(description='Hash Equivalence Client') parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")') parser.add_argument('--log', default='WARNING', help='Set logging level') @@ -309,6 +317,11 @@ def main(): gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") gc_sweep_parser.set_defaults(func=handle_gc_sweep) + unihash_exists_parser = subparsers.add_parser('unihash-exists', help="Check if a unihash is known to the server") + unihash_exists_parser.add_argument("--quiet", action="store_true", help="Don't print status. Instead, exit with 0 if unihash exists and 1 if it does not") + unihash_exists_parser.add_argument("unihash", help="Unihash to check") + unihash_exists_parser.set_defaults(func=handle_unihash_exists) + args = parser.parse_args() logger = logging.getLogger('hashserv') diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index e6dc4179126..daf1e128423 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py @@ -16,6 +16,7 @@ logger = logging.getLogger("hashserv.client") class AsyncClient(bb.asyncrpc.AsyncClient): MODE_NORMAL = 0 MODE_GET_STREAM = 1 + MODE_EXIST_STREAM = 2 def __init__(self, username=None, password=None): super().__init__("OEHASHEQUIV", "1.1", logger) @@ -49,19 +50,36 @@ class AsyncClient(bb.asyncrpc.AsyncClient): await self.socket.send("END") return await self.socket.recv() - if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: + async def normal_to_stream(command): + r = await self.invoke({command: None}) + if r != "ok": + raise ConnectionError( + f"Unable to transition to stream mode: Bad response from server {r!r}" + ) + + self.logger.debug("Mode is now %s", command) + + if new_mode == self.mode: + return + + self.logger.debug("Transitioning mode %s -> %s", self.mode, new_mode) + + # Always transition to normal mode before switching to any other mode + if self.mode != self.MODE_NORMAL: r = await self._send_wrapper(stream_to_normal) if r != "ok": self.check_invoke_error(r) - raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r) - elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: - r = await self.invoke({"get-stream": None}) - if r != "ok": - raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r) - elif new_mode != self.mode: - raise Exception( - "Undefined mode transition %r -> %r" % (self.mode, new_mode) - ) + raise ConnectionError( + f"Unable to transition to normal mode: Bad response from server {r!r}" + ) + self.logger.debug("Mode is now normal") + + if new_mode == self.MODE_GET_STREAM: + await normal_to_stream("get-stream") + elif new_mode == self.MODE_EXIST_STREAM: + await normal_to_stream("exists-stream") + elif new_mode != self.MODE_NORMAL: + raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") self.mode = new_mode @@ -95,6 +113,11 @@ class AsyncClient(bb.asyncrpc.AsyncClient): {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} ) + async def unihash_exists(self, unihash): + await self._set_mode(self.MODE_EXIST_STREAM) + r = await self.send_stream(unihash) + return r == "true" + async def get_outhash(self, method, outhash, taskhash, with_unihash=True): await self._set_mode(self.MODE_NORMAL) return await self.invoke( @@ -236,6 +259,7 @@ class Client(bb.asyncrpc.Client): "report_unihash", "report_unihash_equiv", "get_taskhash", + "unihash_exists", "get_outhash", "get_stats", "reset_stats", diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 5ed852d1f30..68f64f983b2 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py @@ -234,6 +234,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): "get": self.handle_get, "get-outhash": self.handle_get_outhash, "get-stream": self.handle_get_stream, + "exists-stream": self.handle_exists_stream, "get-stats": self.handle_get_stats, "get-db-usage": self.handle_get_db_usage, "get-db-query-columns": self.handle_get_db_query_columns, @@ -377,8 +378,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) await self.db.insert_outhash(data) - @permissions(READ_PERM) - async def handle_get_stream(self, request): + async def _stream_handler(self, handler): await self.socket.send_message("ok") while True: @@ -400,35 +400,50 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): if l == "END": break - (method, taskhash) = l.split() - # self.logger.debug('Looking up %s %s' % (method, taskhash)) - row = await self.db.get_equivalent(method, taskhash) - - if row is not None: - msg = row["unihash"] - # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) - elif self.upstream_client is not None: - upstream = await self.upstream_client.get_unihash(method, taskhash) - if upstream: - msg = upstream - else: - msg = "" - else: - msg = "" - + msg = await handler(l) await self.socket.send(msg) finally: request_measure.end() self.request_sample.end() - # Post to the backfill queue after writing the result to minimize - # the turn around time on a request - if upstream is not None: - await self.server.backfill_queue.put((method, taskhash)) - await self.socket.send("ok") return self.NO_RESPONSE + @permissions(READ_PERM) + async def handle_get_stream(self, request): + async def handler(l): + (method, taskhash) = l.split() + # self.logger.debug('Looking up %s %s' % (method, taskhash)) + row = await self.db.get_equivalent(method, taskhash) + + if row is not None: + # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) + return row["unihash"] + + if self.upstream_client is not None: + upstream = await self.upstream_client.get_unihash(method, taskhash) + if upstream: + await self.server.backfill_queue.put((method, taskhash)) + return upstream + + return "" + + return await self._stream_handler(handler) + + @permissions(READ_PERM) + async def handle_exists_stream(self, request): + async def handler(l): + if await self.db.unihash_exists(l): + return "true" + + if self.upstream_client is not None: + if await self.upstream_client.unihash_exists(l): + return "true" + + return "false" + + return await self._stream_handler(handler) + async def report_readonly(self, data): method = data["method"] outhash = data["outhash"] diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index 873547809a0..0e28d738f5a 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py @@ -48,6 +48,7 @@ class UnihashesV3(Base): __table_args__ = ( UniqueConstraint("method", "taskhash"), Index("taskhash_lookup_v4", "method", "taskhash"), + Index("unihash_lookup_v1", "unihash"), ) @@ -279,6 +280,16 @@ class Database(object): ) return map_row(result.first()) + async def unihash_exists(self, unihash): + async with self.db.begin(): + result = await self._execute( + select(UnihashesV3) + .where(UnihashesV3.unihash == unihash) + .limit(1) + ) + + return result.first() is not None + async def get_outhash(self, method, outhash): async with self.db.begin(): result = await self._execute( diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py index 608490730d7..da2e844a031 100644 --- a/bitbake/lib/hashserv/sqlite.py +++ b/bitbake/lib/hashserv/sqlite.py @@ -144,6 +144,9 @@ class DatabaseEngine(object): cursor.execute( "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)" ) + cursor.execute( + "CREATE INDEX IF NOT EXISTS unihash_lookup_v1 ON unihashes_v3 (unihash)" + ) cursor.execute( "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" ) @@ -255,6 +258,19 @@ class Database(object): ) return cursor.fetchone() + async def unihash_exists(self, unihash): + with closing(self.db.cursor()) as cursor: + cursor.execute( + """ + SELECT * FROM unihashes_v3 WHERE unihash=:unihash + LIMIT 1 + """, + { + "unihash": unihash, + }, + ) + return cursor.fetchone() is not None + async def get_outhash(self, method, outhash): with closing(self.db.cursor()) as cursor: cursor.execute( diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index aeedab3575e..fbbe81512a0 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py @@ -442,6 +442,11 @@ class HashEquivalenceCommonTests(object): self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream') self.assertEqual(result['method'], self.METHOD) + def test_unihash_exsits(self): + taskhash, outhash, unihash = self.create_test_hash(self.client) + self.assertTrue(self.client.unihash_exists(unihash)) + self.assertFalse(self.client.unihash_exists('6662e699d6e3d894b24408ff9a4031ef9b038ee8')) + def test_ro_server(self): rw_server = self.start_server() rw_client = self.start_client(rw_server.address) @@ -1031,6 +1036,40 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): def test_stress(self): self.run_hashclient(["--address", self.server_address, "stress"], check=True) + def test_unihash_exsits(self): + taskhash, outhash, unihash = self.create_test_hash(self.client) + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", unihash, + ], check=True) + self.assertEqual(p.stdout.strip(), "true") + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8', + ], check=True) + self.assertEqual(p.stdout.strip(), "false") + + def test_unihash_exsits_quiet(self): + taskhash, outhash, unihash = self.create_test_hash(self.client) + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", unihash, + "--quiet", + ]) + self.assertEqual(p.returncode, 0) + self.assertEqual(p.stdout.strip(), "") + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8', + "--quiet", + ]) + self.assertEqual(p.returncode, 1) + self.assertEqual(p.stdout.strip(), "") + def test_remove_taskhash(self): taskhash, outhash, unihash = self.create_test_hash(self.client) self.run_hashclient([ -- 2.34.1 ^ permalink raw reply related [flat|nested] 15+ messages in thread
* [bitbake-devel][PATCH v2 4/8] asyncrpc: Add Client Pool object 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 0/8] Implement parallel Query API Joshua Watt ` (2 preceding siblings ...) 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 3/8] hashserv: Add unihash-exists API Joshua Watt @ 2024-02-18 22:59 ` Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 5/8] hashserv: Add Client Pool Joshua Watt ` (3 subsequent siblings) 7 siblings, 0 replies; 15+ messages in thread From: Joshua Watt @ 2024-02-18 22:59 UTC (permalink / raw) To: bitbake-devel; +Cc: Joshua Watt Adds an abstract base class that can be used to implement a pool of client connections. The class implements a thread that runs an async event loop, and allows derived classes to schedule work on the loop and wait for the work to be finished. Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> --- bitbake/lib/bb/asyncrpc/__init__.py | 2 +- bitbake/lib/bb/asyncrpc/client.py | 77 +++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/bitbake/lib/bb/asyncrpc/__init__.py b/bitbake/lib/bb/asyncrpc/__init__.py index a4371643d74..639e1607f8e 100644 --- a/bitbake/lib/bb/asyncrpc/__init__.py +++ b/bitbake/lib/bb/asyncrpc/__init__.py @@ -5,7 +5,7 @@ # -from .client import AsyncClient, Client +from .client import AsyncClient, Client, ClientPool from .serv import AsyncServer, AsyncServerConnection from .connection import DEFAULT_MAX_CHUNK from .exceptions import ( diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index 0d7cd85780d..a6228bb0ba0 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py @@ -10,6 +10,8 @@ import json import os import socket import sys +import contextlib +from threading import Thread from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK from .exceptions import ConnectionClosedError, InvokeError @@ -180,3 +182,78 @@ class Client(object): def __exit__(self, exc_type, exc_value, traceback): self.close() return False + + +class ClientPool(object): + def __init__(self, max_clients): + self.avail_clients = [] + self.num_clients = 0 + self.max_clients = max_clients + self.loop = None + self.client_condition = None + + @abc.abstractmethod + async def _new_client(self): + raise NotImplementedError("Must be implemented in derived class") + + def close(self): + if self.client_condition: + self.client_condition = None + + if self.loop: + self.loop.run_until_complete(self.__close_clients()) + self.loop.run_until_complete(self.loop.shutdown_asyncgens()) + self.loop.close() + self.loop = None + + def run_tasks(self, tasks): + if not self.loop: + self.loop = asyncio.new_event_loop() + + thread = Thread(target=self.__thread_main, args=(tasks,)) + thread.start() + thread.join() + + @contextlib.asynccontextmanager + async def get_client(self): + async with self.client_condition: + if self.avail_clients: + client = self.avail_clients.pop() + elif self.num_clients < self.max_clients: + self.num_clients += 1 + client = await self._new_client() + else: + while not self.avail_clients: + await self.client_condition.wait() + client = self.avail_clients.pop() + + try: + yield client + finally: + async with self.client_condition: + self.avail_clients.append(client) + self.client_condition.notify() + + def __thread_main(self, tasks): + async def process_task(task): + async with self.get_client() as client: + await task(client) + + asyncio.set_event_loop(self.loop) + if not self.client_condition: + self.client_condition = asyncio.Condition() + tasks = [process_task(t) for t in tasks] + self.loop.run_until_complete(asyncio.gather(*tasks)) + + async def __close_clients(self): + for c in self.avail_clients: + await c.close() + self.avail_clients = [] + self.num_clients = 0 + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + return False -- 2.34.1 ^ permalink raw reply related [flat|nested] 15+ messages in thread
* [bitbake-devel][PATCH v2 5/8] hashserv: Add Client Pool 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 0/8] Implement parallel Query API Joshua Watt ` (3 preceding siblings ...) 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 4/8] asyncrpc: Add Client Pool object Joshua Watt @ 2024-02-18 22:59 ` Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 6/8] siggen: Add parallel query API Joshua Watt ` (2 subsequent siblings) 7 siblings, 0 replies; 15+ messages in thread From: Joshua Watt @ 2024-02-18 22:59 UTC (permalink / raw) To: bitbake-devel; +Cc: Joshua Watt Implements a Client Pool derived from the AsyncRPC client pool that allows querying for multiple equivalent hashes in parallel Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> --- bitbake/lib/hashserv/client.py | 80 ++++++++++++++++++++++++++++++++ bitbake/lib/hashserv/tests.py | 83 ++++++++++++++++++++++++++++++++++ 2 files changed, 163 insertions(+) diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index daf1e128423..b269879ecfd 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py @@ -283,3 +283,83 @@ class Client(bb.asyncrpc.Client): def _get_async_client(self): return AsyncClient(self.username, self.password) + + +class ClientPool(bb.asyncrpc.ClientPool): + def __init__( + self, + address, + max_clients, + *, + username=None, + password=None, + become=None, + ): + super().__init__(max_clients) + self.address = address + self.username = username + self.password = password + self.become = become + + async def _new_client(self): + client = await create_async_client( + self.address, + username=self.username, + password=self.password, + ) + if self.become: + await client.become_user(self.become) + return client + + def _run_key_tasks(self, queries, call): + results = {key: None for key in queries.keys()} + + def make_task(key, args): + async def task(client): + nonlocal results + unihash = await call(client, args) + results[key] = unihash + + return task + + def gen_tasks(): + for key, args in queries.items(): + yield make_task(key, args) + + self.run_tasks(gen_tasks()) + return results + + def get_unihashes(self, queries): + """ + Query multiple unihashes in parallel. + + The queries argument is a dictionary with arbitrary key. The values + must be a tuple of (method, taskhash). + + Returns a dictionary with a corresponding key for each input key, and + the value is the queried unihash (which might be none if the query + failed) + """ + + async def call(client, args): + method, taskhash = args + return await client.get_unihash(method, taskhash) + + return self._run_key_tasks(queries, call) + + def unihashes_exist(self, queries): + """ + Query multiple unihash existence checks in parallel. + + The queries argument is a dictionary with arbitrary key. The values + must be a unihash. + + Returns a dictionary with a corresponding key for each input key, and + the value is True or False if the unihash is known by the server (or + None if there was a failure) + """ + + async def call(client, unihash): + return await client.unihash_exists(unihash) + + return self._run_key_tasks(queries, call) diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index fbbe81512a0..0809453cf87 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py @@ -8,6 +8,7 @@ from . import create_server, create_client from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS from bb.asyncrpc import InvokeError +from .client import ClientPool import hashlib import logging import multiprocessing @@ -554,6 +555,88 @@ class HashEquivalenceCommonTests(object): # shares a taskhash with Task 2 self.assertClientGetHash(self.client, taskhash2, unihash2) + + def test_client_pool_get_unihashes(self): + TEST_INPUT = ( + # taskhash outhash unihash + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), + # Duplicated taskhash with multiple output hashes and unihashes. + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), + # Equivalent hash + ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), + ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), + ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), + ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), + ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), + ) + EXTRA_QUERIES = ( + "6b6be7a84ab179b4240c4302518dc3f6", + ) + + with ClientPool(self.server_address, 10) as client_pool: + for taskhash, outhash, unihash in TEST_INPUT: + self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + + query = {idx: (self.METHOD, data[0]) for idx, data in enumerate(TEST_INPUT)} + for idx, taskhash in enumerate(EXTRA_QUERIES): + query[idx + len(TEST_INPUT)] = (self.METHOD, taskhash) + + result = client_pool.get_unihashes(query) + + self.assertDictEqual(result, { + 0: "218e57509998197d570e2c98512d0105985dffc9", + 1: "218e57509998197d570e2c98512d0105985dffc9", + 2: "218e57509998197d570e2c98512d0105985dffc9", + 3: "3b5d3d83f07f259e9086fcb422c855286e18a57d", + 4: "f46d3fbb439bd9b921095da657a4de906510d2cd", + 5: "f46d3fbb439bd9b921095da657a4de906510d2cd", + 6: "05d2a63c81e32f0a36542ca677e8ad852365c538", + 7: None, + }) + + def test_client_pool_unihash_exists(self): + TEST_INPUT = ( + # taskhash outhash unihash + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), + # Duplicated taskhash with multiple output hashes and unihashes. + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), + # Equivalent hash + ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), + ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), + ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), + ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), + ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), + ) + EXTRA_QUERIES = ( + "6b6be7a84ab179b4240c4302518dc3f6", + ) + + result_unihashes = set() + + + with ClientPool(self.server_address, 10) as client_pool: + for taskhash, outhash, unihash in TEST_INPUT: + result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + result_unihashes.add(result["unihash"]) + + query = {} + expected = {} + + for _, _, unihash in TEST_INPUT: + idx = len(query) + query[idx] = unihash + expected[idx] = unihash in result_unihashes + + + for unihash in EXTRA_QUERIES: + idx = len(query) + query[idx] = unihash + expected[idx] = False + + result = client_pool.unihashes_exist(query) + self.assertDictEqual(result, expected) + + def test_auth_read_perms(self): admin_client = self.start_auth_server() -- 2.34.1 ^ permalink raw reply related [flat|nested] 15+ messages in thread
* [bitbake-devel][PATCH v2 6/8] siggen: Add parallel query API 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 0/8] Implement parallel Query API Joshua Watt ` (4 preceding siblings ...) 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 5/8] hashserv: Add Client Pool Joshua Watt @ 2024-02-18 22:59 ` Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 7/8] siggen: Add parallel unihash exist API Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 8/8] bitbake: hashserv: Postgres adaptations for ignoring duplicate inserts Joshua Watt 7 siblings, 0 replies; 15+ messages in thread From: Joshua Watt @ 2024-02-18 22:59 UTC (permalink / raw) To: bitbake-devel; +Cc: Joshua Watt Implements a new API called get_unihashes() that allows for querying multiple unihashes in parallel. The API is also reworked to make it easier for derived classes to interface with the new API in a consistent manner. Instead of overriding get_unihash() to add custom handling for local hash calculating (e.g. caches) derived classes should now override get_cached_unihash(), and return the local unihash or None if there isn't one. Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> --- bitbake/lib/bb/siggen.py | 121 ++++++++++++++++++++++++++++----------- 1 file changed, 87 insertions(+), 34 deletions(-) diff --git a/bitbake/lib/bb/siggen.py b/bitbake/lib/bb/siggen.py index 58854aee76c..e1a4fa2cdd1 100644 --- a/bitbake/lib/bb/siggen.py +++ b/bitbake/lib/bb/siggen.py @@ -102,9 +102,18 @@ class SignatureGenerator(object): if flag: self.datacaches[mc].stamp_extrainfo[mcfn][t] = flag + def get_cached_unihash(self, tid): + return None + def get_unihash(self, tid): + unihash = self.get_cached_unihash(tid) + if unihash: + return unihash return self.taskhash[tid] + def get_unihashes(self, tids): + return {tid: self.get_unihash(tid) for tid in tids} + def prep_taskhash(self, tid, deps, dataCaches): return @@ -524,28 +533,37 @@ class SignatureGeneratorUniHashMixIn(object): super().__init__(data) def get_taskdata(self): - return (self.server, self.method, self.extramethod) + super().get_taskdata() + return (self.server, self.method, self.extramethod, self.max_parallel) + super().get_taskdata() def set_taskdata(self, data): - self.server, self.method, self.extramethod = data[:3] - super().set_taskdata(data[3:]) + self.server, self.method, self.extramethod, self.max_parallel = data[:4] + super().set_taskdata(data[4:]) def client(self): if getattr(self, '_client', None) is None: self._client = hashserv.create_client(self.server) return self._client + def client_pool(self): + if getattr(self, '_client_pool', None) is None: + self._client_pool = hashserv.client.ClientPool(self.server, self.max_parallel) + return self._client_pool + def reset(self, data): - if getattr(self, '_client', None) is not None: - self._client.close() - self._client = None + self.__close_clients() return super().reset(data) def exit(self): + self.__close_clients() + return super().exit() + + def __close_clients(self): if getattr(self, '_client', None) is not None: self._client.close() self._client = None - return super().exit() + if getattr(self, '_client_pool', None) is not None: + self._client_pool.close() + self._client_pool = None def get_stampfile_hash(self, tid): if tid in self.taskhash: @@ -578,7 +596,7 @@ class SignatureGeneratorUniHashMixIn(object): return None return unihash - def get_unihash(self, tid): + def get_cached_unihash(self, tid): taskhash = self.taskhash[tid] # If its not a setscene task we can return @@ -593,40 +611,74 @@ class SignatureGeneratorUniHashMixIn(object): self.unihash[tid] = unihash return unihash - # In the absence of being able to discover a unique hash from the - # server, make it be equivalent to the taskhash. The unique "hash" only - # really needs to be a unique string (not even necessarily a hash), but - # making it match the taskhash has a few advantages: - # - # 1) All of the sstate code that assumes hashes can be the same - # 2) It provides maximal compatibility with builders that don't use - # an equivalency server - # 3) The value is easy for multiple independent builders to derive the - # same unique hash from the same input. This means that if the - # independent builders find the same taskhash, but it isn't reported - # to the server, there is a better chance that they will agree on - # the unique hash. - unihash = taskhash + return None - try: - method = self.method - if tid in self.extramethod: - method = method + self.extramethod[tid] - data = self.client().get_unihash(method, self.taskhash[tid]) - if data: - unihash = data + def _get_method(self, tid): + method = self.method + if tid in self.extramethod: + method = method + self.extramethod[tid] + + return method + + def get_unihash(self, tid): + return self.get_unihashes([tid])[tid] + + def get_unihashes(self, tids): + """ + For a iterable of tids, returns a dictionary that maps each tid to a + unihash + """ + result = {} + queries = {} + query_result = {} + + for tid in tids: + unihash = self.get_cached_unihash(tid) + if unihash: + result[tid] = unihash + else: + queries[tid] = (self._get_method(tid), self.taskhash[tid]) + + if len(queries) == 0: + return result + + if self.max_parallel <= 1 or len(queries) <= 1: + # No parallelism required. Make the query serially with the single client + for tid, args in queries.items(): + query_result[tid] = self.client().get_unihash(*args) + else: + query_result = self.client_pool().get_unihashes(queries) + + for tid, unihash in query_result.items(): + # In the absence of being able to discover a unique hash from the + # server, make it be equivalent to the taskhash. The unique "hash" only + # really needs to be a unique string (not even necessarily a hash), but + # making it match the taskhash has a few advantages: + # + # 1) All of the sstate code that assumes hashes can be the same + # 2) It provides maximal compatibility with builders that don't use + # an equivalency server + # 3) The value is easy for multiple independent builders to derive the + # same unique hash from the same input. This means that if the + # independent builders find the same taskhash, but it isn't reported + # to the server, there is a better chance that they will agree on + # the unique hash. + taskhash = self.taskhash[tid] + if unihash: # A unique hash equal to the taskhash is not very interesting, # so it is reported it at debug level 2. If they differ, that # is much more interesting, so it is reported at debug level 1 hashequiv_logger.bbdebug((1, 2)[unihash == taskhash], 'Found unihash %s in place of %s for %s from %s' % (unihash, taskhash, tid, self.server)) else: hashequiv_logger.debug2('No reported unihash for %s:%s from %s' % (tid, taskhash, self.server)) - except ConnectionError as e: - bb.warn('Error contacting Hash Equivalence Server %s: %s' % (self.server, str(e))) + unihash = taskhash - self.set_unihash(tid, unihash) - self.unihash[tid] = unihash - return unihash + + self.set_unihash(tid, unihash) + self.unihash[tid] = unihash + result[tid] = unihash + + return result def report_unihash(self, path, task, d): import importlib @@ -754,6 +806,7 @@ class SignatureGeneratorTestEquivHash(SignatureGeneratorUniHashMixIn, SignatureG super().init_rundepcheck(data) self.server = data.getVar('BB_HASHSERVE') self.method = "sstate_output_hash" + self.max_parallel = 1 def clean_checksum_file_path(file_checksum_tuple): f, cs = file_checksum_tuple -- 2.34.1 ^ permalink raw reply related [flat|nested] 15+ messages in thread
* [bitbake-devel][PATCH v2 7/8] siggen: Add parallel unihash exist API 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 0/8] Implement parallel Query API Joshua Watt ` (5 preceding siblings ...) 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 6/8] siggen: Add parallel query API Joshua Watt @ 2024-02-18 22:59 ` Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 8/8] bitbake: hashserv: Postgres adaptations for ignoring duplicate inserts Joshua Watt 7 siblings, 0 replies; 15+ messages in thread From: Joshua Watt @ 2024-02-18 22:59 UTC (permalink / raw) To: bitbake-devel; +Cc: Joshua Watt Adds API to query if unihashes are known to the server in parallel Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> --- bitbake/lib/bb/siggen.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/bitbake/lib/bb/siggen.py b/bitbake/lib/bb/siggen.py index e1a4fa2cdd1..3ab8431a089 100644 --- a/bitbake/lib/bb/siggen.py +++ b/bitbake/lib/bb/siggen.py @@ -530,6 +530,11 @@ class SignatureGeneratorBasicHash(SignatureGeneratorBasic): class SignatureGeneratorUniHashMixIn(object): def __init__(self, data): self.extramethod = {} + # NOTE: The cache only tracks hashes that exist. Hashes that don't + # exist are always queries from the server since it is possible for + # hashes to appear over time, but much less likely for them to + # disappear + self.unihash_exists_cache = set() super().__init__(data) def get_taskdata(self): @@ -620,6 +625,33 @@ class SignatureGeneratorUniHashMixIn(object): return method + def unihashes_exist(self, query): + if len(query) == 0: + return {} + + uncached_query = {} + result = {} + for key, unihash in query.items(): + if unihash in self.unihash_exists_cache: + result[key] = True + else: + uncached_query[key] = unihash + + if self.max_parallel <= 1 or len(uncached_query) <= 1: + # No parallelism required. Make the query serially with the single client + uncached_result = { + key: self.client().unihash_exists(value) for key, value in uncached_query.items() + } + else: + uncached_result = self.client_pool().unihashes_exist(uncached_query) + + for key, exists in uncached_result.items(): + if exists: + self.unihash_exists_cache.add(query[key]) + result[key] = exists + + return result + def get_unihash(self, tid): return self.get_unihashes([tid])[tid] -- 2.34.1 ^ permalink raw reply related [flat|nested] 15+ messages in thread
* [bitbake-devel][PATCH v2 8/8] bitbake: hashserv: Postgres adaptations for ignoring duplicate inserts 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 0/8] Implement parallel Query API Joshua Watt ` (6 preceding siblings ...) 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 7/8] siggen: Add parallel unihash exist API Joshua Watt @ 2024-02-18 22:59 ` Joshua Watt 7 siblings, 0 replies; 15+ messages in thread From: Joshua Watt @ 2024-02-18 22:59 UTC (permalink / raw) To: bitbake-devel Cc: Tobias Hagelborn, Tobias Hagelborn, Richard Purdie, Joshua Watt From: Tobias Hagelborn <tobias.hagelborn@axis.com> Hash Equivalence server performs unconditional insert also of duplicate hash entries. This causes excessive error log entries in Postgres. Rather ignore the duplicate inserts. The alternate behavior should be isolated to the postgres engine type. Signed-off-by: Tobias Hagelborn <tobiasha@axis.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org> Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> --- bitbake/lib/hashserv/sqlalchemy.py | 53 +++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index 0e28d738f5a..fc3ae3d3396 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py @@ -33,6 +33,7 @@ from sqlalchemy import ( import sqlalchemy.engine from sqlalchemy.orm import declarative_base from sqlalchemy.exc import IntegrityError +from sqlalchemy.dialects.postgresql import insert as postgres_insert Base = declarative_base() @@ -283,9 +284,7 @@ class Database(object): async def unihash_exists(self, unihash): async with self.db.begin(): result = await self._execute( - select(UnihashesV3) - .where(UnihashesV3.unihash == unihash) - .limit(1) + select(UnihashesV3).where(UnihashesV3.unihash == unihash).limit(1) ) return result.first() is not None @@ -435,18 +434,30 @@ class Database(object): return result.rowcount async def insert_unihash(self, method, taskhash, unihash): - try: - async with self.db.begin(): - await self._execute( - insert(UnihashesV3).values( - method=method, - taskhash=taskhash, - unihash=unihash, - gc_mark=self._get_config_subquery("gc-mark", ""), - ) + # Postgres specific ignore on insert duplicate + if self.engine.name == "postgresql": + statement = ( + postgres_insert(UnihashesV3) + .values( + method=method, + taskhash=taskhash, + unihash=unihash, + gc_mark=self._get_config_subquery("gc-mark", ""), ) + .on_conflict_do_nothing(index_elements=("method", "taskhash")) + ) + else: + statement = insert(UnihashesV3).values( + method=method, + taskhash=taskhash, + unihash=unihash, + gc_mark=self._get_config_subquery("gc-mark", ""), + ) - return True + try: + async with self.db.begin(): + result = await self._execute(statement) + return result.rowcount != 0 except IntegrityError: self.logger.debug( "%s, %s, %s already in unihash database", method, taskhash, unihash @@ -461,10 +472,22 @@ class Database(object): if "created" in data and not isinstance(data["created"], datetime): data["created"] = datetime.fromisoformat(data["created"]) + # Postgres specific ignore on insert duplicate + if self.engine.name == "postgresql": + statement = ( + postgres_insert(OuthashesV2) + .values(**data) + .on_conflict_do_nothing( + index_elements=("method", "taskhash", "outhash") + ) + ) + else: + statement = insert(OuthashesV2).values(**data) + try: async with self.db.begin(): - await self._execute(insert(OuthashesV2).values(**data)) - return True + result = await self._execute(statement) + return result.rowcount != 0 except IntegrityError: self.logger.debug( "%s, %s already in outhash database", data["method"], data["outhash"] -- 2.34.1 ^ permalink raw reply related [flat|nested] 15+ messages in thread
end of thread, other threads:[~2024-02-18 23:00 UTC | newest] Thread overview: 15+ messages (download: mbox.gz follow: Atom feed -- links below jump to the message on this page -- 2024-02-18 20:07 [bitbake-devel][PATCH 0/5] Implement parallel Query API Joshua Watt 2024-02-18 20:07 ` [bitbake-devel][PATCH 1/5] hashserv: sqlalchemy: Use _execute() helper Joshua Watt 2024-02-18 20:07 ` [bitbake-devel][PATCH 2/5] hashserv: Add unihash-exists API Joshua Watt 2024-02-18 20:07 ` [bitbake-devel][PATCH 3/5] asyncrpc: Add Client Pool object Joshua Watt 2024-02-18 20:07 ` [bitbake-devel][PATCH 4/5] hashserv: Add Client Pool Joshua Watt 2024-02-18 20:07 ` [bitbake-devel][PATCH 5/5] siggen: Add parallel query API Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 0/8] Implement parallel Query API Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 1/8] hashserv: Add Unihash Garbage Collection Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 2/8] hashserv: sqlalchemy: Use _execute() helper Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 3/8] hashserv: Add unihash-exists API Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 4/8] asyncrpc: Add Client Pool object Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 5/8] hashserv: Add Client Pool Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 6/8] siggen: Add parallel query API Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 7/8] siggen: Add parallel unihash exist API Joshua Watt 2024-02-18 22:59 ` [bitbake-devel][PATCH v2 8/8] bitbake: hashserv: Postgres adaptations for ignoring duplicate inserts Joshua Watt
This is an external index of several public inboxes, see mirroring instructions on how to clone and mirror all data and code used by this external index.