From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from smtp.kernel.org (aws-us-west-2-korg-mail-1.web.codeaurora.org [10.30.226.201]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by smtp.subspace.kernel.org (Postfix) with ESMTPS id 76ADD280A56 for ; Sun, 19 Apr 2026 16:00:09 +0000 (UTC) Authentication-Results: smtp.subspace.kernel.org; arc=none smtp.client-ip=10.30.226.201 ARC-Seal:i=1; a=rsa-sha256; d=subspace.kernel.org; s=arc-20240116; t=1776614410; cv=none; b=FtU1NvedJegO3kL5g7CAI5ucIJwIFDAEukBUH+kgtFnQHsYw8KReDNmgNJ076C12S1rMM0QCgQfCNoDJdr501V4zZLeZfNw3FlwiLv4I4fCXWASHuQpbBZyW+WfTBC5hiuQYLgXuspWgbUL0c8YjV/7e5cS2WlhgBOH6OXbRbrY= ARC-Message-Signature:i=1; a=rsa-sha256; d=subspace.kernel.org; s=arc-20240116; t=1776614410; c=relaxed/simple; bh=YTMlxN/QEuAaVkOExUZDbatjdTzWtb833zZLm765YRs=; h=From:Date:Subject:MIME-Version:Content-Type:Message-Id:References: In-Reply-To:To:Cc; b=kCsTbjeHE7KuScdvuZ+hwWHkb0lBMRpcXVpPneXfNPXG3iLt0nfwSomkDA6/359Ld9OJwCEpWVZL6yVPRBT6u9pSJSHQwjNRZloFJG9iYLUobMqzVXsWjpolHJYZjS7Hh4djVisVQWUr0U9oGQIUMU6A/3/kGU5kjNqOAJOpDXM= ARC-Authentication-Results:i=1; smtp.subspace.kernel.org; dkim=pass (2048-bit key) header.d=kernel.org header.i=@kernel.org header.b=YbmrjFnR; arc=none smtp.client-ip=10.30.226.201 Authentication-Results: smtp.subspace.kernel.org; dkim=pass (2048-bit key) header.d=kernel.org header.i=@kernel.org header.b="YbmrjFnR" Received: by smtp.kernel.org (Postfix) id 6C53DC2BCAF; Sun, 19 Apr 2026 16:00:09 +0000 (UTC) Received: by smtp.kernel.org (Postfix) with ESMTPSA id 1D350C2BCB8; Sun, 19 Apr 2026 16:00:08 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=kernel.org; s=k20201202; t=1776614409; bh=YTMlxN/QEuAaVkOExUZDbatjdTzWtb833zZLm765YRs=; h=From:Date:Subject:References:In-Reply-To:To:Cc:From; b=YbmrjFnRQ0ZpWORTJJfyGrbuOuep2b90/Aq+oWswzpbfVlasUHahe+AoU67yvb1g8 gVM7XzPw24MArehCQfXHE0ozN/k7lrExU+pe8mKONTop4hqnCSrkwrtXmSQhOJsuaN tQUfKMpCqhSnUo/f7V2aJHccf6O/gzA8lA4/MCba1Xf5SMT9UBcPheQEC3IGBTUJfA 7iv53BFsex71nlhoV/dvgAB1gMwEJNZ2RP1LKLIvZNJmSIP3SD8DVkLSHvMpTWw8n+ 2y4Bv/qjCqE8zn/xGu/fYMql0Q67DqZpvGHquWg3uACvpW+zwJT7X6yKljPJynw/Gp 9WNG7xSWAbqbQ== From: Tamir Duberstein Date: Sun, 19 Apr 2026 11:59:59 -0400 Subject: [PATCH b4 v2 04/11] Add ruff format check to CI Precedence: bulk X-Mailing-List: tools@linux.kernel.org List-Id: List-Subscribe: List-Unsubscribe: MIME-Version: 1.0 Content-Type: text/plain; charset="utf-8" Content-Transfer-Encoding: 8bit Message-Id: <20260419-ruff-check-v2-4-089dfb264501@kernel.org> References: <20260419-ruff-check-v2-0-089dfb264501@kernel.org> In-Reply-To: <20260419-ruff-check-v2-0-089dfb264501@kernel.org> To: "Kernel.org Tools" Cc: Konstantin Ryabitsev , Tamir Duberstein , "str = 'reply"@example.com', "str = 'reviewer"@example.com', "str = 'author"@example.com', "str = 'patch1"@example.com' X-Mailer: b4 0.16-dev X-Developer-Signature: v=1; a=openpgp-sha256; l=1297583; i=tamird@kernel.org; h=from:subject:message-id; bh=YTMlxN/QEuAaVkOExUZDbatjdTzWtb833zZLm765YRs=; b=owGbwMvMwCV2wYdPVfy60HTG02pJDJlP/rAo6cy507j0Q64Uh9SuxDdNEofyfyx0Cv7/wa4+6 va8k9OUOyayMIhxMViKKbIkih7am556e49s5rvjMHNYmUCGSIs0MAABCwNfbmJeqZGOkZ6ptqGe oZGOgY4xAxenAEz132mMDAvZRQKt89hSrdNvtf0ysU6aE9HGaNyaH6D0y3bL/Z2xTxj+F2i3zxP nmHLo8NqPNv+qeBp6e/lPvoyVu952+5Ni9l8/XgA= X-Developer-Key: i=tamird@kernel.org; a=openpgp; fpr=5A6714204D41EC844C50273C19D6FF6092365380 Enable ruff format checking in the b4 CI script and configure Ruff's formatter in pyproject.toml. Apply a one-time repo-wide format pass so the new check enforces the current style without leaving the branch permanently red. Signed-off-by: Tamir Duberstein --- ci.sh | 1 + misc/retrieve_lore_thread.py | 2 +- misc/review-ci-example.py | 4 +- misc/send-receive.py | 256 ++++-- pyproject.toml | 3 + src/b4/__init__.py | 1333 +++++++++++++++++++++-------- src/b4/bugs/__init__.py | 44 +- src/b4/bugs/_import.py | 14 +- src/b4/bugs/_tui.py | 433 ++++++---- src/b4/command.py | 1354 ++++++++++++++++++++++------- src/b4/diff.py | 34 +- src/b4/dig.py | 58 +- src/b4/ez.py | 908 ++++++++++++++------ src/b4/kr.py | 8 +- src/b4/mbox.py | 274 ++++-- src/b4/pr.py | 127 ++- src/b4/review/__init__.py | 23 +- src/b4/review/_review.py | 485 +++++++---- src/b4/review/checks.py | 379 +++++---- src/b4/review/messages.py | 63 +- src/b4/review/tracking.py | 646 ++++++++------ src/b4/review_tui/__init__.py | 18 +- src/b4/review_tui/_common.py | 173 ++-- src/b4/review_tui/_entry.py | 66 +- src/b4/review_tui/_lite_app.py | 92 +- src/b4/review_tui/_modals.py | 612 +++++++++----- src/b4/review_tui/_pw_app.py | 173 ++-- src/b4/review_tui/_review_app.py | 426 +++++++--- src/b4/review_tui/_tracking_app.py | 1474 +++++++++++++++++++++----------- src/b4/tui/__init__.py | 1 + src/b4/tui/_common.py | 19 +- src/b4/tui/_modals.py | 38 +- src/b4/ty.py | 187 +++-- src/tests/conftest.py | 10 +- src/tests/test___init__.py | 799 ++++++++++++------ src/tests/test_ez.py | 143 +++- src/tests/test_mbox.py | 85 +- src/tests/test_messages.py | 21 +- src/tests/test_patatt.py | 30 +- src/tests/test_rethread.py | 174 ++-- src/tests/test_review.py | 1636 +++++++++++++++++++++--------------- src/tests/test_review_checks.py | 454 ++++++---- src/tests/test_review_show_info.py | 92 +- src/tests/test_review_tracking.py | 982 ++++++++++++++-------- src/tests/test_three_way_merge.py | 181 ++-- src/tests/test_tui_bugs.py | 38 +- src/tests/test_tui_modals.py | 44 +- src/tests/test_tui_review.py | 94 +-- src/tests/test_tui_tracking.py | 1332 ++++++++++++++++++----------- 49 files changed, 10637 insertions(+), 5206 deletions(-) diff --git a/ci.sh b/ci.sh index b65ae97..ddd4cff 100755 --- a/ci.sh +++ b/ci.sh @@ -2,5 +2,6 @@ set -eu +uv run ruff format --check uv run ruff check uv run mypy . diff --git a/misc/retrieve_lore_thread.py b/misc/retrieve_lore_thread.py index 4de39fb..aad586b 100644 --- a/misc/retrieve_lore_thread.py +++ b/misc/retrieve_lore_thread.py @@ -21,7 +21,7 @@ class Function(OpenAISchema): ) class Config: - title = "retrieve_lore_thread" + title = 'retrieve_lore_thread' @classmethod def execute(cls, message_id: str) -> str: diff --git a/misc/review-ci-example.py b/misc/review-ci-example.py index cbac2ae..dee0a5d 100755 --- a/misc/review-ci-example.py +++ b/misc/review-ci-example.py @@ -76,7 +76,9 @@ def main() -> None: if build_status == 'warn': build_result['details'] = 'Warning: unused variable in drivers/foo.c:42' elif build_status == 'fail': - build_result['details'] = 'Error: implicit declaration of function bar\n drivers/foo.c:57:5' + build_result['details'] = ( + 'Error: implicit declaration of function bar\n drivers/foo.c:57:5' + ) results.append(build_result) # Simulate a test suite check diff --git a/misc/send-receive.py b/misc/send-receive.py index 35c5e99..22a5d99 100644 --- a/misc/send-receive.py +++ b/misc/send-receive.py @@ -36,7 +36,6 @@ logger.setLevel(logging.DEBUG) class SendReceiveListener(object): - def __init__(self, _engine, _config) -> None: self._engine = _engine self._config = _config @@ -51,7 +50,9 @@ class SendReceiveListener(object): def _init_logger(self, logfile: str, loglevel: str) -> None: global logger lch = logging.handlers.WatchedFileHandler(os.path.expanduser(logfile)) - lfmt = logging.Formatter('[%(process)d] %(asctime)s - %(levelname)s - %(message)s') + lfmt = logging.Formatter( + '[%(process)d] %(asctime)s - %(levelname)s - %(message)s' + ) lch.setFormatter(lfmt) if loglevel == 'critical': lch.setLevel(logging.CRITICAL) @@ -65,18 +66,23 @@ class SendReceiveListener(object): logger.info('Setting up SQLite database') conn = self._engine.connect() md = sa.MetaData() - meta = sa.Table('meta', md, - sa.Column('version', sa.Integer()) - ) - auth = sa.Table('auth', md, - sa.Column('auth_id', sa.Integer(), primary_key=True), - sa.Column('created', sa.DateTime(), nullable=False, server_default=sa.sql.func.now()), - sa.Column('identity', sa.Text(), nullable=False), - sa.Column('selector', sa.Text(), nullable=False), - sa.Column('pubkey', sa.Text(), nullable=False), - sa.Column('challenge', sa.Text(), nullable=True), - sa.Column('verified', sa.Integer(), nullable=False), - ) + meta = sa.Table('meta', md, sa.Column('version', sa.Integer())) + auth = sa.Table( + 'auth', + md, + sa.Column('auth_id', sa.Integer(), primary_key=True), + sa.Column( + 'created', + sa.DateTime(), + nullable=False, + server_default=sa.sql.func.now(), + ), + sa.Column('identity', sa.Text(), nullable=False), + sa.Column('selector', sa.Text(), nullable=False), + sa.Column('pubkey', sa.Text(), nullable=False), + sa.Column('challenge', sa.Text(), nullable=True), + sa.Column('verified', sa.Integer(), nullable=False), + ) sa.Index('idx_identity_selector', auth.c.identity, auth.c.selector, unique=True) md.create_all(self._engine) q = sa.insert(meta).values(version=DB_VERSION) @@ -98,7 +104,9 @@ class SendReceiveListener(object): logger.debug('Returning success: %s', message) resp.text = json.dumps({'result': 'success', 'message': message}) - def get_smtp(self) -> Tuple[Union[smtplib.SMTP, smtplib.SMTP_SSL, None], Tuple[str, str]]: + def get_smtp( + self, + ) -> Tuple[Union[smtplib.SMTP, smtplib.SMTP_SSL, None], Tuple[str, str]]: sconfig = self._config['sendemail'] server = sconfig.get('smtpserver', 'localhost') port = sconfig.get('smtpserverport', 0) @@ -120,7 +128,9 @@ class SendReceiveListener(object): # We do TLS from the get-go smtp = smtplib.SMTP_SSL(server, port) else: - raise smtplib.SMTPException('Unclear what to do with smtpencryption=%s' % encryption) + raise smtplib.SMTPException( + 'Unclear what to do with smtpencryption=%s' % encryption + ) # If we got to this point, we should do authentication. auser = sconfig.get('smtpuser') @@ -144,21 +154,35 @@ class SendReceiveListener(object): logger.info('New authentication request for %s/%s', identity, selector) pubkey = jdata.get('pubkey') t_auth = sa.Table('auth', md, autoload=True, autoload_with=self._engine) - q = sa.select([t_auth.c.auth_id]).where(t_auth.c.identity == identity, t_auth.c.selector == selector, - t_auth.c.verified == 1) + q = sa.select([t_auth.c.auth_id]).where( + t_auth.c.identity == identity, + t_auth.c.selector == selector, + t_auth.c.verified == 1, + ) rp = conn.execute(q) if len(rp.fetchall()): - self.send_error(resp, message='i=%s;s=%s is already authorized' % (identity, selector)) + self.send_error( + resp, message='i=%s;s=%s is already authorized' % (identity, selector) + ) return # delete any existing challenges for this and create a new one - q = sa.delete(t_auth).where(t_auth.c.identity == identity, t_auth.c.selector == selector, - t_auth.c.verified == 0) + q = sa.delete(t_auth).where( + t_auth.c.identity == identity, + t_auth.c.selector == selector, + t_auth.c.verified == 0, + ) conn.execute(q) # create new challenge import uuid + cstr = str(uuid.uuid4()) - q = sa.insert(t_auth).values(identity=identity, selector=selector, pubkey=pubkey, challenge=cstr, - verified=0) + q = sa.insert(t_auth).values( + identity=identity, + selector=selector, + pubkey=pubkey, + challenge=cstr, + verified=0, + ) conn.execute(q) logger.info('Created new challenge for %s/%s: %s', identity, selector, cstr) conn.close() @@ -172,7 +196,9 @@ class SendReceiveListener(object): tpt_subject = self._config['templates']['verify-subject'].strip() tpt_body = self._config['templates']['verify-body'].strip() signature = self._config['templates']['signature'].strip() - subject = Template(tpt_subject).safe_substitute({'identity': jdata.get('identity')}) + subject = Template(tpt_subject).safe_substitute( + {'identity': jdata.get('identity')} + ) cmsg.add_header('Subject', subject) name = jdata.get('name', 'Anonymous Llama') cmsg.add_header('To', f'{name} <{identity}>') @@ -215,9 +241,11 @@ class SendReceiveListener(object): if s: selector = s.decode() logger.debug('i=%s; s=%s', identity, selector) - q = sa.select([t_auth.c.auth_id, t_auth.c.pubkey]).where(t_auth.c.identity == identity, - t_auth.c.selector == selector, - t_auth.c.verified == verified) + q = sa.select([t_auth.c.auth_id, t_auth.c.pubkey]).where( + t_auth.c.identity == identity, + t_auth.c.selector == selector, + t_auth.c.verified == verified, + ) rp = conn.execute(q) res = rp.fetchall() if res: @@ -228,7 +256,9 @@ class SendReceiveListener(object): logger.debug('Did not find a matching identity!') raise patatt.NoKeyError('No match for this identity') - logger.debug('Found matching %s/%s with auth_id=%s', identity, selector, auth_id) + logger.debug( + 'Found matching %s/%s with auth_id=%s', identity, selector, auth_id + ) pm.validate(identity, pubkey.encode()) return identity, selector, auth_id @@ -243,11 +273,18 @@ class SendReceiveListener(object): t_auth = sa.Table('auth', md, autoload=True, autoload_with=self._engine) bdata = msg.encode() try: - identity, selector, auth_id = self.validate_message(conn, t_auth, bdata, verified=0) + identity, selector, auth_id = self.validate_message( + conn, t_auth, bdata, verified=0 + ) except Exception as ex: self.send_error(resp, message='Signature validation failed: %s' % ex) return - logger.debug('Message validation passed for %s/%s with auth_id=%s', identity, selector, auth_id) + logger.debug( + 'Message validation passed for %s/%s with auth_id=%s', + identity, + selector, + auth_id, + ) # Now compare the challenge to what we received q = sa.select([t_auth.c.challenge]).where(t_auth.c.auth_id == auth_id) @@ -255,13 +292,28 @@ class SendReceiveListener(object): res = rp.fetchall() challenge = res[0][0] if msg.find(f'\nverify:{challenge}') < 0: - self.send_error(resp, message='Challenge verification for %s/%s did not match' % (identity, selector)) + self.send_error( + resp, + message='Challenge verification for %s/%s did not match' + % (identity, selector), + ) return - logger.info('Successfully verified challenge for %s/%s with auth_id=%s', identity, selector, auth_id) - q = sa.update(t_auth).where(t_auth.c.auth_id == auth_id).values(challenge=None, verified=1) + logger.info( + 'Successfully verified challenge for %s/%s with auth_id=%s', + identity, + selector, + auth_id, + ) + q = ( + sa.update(t_auth) + .where(t_auth.c.auth_id == auth_id) + .values(challenge=None, verified=1) + ) conn.execute(q) conn.close() - self.send_success(resp, message='Challenge verified for %s/%s' % (identity, selector)) + self.send_success( + resp, message='Challenge verified for %s/%s' % (identity, selector) + ) def auth_delete(self, jdata, resp) -> None: msg = jdata.get('msg') @@ -278,11 +330,15 @@ class SendReceiveListener(object): self.send_error(resp, message='Signature validation failed: %s' % ex) return - logger.info('Deleting record for %s/%s with auth_id=%s', identity, selector, auth_id) + logger.info( + 'Deleting record for %s/%s with auth_id=%s', identity, selector, auth_id + ) q = sa.delete(t_auth).where(t_auth.c.auth_id == auth_id) conn.execute(q) conn.close() - self.send_success(resp, message='Record deleted for %s/%s' % (identity, selector)) + self.send_success( + resp, message='Record deleted for %s/%s' % (identity, selector) + ) def clean_header(self, hdrval: str) -> str: if hdrval is None: @@ -312,7 +368,11 @@ class SendReceiveListener(object): # Remove any quoted-printable header junk from the name pair = (self.clean_header(pair[0]), pair[1]) # Work around https://github.com/python/cpython/issues/100900 - if not pair[0].startswith('=?') and not pair[0].startswith('"') and qspecials.search(pair[0]): + if ( + not pair[0].startswith('=?') + and not pair[0].startswith('"') + and qspecials.search(pair[0]) + ): quoted = email.utils.quote(pair[0]) addrs.append(f'"{quoted}" <{pair[1]}>') continue @@ -325,14 +385,21 @@ class SendReceiveListener(object): except AttributeError: return all([ord(c) < 128 for c in strval]) - def wrap_header(self, hdr, width: int = 75, nl: str = '\r\n', transform: str = 'preserve') -> bytes: + def wrap_header( + self, hdr, width: int = 75, nl: str = '\r\n', transform: str = 'preserve' + ) -> bytes: hname, hval = hdr if hname.lower() in ('to', 'cc', 'from', 'x-original-from'): _parts = [f'{hname}: '] first = True for addr in email.utils.getaddresses([hval]): if transform == 'encode' and not self.isascii(addr[0]): - addr = (email.quoprimime.header_encode(addr[0].encode(), charset='utf-8'), addr[1]) + addr = ( + email.quoprimime.header_encode( + addr[0].encode(), charset='utf-8' + ), + addr[1], + ) qp = self.format_addrs([addr], clean=False) elif transform == 'decode': qp = self.format_addrs([addr], clean=True) @@ -359,11 +426,18 @@ class SendReceiveListener(object): # Use simple textwrap, with a small trick that ensures that long non-breakable # strings don't show up on the next line from the bare header hdata = hdata.replace(': ', ':_', 1) - wrapped = textwrap.wrap(hdata, break_long_words=False, break_on_hyphens=False, - subsequent_indent=' ', width=width) + wrapped = textwrap.wrap( + hdata, + break_long_words=False, + break_on_hyphens=False, + subsequent_indent=' ', + width=width, + ) return nl.join(wrapped).replace(':_', ': ', 1).encode() - qp = f'{hname}: ' + email.quoprimime.header_encode(hval.encode(), charset='utf-8') + qp = f'{hname}: ' + email.quoprimime.header_encode( + hval.encode(), charset='utf-8' + ) # is it longer than width? if len(qp) <= width: return qp.encode() @@ -375,17 +449,22 @@ class SendReceiveListener(object): # Also allow for the ' ' at the front on continuation lines wrapat -= 1 # Make sure we don't break on a =XX escape sequence - while '=' in qp[wrapat - 2:wrapat]: + while '=' in qp[wrapat - 2 : wrapat]: wrapat -= 1 _parts.append(qp[:wrapat] + '?=') - qp = ('=?utf-8?q?' + qp[wrapat:]) + qp = '=?utf-8?q?' + qp[wrapat:] _parts.append(qp) return f'{nl} '.join(_parts).encode() - def get_msg_as_bytes(self, msg: email.message.Message, nl: str = '\r\n', headers: str = 'preserve') -> bytes: + def get_msg_as_bytes( + self, msg: email.message.Message, nl: str = '\r\n', headers: str = 'preserve' + ) -> bytes: bdata = b'' for hname, hval in msg.items(): - bdata += self.wrap_header((hname, str(hval)), nl=nl, transform=headers) + nl.encode() + bdata += ( + self.wrap_header((hname, str(hval)), nl=nl, transform=headers) + + nl.encode() + ) bdata += nl.encode() payload = msg.get_payload(decode=True) for bline in payload.split(b'\n'): @@ -402,8 +481,13 @@ class SendReceiveListener(object): return logger.debug('Received a request for %s messages', len(umsgs)) - diffre = re.compile(rb'^(---.*\n\+\+\+|GIT binary patch|diff --git \w/\S+ \w/\S+)', flags=re.M | re.I) - diffstatre = re.compile(rb'^\s*\d+ file.*\d+ (insertion|deletion)', flags=re.M | re.I) + diffre = re.compile( + rb'^(---.*\n\+\+\+|GIT binary patch|diff --git \w/\S+ \w/\S+)', + flags=re.M | re.I, + ) + diffstatre = re.compile( + rb'^\s*\d+ file.*\d+ (insertion|deletion)', flags=re.M | re.I + ) msgs = list() conn = self._engine.connect() @@ -417,7 +501,9 @@ class SendReceiveListener(object): try: identity, selector, auth_id = self.validate_message(conn, t_auth, bdata) except patatt.NoKeyError: - self.send_error(resp, message='No matching key, please complete web auth first.') + self.send_error( + resp, message='No matching key, please complete web auth first.' + ) return except Exception as ex: self.send_error(resp, message='Signature validation failed: %s' % ex) @@ -427,7 +513,10 @@ class SendReceiveListener(object): if seenid is None: seenid = auth_id elif seenid != auth_id: - self.send_error(resp, message='We only support a single signing identity across patch series.') + self.send_error( + resp, + message='We only support a single signing identity across patch series.', + ) return msg = email.message_from_bytes(bdata, policy=emlpolicy) @@ -454,8 +543,15 @@ class SendReceiveListener(object): return # Make sure that From, Date, Subject, and Message-Id headers exist - if not msg.get('From') or not msg.get('Date') or not msg.get('Subject') or not msg.get('Message-Id'): - self.send_error(resp, message='Message is missing some required headers.') + if ( + not msg.get('From') + or not msg.get('Date') + or not msg.get('Subject') + or not msg.get('Message-Id') + ): + self.send_error( + resp, message='Message is missing some required headers.' + ) return # Make sure that From: matches the validated identity. We allow + expansion, @@ -463,19 +559,26 @@ class SendReceiveListener(object): allfroms = utils.getaddresses([str(x) for x in msg.get_all('from')]) # Allow only a single From: address if len(allfroms) > 1: - self.send_error(resp, message='Message may only contain a single From: address.') + self.send_error( + resp, message='Message may only contain a single From: address.' + ) return fromaddr = allfroms[0][1] if validfrom != fromaddr: ldparts = fromaddr.split('@') if len(ldparts) != 2: - self.send_error(resp, message=f'Invalid address in From: {fromaddr}') + self.send_error( + resp, message=f'Invalid address in From: {fromaddr}' + ) return lparts = ldparts[0].split('+', maxsplit=1) toval = f'{lparts[0]}@{ldparts[1]}' if toval != identity: - self.send_error(resp, message=f'From header invalid for identity {identity}: {fromaddr}') + self.send_error( + resp, + message=f'From header invalid for identity {identity}: {fromaddr}', + ) return # usually, all From: addresses will be the same, so use validfrom as a quick bypass if validfrom is None: @@ -492,9 +595,15 @@ class SendReceiveListener(object): matched = True break if not matched: - self.send_error(resp, message='Destinations must include a mailing list we recognize.') + self.send_error( + resp, + message='Destinations must include a mailing list we recognize.', + ) return - msg.add_header('X-Endpoint-Received', f'by {servicename} for {identity}/{selector} with auth_id={auth_id}') + msg.add_header( + 'X-Endpoint-Received', + f'by {servicename} for {identity}/{selector} with auth_id={auth_id}', + ) msgs.append((msg, destaddrs)) conn.close() @@ -512,7 +621,11 @@ class SendReceiveListener(object): bccaddrs.update([x[1] for x in utils.getaddresses([_bcc])]) repo = listid = None - if 'public-inbox' in self._config and self._config['public-inbox'].get('repo') and not reflect: + if ( + 'public-inbox' in self._config + and self._config['public-inbox'].get('repo') + and not reflect + ): repo = self._config['public-inbox'].get('repo') listid = self._config['public-inbox'].get('listid') if not os.path.isdir(repo): @@ -549,7 +662,9 @@ class SendReceiveListener(object): logger.debug('%s matches mydomain, no substitution required', origaddr) fromaddr = origaddr else: - logger.debug('%s does not match mydomain, substitution required', origaddr) + logger.debug( + '%s does not match mydomain, substitution required', origaddr + ) # We can't just send this as-is due to DMARC policies. Therefore, we set # Reply-To and X-Original-From. fromaddr = frompair[1] @@ -580,13 +695,21 @@ class SendReceiveListener(object): if cmsg.get('From') is None: newbody = 'From: ' + self.clean_header(origfrom) + '\n' if cmsg.get('Subject'): - newbody += 'Subject: ' + self.clean_header(cmsg.get('Subject')) + '\n' + newbody += ( + 'Subject: ' + + self.clean_header(cmsg.get('Subject')) + + '\n' + ) if cmsg.get('Date'): - newbody += 'Date: ' + self.clean_header(cmsg.get('Date')) + '\n' + newbody += ( + 'Date: ' + self.clean_header(cmsg.get('Date')) + '\n' + ) newbody += '\n' + body.decode() msg.set_payload(newbody, charset='utf-8') # If we have non-ascii content in the new body, force CTE to 8bit - if msg['Content-Transfer-Encoding'] == '7bit' and not all(ord(char) < 128 for char in newbody): + if msg['Content-Transfer-Encoding'] == '7bit' and not all( + ord(char) < 128 for char in newbody + ): msg.set_charset('utf-8') msg.replace_header('Content-Transfer-Encoding', '8bit') @@ -610,8 +733,12 @@ class SendReceiveListener(object): # run it once after writing all messages logger.debug('Running public-inbox repo hook (if present)') ezpi.run_hook(repo) - logger.info('%s %s messages for %s/%s', sentaction, len(msgs), identity, selector) - self.send_success(resp, message=f'{sentaction} {len(msgs)} messages for {identity}/{selector}') + logger.info( + '%s %s messages for %s/%s', sentaction, len(msgs), identity, selector + ) + self.send_success( + resp, message=f'{sentaction} {len(msgs)} messages for {identity}/{selector}' + ) def on_post(self, req, resp): if not req.content_length: @@ -677,6 +804,7 @@ app.add_route(mp, srl) if __name__ == '__main__': from wsgiref.simple_server import make_server + logger.setLevel(logging.DEBUG) ch = logging.StreamHandler() formatter = logging.Formatter('%(message)s') diff --git a/pyproject.toml b/pyproject.toml index 6eb2fbb..0c4f024 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -110,6 +110,9 @@ extend-select = [ ] flake8-quotes.inline-quotes = "single" +[tool.ruff.format] +quote-style = "single" + [tool.pyright] typeCheckingMode = "off" diff --git a/src/b4/__init__.py b/src/b4/__init__.py index df5c58c..3c1c127 100644 --- a/src/b4/__init__.py +++ b/src/b4/__init__.py @@ -61,7 +61,9 @@ ConfigDictT = Dict[str, Union[str, List[str], None]] charset.add_charset('utf-8', None) # Policy we use for saving mail locally -emlpolicy = email.policy.EmailPolicy(utf8=True, cte_type='8bit', max_line_length=None, message_factory=EmailMessage) +emlpolicy = email.policy.EmailPolicy( + utf8=True, cte_type='8bit', max_line_length=None, message_factory=EmailMessage +) # Presence of these characters requires quoting of the name in the header # adapted from email._parseaddr @@ -99,7 +101,9 @@ logging.getLogger('liblore').parent = logger HUNK_RE = re.compile(r'^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@') FILENAME_RE = re.compile(r'^(---|\+\+\+) (\S+)') -DIFF_RE = re.compile(r'^(---.*\n\+\+\+|GIT binary patch|diff --git \w/\S+ \w/\S+)', flags=re.M | re.I) +DIFF_RE = re.compile( + r'^(---.*\n\+\+\+|GIT binary patch|diff --git \w/\S+ \w/\S+)', flags=re.M | re.I +) DIFFSTAT_RE = re.compile(r'^\s*\d+ file.*\d+ (insertion|deletion)', flags=re.M | re.I) ATT_PASS_SIMPLE = 'v' @@ -247,19 +251,32 @@ class LoreMailbox: ppatch = self.msgid_map[patch.in_reply_to] found = False while True: - if patch.counter == ppatch.counter and patch.expected == ppatch.expected: - logger.debug('Found a previous matching patch in v%s', ppatch.revision) + if ( + patch.counter == ppatch.counter + and patch.expected == ppatch.expected + ): + logger.debug( + 'Found a previous matching patch in v%s', ppatch.revision + ) found = True break # Do we have another level up? - if ppatch.in_reply_to is None or ppatch.in_reply_to not in self.msgid_map: + if ( + ppatch.in_reply_to is None + or ppatch.in_reply_to not in self.msgid_map + ): break ppatch = self.msgid_map[ppatch.in_reply_to] if not found: sane = False - logger.debug('Patch not a reply to a patch with the same counter/expected (%s/%s != %s/%s)', - patch.counter, patch.expected, ppatch.counter, ppatch.expected) + logger.debug( + 'Patch not a reply to a patch with the same counter/expected (%s/%s != %s/%s)', + patch.counter, + patch.expected, + ppatch.counter, + ppatch.expected, + ) break if not sane: @@ -301,19 +318,22 @@ class LoreMailbox: if not q: return query = ' OR '.join(q) - qmsgs = get_pi_search_results(query, message='Looking for additional code-review trailers on %s') + qmsgs = get_pi_search_results( + query, message='Looking for additional code-review trailers on %s' + ) if not qmsgs: logger.debug('No matching code-review messages') return logger.debug('Retrieved %s matching code-review messages', len(qmsgs)) - patchid_map = map_codereview_trailers(qmsgs, ignore_msgids=set(self.msgid_map.keys())) + patchid_map = map_codereview_trailers( + qmsgs, ignore_msgids=set(self.msgid_map.keys()) + ) for patchid, fmsgs in patchid_map.items(): if patchid not in self.trailer_map: self.trailer_map[patchid] = list() self.trailer_map[patchid] += fmsgs - def get_latest_revision(self) -> Optional[int]: if not len(self.series): return None @@ -323,9 +343,13 @@ class LoreMailbox: revs.sort(key=lambda r: self.series[r].submission_date or 0) return revs[-1] - - def get_series(self, revision: Optional[int] = None, sloppytrailers: bool = False, - reroll: bool = True, codereview_trailers: bool = True) -> Optional['LoreSeries']: + def get_series( + self, + revision: Optional[int] = None, + sloppytrailers: bool = False, + reroll: bool = True, + codereview_trailers: bool = True, + ) -> Optional['LoreSeries']: if revision is None: if not len(self.series): return None @@ -360,7 +384,11 @@ class LoreMailbox: for member in lser.patches: if member is not None and member.in_reply_to is not None: potential = self.get_by_msgid(member.in_reply_to) - if potential is not None and potential.has_diffstat and not potential.has_diff: + if ( + potential is not None + and potential.has_diffstat + and not potential.has_diff + ): # This is *probably* the cover letter lser.patches[0] = potential lser.has_cover = True @@ -371,7 +399,9 @@ class LoreMailbox: # Do we have any follow-ups? for fmsg in self.followups: - logger.debug('Analyzing follow-up: %s (%s)', fmsg.full_subject, fmsg.fromemail) + logger.debug( + 'Analyzing follow-up: %s (%s)', fmsg.full_subject, fmsg.fromemail + ) # If there are no trailers in this one, ignore it if not len(fmsg.trailers): logger.debug(' no trailers found, skipping') @@ -397,7 +427,9 @@ class LoreMailbox: trailers, mismatches = fmsg.get_trailers(sloppy=sloppytrailers) for ltr in mismatches: - lser.trailer_mismatches.add((ltr.name, ltr.value, fmsg.fromname, fmsg.fromemail)) + lser.trailer_mismatches.add( + (ltr.name, ltr.value, fmsg.fromname, fmsg.fromemail) + ) lvl = 1 while True: logger.debug('%sParent: %s', ' ' * lvl, pmsg.full_subject) @@ -437,7 +469,9 @@ class LoreMailbox: for lmsg in lser.patches: if lmsg is None or lmsg.git_patch_id is None: continue - logger.debug(' matching patch_id %s from: %s', lmsg.git_patch_id, lmsg.full_subject) + logger.debug( + ' matching patch_id %s from: %s', lmsg.git_patch_id, lmsg.full_subject + ) if lmsg.git_patch_id in self.trailer_map: for fmsg in self.trailer_map[lmsg.git_patch_id]: logger.debug(' matched: %s', fmsg.msgid) @@ -449,14 +483,18 @@ class LoreMailbox: if fltr in lmsg.followup_trailers: logger.debug(' identical trailer received for this series') continue - logger.debug(' carrying over the trailer to this series (may be duplicate)') + logger.debug( + ' carrying over the trailer to this series (may be duplicate)' + ) logger.debug(' %s', lmsg.full_subject) logger.debug(' + %s', fltr.as_string()) if fltr.lmsg: logger.debug(' via: %s', fltr.lmsg.msgid) lmsg.followup_trailers.append(fltr) for fltr in fmis: - lser.trailer_mismatches.add((fltr.name, fltr.value, fmsg.fromname, fmsg.fromemail)) + lser.trailer_mismatches.add( + (fltr.name, fltr.value, fmsg.fromname, fmsg.fromemail) + ) return lser @@ -497,7 +535,12 @@ class LoreMailbox: logger.debug(' fixed revision to v%s', irt.revision) lmsg.revision = irt.revision # alternatively, see if upthread is patch 1 - elif lmsg.counter > 0 and irt is not None and irt.has_diff and irt.counter == 1: + elif ( + lmsg.counter > 0 + and irt is not None + and irt.has_diff + and irt.counter == 1 + ): logger.debug(' fixed revision to v%s', irt.revision) lmsg.revision = irt.revision @@ -509,14 +552,29 @@ class LoreMailbox: # Attempt to auto-number series from the same author who did not bother # to set v2, v3, etc. in the patch revision - if (lmsg.counter == 1 and lmsg.counters_inferred - and not lmsg.reply and lmsg.lsubject.patch and not lmsg.lsubject.resend): + if ( + lmsg.counter == 1 + and lmsg.counters_inferred + and not lmsg.reply + and lmsg.lsubject.patch + and not lmsg.lsubject.resend + ): omsg = self.series[lmsg.revision].patches[lmsg.counter] - if (omsg is not None and omsg.counters_inferred and lmsg.fromemail == omsg.fromemail - and omsg.date < lmsg.date): + if ( + omsg is not None + and omsg.counters_inferred + and lmsg.fromemail == omsg.fromemail + and omsg.date < lmsg.date + ): lmsg.revision = len(self.series) + 1 - self.series[lmsg.revision] = LoreSeries(lmsg.revision, lmsg.expected) - logger.info('Assuming new revision: v%s (%s)', lmsg.revision, lmsg.full_subject) + self.series[lmsg.revision] = LoreSeries( + lmsg.revision, lmsg.expected + ) + logger.info( + 'Assuming new revision: v%s (%s)', + lmsg.revision, + lmsg.full_subject, + ) logger.debug(' adding as patch') self.series[lmsg.revision].add_patch(lmsg) return @@ -600,7 +658,6 @@ class LoreSeries: return lmsg raise IndexError('No such patch in series') - def add_patch(self, lmsg: 'LoreMessage') -> None: while len(self.patches) < lmsg.expected + 1: self.patches.append(None) @@ -608,7 +665,9 @@ class LoreSeries: omsg = self.patches[lmsg.counter] if omsg is not None: # Okay, strange, is the one in there a reply? - logger.warning('WARNING: duplicate messages found at index %s', lmsg.counter) + logger.warning( + 'WARNING: duplicate messages found at index %s', lmsg.counter + ) logger.warning(' Subject 1: %s', lmsg.subject) logger.warning(' Subject 2: %s', omsg.subject) if omsg.reply or (omsg.counters_inferred and not lmsg.counters_inferred): @@ -628,17 +687,28 @@ class LoreSeries: if lmsg.counter < 2: # Cover letter or first patch if not self.base_commit and '\nbase-commit:' in lmsg.body: - matches = re.search(r'^base-commit: .*?([\da-f]+)', lmsg.body, flags=re.I | re.M) + matches = re.search( + r'^base-commit: .*?([\da-f]+)', lmsg.body, flags=re.I | re.M + ) if matches: self.base_commit = matches.groups()[0] if not self.change_id and '\nchange-id:' in lmsg.body: - matches = re.search(r'^change-id:\s+(\S+)', lmsg.body, flags=re.I | re.M) + matches = re.search( + r'^change-id:\s+(\S+)', lmsg.body, flags=re.I | re.M + ) if matches: self.change_id = matches.groups()[0] if not self.prereq_patch_ids and '\nprerequisite-patch-id:' in lmsg.body: - self.prereq_patch_ids = re.findall(r'^prerequisite-patch-id:\s+(\S+)', lmsg.body, flags=re.I | re.M) - if not self.prereq_base_commit and '\nprerequisite-base-commit:' in lmsg.body: - matches = re.search(r'^prerequisite-base-id:\s+(\S+)', lmsg.body, flags=re.I | re.M) + self.prereq_patch_ids = re.findall( + r'^prerequisite-patch-id:\s+(\S+)', lmsg.body, flags=re.I | re.M + ) + if ( + not self.prereq_base_commit + and '\nprerequisite-base-commit:' in lmsg.body + ): + matches = re.search( + r'^prerequisite-base-id:\s+(\S+)', lmsg.body, flags=re.I | re.M + ) if matches: self.prereq_base_commit = matches.groups()[0] @@ -666,8 +736,9 @@ class LoreSeries: msg['Subject'] = new_subject @staticmethod - def identify_cover_letter(all_msgs: List[EmailMessage], - msgids: List[str]) -> Tuple[Optional[str], List[EmailMessage]]: + def identify_cover_letter( + all_msgs: List[EmailMessage], msgids: List[str] + ) -> Tuple[Optional[str], List[EmailMessage]]: """Identify the cover letter and patch messages among the user-specified msgids. Scans the messages matching the given msgids for one with an explicit @@ -723,8 +794,9 @@ class LoreSeries: LoreSeries.rewrite_subject_counter(msg, i, num_patches) @staticmethod - def rethread_messages(all_msgs: List[EmailMessage], cover_msgid: str, - patch_msgids: Set[str]) -> None: + def rethread_messages( + all_msgs: List[EmailMessage], cover_msgid: str, patch_msgids: Set[str] + ) -> None: """Rewrite threading headers so all top-level patches are children of the cover. The cover letter has its In-Reply-To and References stripped (it @@ -750,8 +822,9 @@ class LoreSeries: msg['References'] = f'<{cover_msgid}>' @staticmethod - def rethread_series(msgids: List[str], - all_msgs: List[EmailMessage]) -> Tuple[str, List[EmailMessage]]: + def rethread_series( + msgids: List[str], all_msgs: List[EmailMessage] + ) -> Tuple[str, List[EmailMessage]]: """Reconstitute a properly threaded series from unthreaded messages. Runs the full rethread pipeline: identify a cover letter (or use @@ -808,7 +881,9 @@ class LoreSeries: return 'undefined' prefix = lmsg.date.strftime('%Y%m%d') - authorline = email.utils.getaddresses([str(x) for x in lmsg.msg.get_all('from', [])])[0] + authorline = email.utils.getaddresses( + [str(x) for x in lmsg.msg.get_all('from', [])] + )[0] if extended: local = authorline[1].split('@')[0] unsafe = '%s_%s_%s' % (prefix, local, lmsg.subject) @@ -832,16 +907,25 @@ class LoreSeries: if self.patches[0] and self.patches[0].followup_trailers: self.add_extra_trailers(self.patches[0].followup_trailers) - def get_am_ready(self, noaddtrailers: bool = False, addmysob: bool = False, - addlink: bool = False, cherrypick: Optional[List[int]] = None, copyccs: bool = False, - allowbadchars: bool = False, showchecks: bool = False) -> List[EmailMessage]: + def get_am_ready( + self, + noaddtrailers: bool = False, + addmysob: bool = False, + addlink: bool = False, + cherrypick: Optional[List[int]] = None, + copyccs: bool = False, + allowbadchars: bool = False, + showchecks: bool = False, + ) -> List[EmailMessage]: usercfg = get_user_config() config = get_main_config() if addmysob: if 'name' not in usercfg or 'email' not in usercfg: - logger.critical('WARNING: Unable to add your Signed-off-by: git returned no user.name or user.email') + logger.critical( + 'WARNING: Unable to add your Signed-off-by: git returned no user.name or user.email' + ) addmysob = False attpolicy = str(config.get('attestation-policy', 'softfail')) @@ -864,7 +948,9 @@ class LoreSeries: attsame = False break - checkmark, trailers, attcrit = lmsg.get_attestation_trailers(attpolicy, maxdays) + checkmark, trailers, attcrit = lmsg.get_attestation_trailers( + attpolicy, maxdays + ) if attref is None: attref = trailers attmark = checkmark @@ -905,7 +991,10 @@ class LoreSeries: # TODO: Progress bar? lmsg.load_pw_ci_status() if not lmsg.pw_ci_status or lmsg.pw_ci_status == 'pending': - logger.debug('No CI on patch %s, skipping the rest of the checks', lmsg.counter) + logger.debug( + 'No CI on patch %s, skipping the rest of the checks', + lmsg.counter, + ) lmsg.pw_ci_status = None self.add_cover_trailers() @@ -917,10 +1006,16 @@ class LoreSeries: if cherrypick is not None: if at not in cherrypick: at += 1 - logger.debug(' skipped: [%s/%s] (not in cherrypick)', at, self.expected) + logger.debug( + ' skipped: [%s/%s] (not in cherrypick)', at, self.expected + ) continue if lmsg is None: - logger.critical('CRITICAL: [%s/%s] is missing, cannot cherrypick', at, self.expected) + logger.critical( + 'CRITICAL: [%s/%s] is missing, cannot cherrypick', + at, + self.expected, + ) raise KeyError('Cherrypick not in series') if lmsg is not None: @@ -935,16 +1030,24 @@ class LoreSeries: llval = lparts[1].strip() % lmsg.msgid linktrailer = LoreTrailer(name=llname, value=llval) else: - logger.critical('linktrailermask does not look like a valid trailer, using defaults') + logger.critical( + 'linktrailermask does not look like a valid trailer, using defaults' + ) if not linktrailer: defmask = LOREADDR + '/r/%s' cfg_llval = config.get('linkmask', defmask) if isinstance(cfg_llval, str) and '%s' in cfg_llval: - linktrailer = LoreTrailer(name='Link', value=cfg_llval % lmsg.msgid) + linktrailer = LoreTrailer( + name='Link', value=cfg_llval % lmsg.msgid + ) else: - logger.critical('linkmask does not look like a valid mask, using defaults') - linktrailer = LoreTrailer(name='Link', value=defmask % lmsg.msgid) + logger.critical( + 'linkmask does not look like a valid mask, using defaults' + ) + linktrailer = LoreTrailer( + name='Link', value=defmask % lmsg.msgid + ) extras.append(linktrailer) @@ -955,7 +1058,9 @@ class LoreSeries: logger.info(' %s', lmsg.get_am_subject()) else: - checkmark, trailers, critical = lmsg.get_attestation_trailers(attpolicy, maxdays) + checkmark, trailers, critical = lmsg.get_attestation_trailers( + attpolicy, maxdays + ) if checkmark: logger.info(' %s %s', checkmark, lmsg.get_am_subject()) else: @@ -966,6 +1071,7 @@ class LoreSeries: if critical: import sys + logger.critical('---') logger.critical('Exiting due to attestation-policy: hardfail') sys.exit(128) @@ -973,11 +1079,20 @@ class LoreSeries: add_trailers = True if noaddtrailers: add_trailers = False - msg = lmsg.get_am_message(add_trailers=add_trailers, extras=extras, copyccs=copyccs, - addmysob=addmysob, allowbadchars=allowbadchars) + msg = lmsg.get_am_message( + add_trailers=add_trailers, + extras=extras, + copyccs=copyccs, + addmysob=addmysob, + allowbadchars=allowbadchars, + ) if local_check_cmds: lmsg.load_local_ci_status(local_check_cmds) - if lmsg.local_ci_status or lmsg.pw_ci_status in {'success', 'fail', 'warning'}: + if lmsg.local_ci_status or lmsg.pw_ci_status in { + 'success', + 'fail', + 'warning', + }: if lmsg.local_ci_status: for flag, status in lmsg.local_ci_status: logger.info(' %s %s', CI_FLAGS_FANCY[flag], status) @@ -985,8 +1100,12 @@ class LoreSeries: pwproj = config.get('pw-project') if lmsg.pw_ci_status in {'fail', 'warning'}: pwlink = f'{pwurl}/project/{pwproj}/patch/{lmsg.msgid}' - logger.info(' %s patchwork: %s: %s', CI_FLAGS_FANCY[lmsg.pw_ci_status], - str(lmsg.pw_ci_status).upper(), pwlink) + logger.info( + ' %s patchwork: %s: %s', + CI_FLAGS_FANCY[lmsg.pw_ci_status], + str(lmsg.pw_ci_status).upper(), + pwlink, + ) msgs.append(msg) else: logger.error(' ERROR: missing [%s/%s]!', at, self.expected) @@ -1002,7 +1121,6 @@ class LoreSeries: return msgs - @property def submission_date(self) -> Optional[datetime.datetime]: # Find the date of the first patch we have @@ -1015,7 +1133,6 @@ class LoreSeries: break return self._submission_date - @property def indexes(self) -> List[Tuple[str, str]]: if self._indexes is not None: @@ -1026,8 +1143,15 @@ class LoreSeries: if lmsg is None or not lmsg.blob_indexes: continue for ofn, obh, nfn, fmod in lmsg.blob_indexes: - logger.debug('%s/%s: ofn=%s, obh=%s, nfn=%s, fmod=%s', - lmsg.counter, lmsg.expected, ofn, obh, nfn, fmod) + logger.debug( + '%s/%s: ofn=%s, obh=%s, nfn=%s, fmod=%s', + lmsg.counter, + lmsg.expected, + ofn, + obh, + nfn, + fmod, + ) if ofn in seenfiles: # if we have seen this file once already, then it's a repeat patch # it's no longer going to match current hash @@ -1039,8 +1163,9 @@ class LoreSeries: self._indexes.append((ofn, obh)) return self._indexes - def check_applies_clean(self, gitdir: Optional[str] = None, - at: Optional[str] = None) -> Tuple[int, List[Tuple[str, str]]]: + def check_applies_clean( + self, gitdir: Optional[str] = None, at: Optional[str] = None + ) -> Tuple[int, List[Tuple[str, str]]]: mismatches = list() if at is None: at = 'HEAD' @@ -1059,7 +1184,12 @@ class LoreSeries: return len(self.indexes), mismatches - def find_base(self, gitdir: Optional[str], branches: Optional[List[str]] = None, maxdays: int = 30) -> Tuple[str, int, int]: + def find_base( + self, + gitdir: Optional[str], + branches: Optional[List[str]] = None, + maxdays: int = 30, + ) -> Tuple[str, int, int]: if self.indexes is None: self.populate_indexes() if self.indexes is None or not len(self.indexes): @@ -1076,7 +1206,13 @@ class LoreSeries: else: where = ['--all'] - gitargs = ['log', '--pretty=oneline', '--until', guntil, '--max-count=1'] + where + gitargs = [ + 'log', + '--pretty=oneline', + '--until', + guntil, + '--max-count=1', + ] + where lines = git_get_command_lines(gitdir, gitargs) if not lines: raise IndexError('No commits found before %s' % guntil) @@ -1090,8 +1226,16 @@ class LoreSeries: best = commit for fn, bi in mismatches: logger.debug('Finding tree matching %s=%s in %s', fn, bi, where) - gitargs = ['log', '--pretty=oneline', '--since', gsince, '--until', guntil, - '--find-object', bi] + where + gitargs = [ + 'log', + '--pretty=oneline', + '--since', + gsince, + '--until', + guntil, + '--find-object', + bi, + ] + where lines = git_get_command_lines(gitdir, gitargs) if not lines: logger.debug('Could not find object %s in the tree', bi) @@ -1127,8 +1271,9 @@ class LoreSeries: raise IndexError('Could not describe commit %s' % best) - def make_fake_am_range(self, gitdir: Optional[str], - at_base: Optional[str] = None) -> Tuple[Optional[str], Optional[str]]: + def make_fake_am_range( + self, gitdir: Optional[str], at_base: Optional[str] = None + ) -> Tuple[Optional[str], Optional[str]]: start_commit = end_commit = None # Use the msgid of the first non-None patch in the series msgid = None @@ -1149,7 +1294,9 @@ class LoreSeries: stalecache = True if start_commit is not None and end_commit is not None: # Make sure they are still there - if git_commit_exists(gitdir, start_commit) and git_commit_exists(gitdir, end_commit): + if git_commit_exists(gitdir, start_commit) and git_commit_exists( + gitdir, end_commit + ): logger.debug('Using previously generated range') return start_commit, end_commit stalecache = True @@ -1175,7 +1322,10 @@ class LoreSeries: seenfiles = set() for lmsg in self.patches[1:]: if lmsg is None: - logger.critical('ERROR: v%s series incomplete; unable to create a fake-am range', self.revision) + logger.critical( + 'ERROR: v%s series incomplete; unable to create a fake-am range', + self.revision, + ) return None, None logger.debug('Looking at %s', lmsg.full_subject) @@ -1203,19 +1353,37 @@ class LoreSeries: try: ohash = git_revparse_obj(ofi) logger.debug(' Found matching blob for: %s', ofn) - gitargs = ['update-index', '--add', '--cacheinfo', f'{fmod},{ohash},{ofn}'] + gitargs = [ + 'update-index', + '--add', + '--cacheinfo', + f'{fmod},{ohash},{ofn}', + ] except RuntimeError: - logger.debug('Could not find matching blob for %s (%s)', ofn, ofi) + logger.debug( + 'Could not find matching blob for %s (%s)', ofn, ofi + ) try: chash = git_revparse_obj(f':{ofn}', topdir) - gitargs = ['update-index', '--add', '--cacheinfo', f'{fmod},{chash},{ofn}'] + gitargs = [ + 'update-index', + '--add', + '--cacheinfo', + f'{fmod},{chash},{ofn}', + ] except RuntimeError: - logger.critical(' ERROR: Could not find anything matching %s', ofn) + logger.critical( + ' ERROR: Could not find anything matching %s', ofn + ) return None, None ecode, out = git_run_command(None, gitargs) if ecode > 0: - logger.critical(' ERROR: Could not run update-index for %s (%s)', ofn, ohash) + logger.critical( + ' ERROR: Could not run update-index for %s (%s)', + ofn, + ohash, + ) return None, None msgs.append(lmsg.get_am_message(add_trailers=False)) @@ -1227,7 +1395,9 @@ class LoreSeries: treeid = out.strip() # At this point we have a worktree with files that should (hopefully) cleanly receive a git am gitargs = ['commit-tree', treeid + '^{tree}', '-F', '-'] - ecode, out = git_run_command(None, gitargs, stdin='Initial fake commit'.encode('utf-8')) + ecode, out = git_run_command( + None, gitargs, stdin='Initial fake commit'.encode('utf-8') + ) if ecode > 0: logger.critical('ERROR: Could not commit-tree') return None, None @@ -1271,11 +1441,25 @@ class LoreTrailer: addr: Optional[Tuple[str, str]] = None lmsg: Optional['LoreMessage'] = None # Small list of recognized utility trailers - _utility: Set[str] = {'fixes', 'link', 'buglink', 'closes', 'obsoleted-by', 'message-id', 'change-id', - 'base-commit', 'based-on'} + _utility: Set[str] = { + 'fixes', + 'link', + 'buglink', + 'closes', + 'obsoleted-by', + 'message-id', + 'change-id', + 'base-commit', + 'based-on', + } - def __init__(self, name: Optional[str] = None, value: Optional[str] = None, extinfo: Optional[str] = None, - msg: Optional[EmailMessage] = None): + def __init__( + self, + name: Optional[str] = None, + value: Optional[str] = None, + extinfo: Optional[str] = None, + msg: Optional[EmailMessage] = None, + ): if name is None or value is None: self.name = 'Signed-off-by' self.type = 'person' @@ -1363,8 +1547,9 @@ class LoreTrailer: if olocal != tlocal: return False - return (abs(odomain.count('.') - tdomain.count('.')) == 1 - and (odomain.endswith(f'.{tdomain}') or tdomain.endswith(f'.{odomain}'))) + return abs(odomain.count('.') - tdomain.count('.')) == 1 and ( + odomain.endswith(f'.{tdomain}') or tdomain.endswith(f'.{odomain}') + ) @staticmethod def _extract_link_msgid(url: str) -> Optional[str]: @@ -1526,8 +1711,9 @@ class LoreMessage: self.references.append(self.in_reply_to) try: - fromdata = email.utils.getaddresses([LoreMessage.clean_header(str(x)) - for x in self.msg.get_all('from', [])])[0] + fromdata = email.utils.getaddresses( + [LoreMessage.clean_header(str(x)) for x in self.msg.get_all('from', [])] + )[0] self.fromname = fromdata[0] self.fromemail = fromdata[1] if not len(self.fromname.strip()): @@ -1572,19 +1758,33 @@ class LoreMessage: trailers, _others = LoreMessage.find_trailers(self.body, followup=True) # We only pay attention to trailers that are sent in reply if trailers and self.references and not self.has_diff and not self.reply: - logger.debug('A follow-up missing a Re: but containing a trailer with no patch diff') + logger.debug( + 'A follow-up missing a Re: but containing a trailer with no patch diff' + ) self.reply = True if self.reply: for trailer in trailers: # These are commonly part of patch/commit metadata - badtrailers = {'from', 'author', 'cc', 'to', 'date', 'subject', - 'subscribe', 'unsubscribe', 'base-commit', 'change-id', - 'message-id'} + badtrailers = { + 'from', + 'author', + 'cc', + 'to', + 'date', + 'subject', + 'subscribe', + 'unsubscribe', + 'base-commit', + 'change-id', + 'message-id', + } if trailer.lname not in badtrailers: trailer.lmsg = self self.trailers.append(trailer) - def get_trailers(self, sloppy: bool = False) -> Tuple[List[LoreTrailer], Set[LoreTrailer]]: + def get_trailers( + self, sloppy: bool = False + ) -> Tuple[List[LoreTrailer], Set[LoreTrailer]]: trailers = list() mismatches = set() @@ -1679,10 +1879,10 @@ class LoreMessage: # Identify all DKIM-Signature headers and try them in reverse order # until we come to a passing one dkhdrs = list() - for header in list(self.msg._headers): # type: ignore[attr-defined] + for header in list(self.msg._headers): # type: ignore[attr-defined] if header[0].lower() == 'dkim-signature': dkhdrs.append(header) - self.msg._headers.remove(header) # type: ignore[attr-defined] + self.msg._headers.remove(header) # type: ignore[attr-defined] dkhdrs.reverse() seenatts = list() @@ -1693,7 +1893,11 @@ class LoreMessage: hval = str(email.header.make_header(email.header.decode_header(hval))) errors = list() hdata = LoreMessage.get_parts_from_header(hval) - logger.debug('Loading DKIM attestation for d=%s, s=%s', hdata.get('d'), hdata.get('s')) + logger.debug( + 'Loading DKIM attestation for d=%s, s=%s', + hdata.get('d'), + hdata.get('s'), + ) identity = hdata.get('i') if not identity: @@ -1712,9 +1916,11 @@ class LoreMessage: if isinstance(sh, str) and 'date' in sh.lower().split(':'): signtime = self.date - self.msg._headers.append((hn, hval)) # type: ignore[attr-defined] + self.msg._headers.append((hn, hval)) # type: ignore[attr-defined] try: - res = dkim.verify(self.msg.as_bytes(policy=emlpolicy), logger=dkimlogger) + res = dkim.verify( + self.msg.as_bytes(policy=emlpolicy), logger=dkimlogger + ) logger.debug('DKIM verify results: %s=%s', identity, res) except Exception as ex: # Usually, this is due to some DNS resolver failure, which we can't @@ -1729,7 +1935,7 @@ class LoreMessage: self._attestors.append(attestor) return - self.msg._headers.pop(-1) # type: ignore[attr-defined] + self.msg._headers.pop(-1) # type: ignore[attr-defined] seenatts.append(attestor) # No exact domain matches, so return everything we have @@ -1756,7 +1962,10 @@ class LoreMessage: if i.get('Subject') != self.subject: ibh.append('Subject: %s' % str(i.get('Subject'))) if i.get('Email') != self.fromemail or i.get('Author') != self.fromname: - ibh.append('From: ' + format_addrs([(str(i.get('Author')), str(i.get('Email')))])) + ibh.append( + 'From: ' + + format_addrs([(str(i.get('Author')), str(i.get('Email')))]) + ) if len(ibh): self.body = '\n'.join(ibh) + '\n\n' + self.body @@ -1771,10 +1980,16 @@ class LoreMessage: sources = config.get('keyringsrc') if not sources: # fallback to patatt's keyring if none is specified for b4 - patatt_config = patatt.get_config_from_git(r'patatt\..*', multivals=['keyringsrc']) + patatt_config = patatt.get_config_from_git( + r'patatt\..*', multivals=['keyringsrc'] + ) sources = patatt_config.get('keyringsrc') if not sources: - sources = ['ref:::.keys', 'ref:::.local-keys', 'ref::refs/meta/keyring:'] + sources = [ + 'ref:::.keys', + 'ref:::.local-keys', + 'ref::refs/meta/keyring:', + ] if not isinstance(sources, list): sources = [sources] if pdir not in sources: @@ -1782,8 +1997,9 @@ class LoreMessage: # Push our logger and GPGBIN into patatt patatt.logger = logger - assert isinstance(config['gpgbin'], str), \ + assert isinstance(config['gpgbin'], str), ( 'gpgbin config value is not a string: %s' % str(config['gpgbin']) + ) patatt.GPGBIN = config['gpgbin'] logger.debug('Loading patatt attestations with sources=%s', str(sources)) @@ -1791,7 +2007,9 @@ class LoreMessage: success = False trim_body = False while True: - attestations = patatt.validate_message(self.msg.as_bytes(policy=emlpolicy), sources, trim_body=trim_body) + attestations = patatt.validate_message( + self.msg.as_bytes(policy=emlpolicy), sources, trim_body=trim_body + ) # Do we have any successes? for attestation in attestations: if attestation[0] == patatt.RES_VALID: @@ -1826,18 +2044,22 @@ class LoreMessage: signdt = LoreAttestor.parse_ts(signtime) else: signdt = None - attestor = LoreAttestorPatatt(result, identity, signdt, keysrc, keyalgo, errors) + attestor = LoreAttestorPatatt( + result, identity, signdt, keysrc, keyalgo, errors + ) self._attestors.append(attestor) @staticmethod - def run_local_check(cmdargs: List[str], ident: str, msg: EmailMessage, - nocache: bool = False) -> List[Tuple[str, str]]: + def run_local_check( + cmdargs: List[str], ident: str, msg: EmailMessage, nocache: bool = False + ) -> List[Tuple[str, str]]: cacheid = ' '.join(cmdargs) + ident if not nocache: cachedata = get_cache(cacheid, suffix='checks', as_json=True) if cachedata is not None: - assert isinstance(cachedata, list), \ + assert isinstance(cachedata, list), ( 'Cache data for %s is not a list: %s' % (cacheid, str(cachedata)) + ) return cachedata logger.debug('Checking ident=%s using %s', ident, cmdargs[0]) @@ -1888,13 +2110,16 @@ class LoreMessage: pwurl = str(config.get('pw-url', '')) pwproj = str(config.get('pw-project', '')) if not (pwkey and pwurl and pwproj): - logger.debug('Patchwork support requires pw-key, pw-url and pw-project settings') + logger.debug( + 'Patchwork support requires pw-key, pw-url and pw-project settings' + ) raise LookupError('Error looking up %s in patchwork' % msgid) cachedata = get_cache(pwurl + pwproj + msgid, suffix='lookup', as_json=True) if cachedata is not None: - assert isinstance(cachedata, dict), \ + assert isinstance(cachedata, dict), ( 'Cache data for %s is not a dict: %s' % (msgid, str(cachedata)) + ) return cachedata pses, url = get_patchwork_session(pwkey, pwurl) @@ -1926,7 +2151,7 @@ class LoreMessage: save_cache(pwdata, pwurl + pwproj + msgid, suffix='lookup', is_json=True) return pwdata - def get_patchwork_info(self) -> Optional[Dict[str,str]]: + def get_patchwork_info(self) -> Optional[Dict[str, str]]: if not self.pwhash: return None try: @@ -1945,7 +2170,9 @@ class LoreMessage: logger.debug('ci_state for %s: %s', self.msgid, ci_status) self.pw_ci_status = ci_status - def get_attestation_status(self, attpolicy: str, maxdays: int = 0) -> Tuple[List[Dict[str, Any]], bool, bool]: + def get_attestation_status( + self, attpolicy: str, maxdays: int = 0 + ) -> Tuple[List[Dict[str, Any]], bool, bool]: """Get attestation status for this message. Args: @@ -1968,7 +2195,11 @@ class LoreMessage: critical = False for attestor in self.attestors: - if attestor.passing and maxdays and not attestor.check_time_drift(self.date, maxdays): + if ( + attestor.passing + and maxdays + and not attestor.check_time_drift(self.date, maxdays) + ): logger.debug('The time drift is too much, marking as non-passing') attestor.passing = False @@ -1978,27 +2209,33 @@ class LoreMessage: if attestor.have_key: # This was signed, and we have a key, but it's failing has_failing = True - attestations.append({ - 'status': 'badsig', - 'identity': attestor.trailer, - 'passing': False, - }) + attestations.append( + { + 'status': 'badsig', + 'identity': attestor.trailer, + 'passing': False, + } + ) elif attpolicy in ('softfail', 'hardfail'): has_failing = True - attestations.append({ - 'status': 'nokey', - 'identity': attestor.trailer, - 'passing': False, - }) + attestations.append( + { + 'status': 'nokey', + 'identity': attestor.trailer, + 'passing': False, + } + ) # This is not critical even in hardfail continue elif attpolicy in ('softfail', 'hardfail'): has_failing = True - attestations.append({ - 'status': 'badsig', - 'identity': attestor.trailer, - 'passing': False, - }) + attestations.append( + { + 'status': 'badsig', + 'identity': attestor.trailer, + 'passing': False, + } + ) if attpolicy == 'hardfail': critical = True @@ -2018,9 +2255,12 @@ class LoreMessage: self.fromname = xpair[0] self.fromemail = xpair[1] # Drop the reply-to header if it's exactly the same - for header in list(self.msg._headers): # type: ignore[attr-defined] - if header[0].lower() == 'reply-to' and header[1].find(xpair[1]) > 0: - self.msg._headers.remove(header) # type: ignore[attr-defined] + for header in list(self.msg._headers): # type: ignore[attr-defined] + if ( + header[0].lower() == 'reply-to' + and header[1].find(xpair[1]) > 0 + ): + self.msg._headers.remove(header) # type: ignore[attr-defined] has_passing = True att_info: Dict[str, Any] = { @@ -2037,7 +2277,9 @@ class LoreMessage: overall_passing = not has_failing or has_passing return attestations, overall_passing, critical - def get_attestation_trailers(self, attpolicy: str, maxdays: int = 0) -> Tuple[Optional[str], List[str], bool]: + def get_attestation_trailers( + self, attpolicy: str, maxdays: int = 0 + ) -> Tuple[Optional[str], List[str], bool]: """Get formatted attestation trailers with checkmarks for display. Args: @@ -2050,7 +2292,9 @@ class LoreMessage: - trailers: List of formatted trailer strings with checkmarks - critical: True if hardfail policy triggered """ - attestations, _overall_passing, critical = self.get_attestation_status(attpolicy, maxdays) + attestations, _overall_passing, critical = self.get_attestation_status( + attpolicy, maxdays + ) config = get_main_config() if config['attestation-checkmarks'] == 'fancy': @@ -2069,7 +2313,9 @@ class LoreMessage: if checkmark is None: checkmark = mark if 'mismatch' in att: - trailers.append(f'{mark} Signed: {att["identity"]} (From: {att["mismatch"]})') + trailers.append( + f'{mark} Signed: {att["identity"]} (From: {att["mismatch"]})' + ) else: trailers.append(f'{mark} Signed: {att["identity"]}') else: @@ -2200,9 +2446,10 @@ class LoreMessage: return new_hdrval.strip() @staticmethod - def make_reply_addrs(to_addrs: List[Tuple[str, str]], - cc_addrs: List[Tuple[str, str]], - ) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]: + def make_reply_addrs( + to_addrs: List[Tuple[str, str]], + cc_addrs: List[Tuple[str, str]], + ) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]: """Deduplicate To and Cc address lists for a reply. Removes duplicates within To, then removes any Cc entries that @@ -2224,8 +2471,9 @@ class LoreMessage: deduped_cc.append((name, addr)) return deduped_to, deduped_cc - def make_reply(self, body: str, - mailfrom: Optional[Tuple[str, str]] = None) -> EmailMessage: + def make_reply( + self, body: str, mailfrom: Optional[Tuple[str, str]] = None + ) -> EmailMessage: """Build a reply EmailMessage addressing this message. Handles Reply-To → To promotion, folds the original To into Cc, @@ -2242,7 +2490,9 @@ class LoreMessage: subject = f'Re: {subject}' try: - reply_to = email.utils.getaddresses([str(x) for x in self.msg.get_all('reply-to', [])]) + reply_to = email.utils.getaddresses( + [str(x) for x in self.msg.get_all('reply-to', [])] + ) except Exception: reply_to = [] if reply_to: @@ -2251,15 +2501,21 @@ class LoreMessage: to_addrs = [(self.fromname, self.fromemail)] try: - orig_to = email.utils.getaddresses([str(x) for x in self.msg.get_all('to', [])]) + orig_to = email.utils.getaddresses( + [str(x) for x in self.msg.get_all('to', [])] + ) except Exception: orig_to = [] try: - orig_cc = email.utils.getaddresses([str(x) for x in self.msg.get_all('cc', [])]) + orig_cc = email.utils.getaddresses( + [str(x) for x in self.msg.get_all('cc', [])] + ) except Exception: orig_cc = [] - deduped_to, deduped_cc = LoreMessage.make_reply_addrs(to_addrs, orig_to + orig_cc) + deduped_to, deduped_cc = LoreMessage.make_reply_addrs( + to_addrs, orig_to + orig_cc + ) msg = EmailMessage() msg.set_payload(body, charset='utf-8') @@ -2276,15 +2532,24 @@ class LoreMessage: return msg @staticmethod - def wrap_header(hdr: Tuple[str, str], width: int = 75, nl: str = '\n', - transform: Literal['encode', 'decode', 'preserve'] = 'preserve') -> bytes: + def wrap_header( + hdr: Tuple[str, str], + width: int = 75, + nl: str = '\n', + transform: Literal['encode', 'decode', 'preserve'] = 'preserve', + ) -> bytes: hname, hval = hdr if hname.lower() in ('to', 'cc', 'from', 'x-original-from'): _parts = [f'{hname}: '] first = True for addr in email.utils.getaddresses([hval]): if transform == 'encode' and not addr[0].isascii(): - addr = (email.quoprimime.header_encode(addr[0].encode(), charset='utf-8'), addr[1]) + addr = ( + email.quoprimime.header_encode( + addr[0].encode(), charset='utf-8' + ), + addr[1], + ) qp = format_addrs([addr], clean=False) elif transform == 'decode': qp = format_addrs([addr], clean=True) @@ -2311,11 +2576,18 @@ class LoreMessage: # Use simple textwrap, with a small trick that ensures that long non-breakable # strings don't show up on the next line from the bare header hdata = hdata.replace(': ', ':_', 1) - wrapped = textwrap.wrap(hdata, break_long_words=False, break_on_hyphens=False, - subsequent_indent=' ', width=width) + wrapped = textwrap.wrap( + hdata, + break_long_words=False, + break_on_hyphens=False, + subsequent_indent=' ', + width=width, + ) return nl.join(wrapped).replace(':_', ': ', 1).encode() - qp = f'{hname}: ' + email.quoprimime.header_encode(hval.encode(), charset='utf-8') + qp = f'{hname}: ' + email.quoprimime.header_encode( + hval.encode(), charset='utf-8' + ) # is it longer than width? if len(qp) <= width: return qp.encode() @@ -2327,19 +2599,25 @@ class LoreMessage: # Also allow for the ' ' at the front on continuation lines wrapat -= 1 # Make sure we don't break on a =XX escape sequence - while '=' in qp[wrapat - 2:wrapat]: + while '=' in qp[wrapat - 2 : wrapat]: wrapat -= 1 _parts.append(qp[:wrapat] + '?=') - qp = ('=?utf-8?q?' + qp[wrapat:]) + qp = '=?utf-8?q?' + qp[wrapat:] _parts.append(qp) return f'{nl} '.join(_parts).encode() @staticmethod - def get_msg_as_bytes(msg: EmailMessage, nl: str = '\n', - headers: Literal['encode', 'decode', 'preserve'] = 'preserve') -> bytes: + def get_msg_as_bytes( + msg: EmailMessage, + nl: str = '\n', + headers: Literal['encode', 'decode', 'preserve'] = 'preserve', + ) -> bytes: bdata = b'' for hname, hval in msg.items(): - bdata += LoreMessage.wrap_header((hname, str(hval)), nl=nl, transform=headers) + nl.encode() + bdata += ( + LoreMessage.wrap_header((hname, str(hval)), nl=nl, transform=headers) + + nl.encode() + ) bdata += nl.encode() payload = msg.get_payload(decode=True) if not isinstance(payload, bytes): @@ -2371,7 +2649,9 @@ class LoreMessage: listidpref = config['listid-preference'] if not isinstance(listidpref, list): listidpref = [str(listidpref)] - return liblore.utils.get_preferred_duplicate(msg1, msg2, listid_preference=listidpref) + return liblore.utils.get_preferred_duplicate( + msg1, msg2, listid_preference=listidpref + ) @staticmethod def get_patch_id(diff: str) -> Optional[str]: @@ -2455,17 +2735,28 @@ class LoreMessage: return indexes @staticmethod - def find_trailers(body: str, followup: bool = False) -> Tuple[List[LoreTrailer], List[str]]: + def find_trailers( + body: str, followup: bool = False + ) -> Tuple[List[LoreTrailer], List[str]]: ignores = {'phone', 'mail', 'email', 'e-mail', 'prerequisite-message-id'} headers = {'subject', 'date', 'from'} links = {'link', 'buglink', 'closes'} - nonperson = links | {'fixes', 'subject', 'date', 'obsoleted-by', 'change-id', 'base-commit'} + nonperson = links | { + 'fixes', + 'subject', + 'date', + 'obsoleted-by', + 'change-id', + 'base-commit', + } # Ignore everything below standard email signature marker body = body.split('\n-- \n', 1)[0].strip() + '\n' # Fix some more common copypasta trailer wrapping # Fixes: abcd0123 (foo bar # baz quux) - body = re.sub(r'^(\S+:\s+[\da-f]+\s+\([^)]+)\n([^\n]+\))', r'\1 \2', body, flags=re.M) + body = re.sub( + r'^(\S+:\s+[\da-f]+\s+\([^)]+)\n([^\n]+\))', r'\1 \2', body, flags=re.M + ) # Signed-off-by: Long Name # body = re.sub(r'^(\S+:\s+[^<]+)\n(<[^>]+>)$', r'\1 \2', body, flags=re.M) @@ -2490,19 +2781,31 @@ class LoreMessage: logger.debug('Ignoring %d: %s (known non-trailer)', at, line) continue if len(others) and lname in headers: - logger.debug('Ignoring %d: %s (header after other content)', at, line) + logger.debug( + 'Ignoring %d: %s (header after other content)', at, line + ) continue if followup: if not lname.isascii(): - logger.debug('Ignoring %d: %s (known non-ascii follow-up trailer)', at, lname) + logger.debug( + 'Ignoring %d: %s (known non-ascii follow-up trailer)', + at, + lname, + ) continue mperson = re.search(r'\S+@\S+\.\S+', ovalue) if not mperson and lname not in nonperson: - logger.debug('Ignoring %d: %s (not a recognized non-person trailer)', at, line) + logger.debug( + 'Ignoring %d: %s (not a recognized non-person trailer)', + at, + line, + ) continue mlink = re.search(r'https?://', ovalue) if mlink and lname not in links: - logger.debug('Ignoring %d: %s (not a recognized link trailer)', at, line) + logger.debug( + 'Ignoring %d: %s (not a recognized link trailer)', at, line + ) continue extinfo = None @@ -2531,8 +2834,13 @@ class LoreMessage: return trailers, others @staticmethod - def rebuild_message(headers: List[LoreTrailer], message: str, trailers: List[LoreTrailer], - basement: str, signature: str) -> str: + def rebuild_message( + headers: List[LoreTrailer], + message: str, + trailers: List[LoreTrailer], + basement: str, + signature: str, + ) -> str: body = '' if headers: for ltr in headers: @@ -2551,8 +2859,10 @@ class LoreMessage: if len(basement): if not len(trailers): body += '\n' - if (DIFFSTAT_RE.search(basement) - or not (basement.strip().startswith('diff --git') or basement.lstrip().startswith('--- '))): + if DIFFSTAT_RE.search(basement) or not ( + basement.strip().startswith('diff --git') + or basement.lstrip().startswith('--- ') + ): body += '---\n' else: # We don't need to add a --- @@ -2566,7 +2876,9 @@ class LoreMessage: return body @staticmethod - def get_body_parts(body: str) -> Tuple[List[LoreTrailer], str, List[LoreTrailer], str, str]: + def get_body_parts( + body: str, + ) -> Tuple[List[LoreTrailer], str, List[LoreTrailer], str, str]: # remove any starting/trailing blank lines body = body.replace('\r', '') body = body.strip('\n') @@ -2640,14 +2952,20 @@ class LoreMessage: return githeaders, message, trailers, basement, signature - def fix_trailers(self, extras: Optional[List[LoreTrailer]] = None, - copyccs: bool = False, addmysob: bool = False, - fallback_order: str = '*', - omit_trailers: Optional[List[str]] = None) -> None: + def fix_trailers( + self, + extras: Optional[List[LoreTrailer]] = None, + copyccs: bool = False, + addmysob: bool = False, + fallback_order: str = '*', + omit_trailers: Optional[List[str]] = None, + ) -> None: config = get_main_config() - bheaders, message, btrailers, basement, signature = LoreMessage.get_body_parts(self.body) + bheaders, message, btrailers, basement, signature = LoreMessage.get_body_parts( + self.body + ) sobtr = LoreTrailer() hasmysob = False @@ -2666,10 +2984,20 @@ class LoreMessage: addmysob = True if copyccs: - alldests = email.utils.getaddresses([str(x) for x in self.msg.get_all('to', [])]) - alldests += email.utils.getaddresses([str(x) for x in self.msg.get_all('cc', [])]) + alldests = email.utils.getaddresses( + [str(x) for x in self.msg.get_all('to', [])] + ) + alldests += email.utils.getaddresses( + [str(x) for x in self.msg.get_all('cc', [])] + ) # Sort by domain name, then local - alldests.sort(key=lambda x: x[1].find('@') > 0 and x[1].split('@')[1] + x[1].split('@')[0] or x[1]) + alldests.sort( + key=lambda x: ( + x[1].find('@') > 0 + and x[1].split('@')[1] + x[1].split('@')[0] + or x[1] + ) + ) for pair in alldests: found = False for fltr in btrailers + new_trailers: @@ -2687,7 +3015,9 @@ class LoreMessage: torder = config.get('trailer-order', fallback_order) if not isinstance(torder, str): - logger.critical('b4.trailer-order must be a string, falling back to default') + logger.critical( + 'b4.trailer-order must be a string, falling back to default' + ) torder = fallback_order if torder and torder != '*': @@ -2727,7 +3057,9 @@ class LoreMessage: if ltr in fixtrailers or ltr in ignored: continue - if (ltr.addr and ltr.addr[1].lower() in ignores) or (ltr.lmsg and ltr.lmsg.fromemail.lower() in ignores): + if (ltr.addr and ltr.addr[1].lower() in ignores) or ( + ltr.lmsg and ltr.lmsg.fromemail.lower() in ignores + ): logger.info(' x %s', ltr.as_string(omit_extinfo=True)) ignored.add(ltr) continue @@ -2742,8 +3074,11 @@ class LoreMessage: extra = ' (%s %s)' % (attestor.checkmark, attestor.trailer) if attpolicy == 'hardfail': import sys + logger.critical('---') - logger.critical('Exiting due to attestation-policy: hardfail') + logger.critical( + 'Exiting due to attestation-policy: hardfail' + ) sys.exit(1) logger.info(' + %s%s', ltr.as_string(omit_extinfo=True), extra) @@ -2784,9 +3119,13 @@ class LoreMessage: if bparts: self.message += '---\n' + '---\n'.join(bparts) - self.body = LoreMessage.rebuild_message(bheaders, message, fixtrailers, basement, signature) + self.body = LoreMessage.rebuild_message( + bheaders, message, fixtrailers, basement, signature + ) - def get_am_subject(self, indicate_reroll: bool = True, use_subject: Optional[str] = None) -> str: + def get_am_subject( + self, indicate_reroll: bool = True, use_subject: Optional[str] = None + ) -> str: # Return a clean patch subject parts = ['PATCH'] if self.lsubject.rfc: @@ -2794,9 +3133,14 @@ class LoreMessage: if self.reroll_from_revision: if indicate_reroll: if self.reroll_from_revision != self.revision: - parts.append('v%d->v%d' % (self.reroll_from_revision, self.revision)) + parts.append( + 'v%d->v%d' % (self.reroll_from_revision, self.revision) + ) else: - parts.append(' %s v%d' % (' ' * len(str(self.reroll_from_revision)), self.revision)) + parts.append( + ' %s v%d' + % (' ' * len(str(self.reroll_from_revision)), self.revision) + ) else: parts.append('v%d' % self.revision) elif not self.revision_inferred: @@ -2809,14 +3153,25 @@ class LoreMessage: return '[%s] %s' % (' '.join(parts), use_subject) - def get_am_message(self, add_trailers: bool = True, addmysob: bool = False, - extras: Optional[List['LoreTrailer']] = None, copyccs: bool = False, - allowbadchars: bool = False) -> EmailMessage: + def get_am_message( + self, + add_trailers: bool = True, + addmysob: bool = False, + extras: Optional[List['LoreTrailer']] = None, + copyccs: bool = False, + allowbadchars: bool = False, + ) -> EmailMessage: # Look through the body to make sure there aren't any suspicious unicode control flow chars # First, encode into ascii and compare for a quick utf8 presence test - if not allowbadchars and self.body.encode('ascii', errors='replace') != self.body.encode(): + if ( + not allowbadchars + and self.body.encode('ascii', errors='replace') != self.body.encode() + ): import unicodedata - logger.debug('Body contains non-ascii characters. Running Unicode Cf char tests.') + + logger.debug( + 'Body contains non-ascii characters. Running Unicode Cf char tests.' + ) for line in self.body.split('\n'): # Does this line have any unicode? if line.encode() == line.encode('ascii', errors='replace'): @@ -2829,12 +3184,20 @@ class LoreMessage: for at, c in enumerate(line.rstrip('\r')): if unicodedata.category(c) == 'Cf': logger.critical('---') - logger.critical('WARNING: Message contains suspicious unicode control characters!') + logger.critical( + 'WARNING: Message contains suspicious unicode control characters!' + ) logger.critical(' Subject: %s', self.full_subject) logger.critical(' Line: %s', line.rstrip('\r')) logger.critical(' ------%s^', '-' * at) - logger.critical(' Char: %s (%s)', unicodedata.name(c), hex(ord(c))) - logger.critical(' If you are sure about this, rerun with the right flag to allow.') + logger.critical( + ' Char: %s (%s)', + unicodedata.name(c), + hex(ord(c)), + ) + logger.critical( + ' If you are sure about this, rerun with the right flag to allow.' + ) sys.exit(1) # Remove anything cut off by scissors @@ -2854,7 +3217,10 @@ class LoreMessage: am_msg = EmailMessage() hfrom = format_addrs([(str(i.get('Author', '')), str(i.get('Email')))]) - am_msg.add_header('Subject', self.get_am_subject(indicate_reroll=False, use_subject=i.get('Subject'))) + am_msg.add_header( + 'Subject', + self.get_am_subject(indicate_reroll=False, use_subject=i.get('Subject')), + ) am_msg.add_header('From', hfrom) am_msg.add_header('Date', str(i.get('Date'))) am_msg.add_header('Message-Id', f'<{self.msgid}>') @@ -2893,7 +3259,9 @@ class LoreSubject: self.full_subject = subject # Is it a reply? - if re.search(r'^(Re|Aw|Fwd):', subject, re.I) or re.search(r'^\w{2,3}:\s*\[', subject): + if re.search(r'^(Re|Aw|Fwd):', subject, re.I) or re.search( + r'^\w{2,3}:\s*\[', subject + ): subject = re.sub(r'^\w{2,3}:\s*\[', '[', subject) self.reply = True @@ -2956,8 +3324,9 @@ class LoreSubject: return ret - def get_rebuilt_subject(self, eprefixes: Optional[List[str]] = None, - presubject: Optional[str] = None) -> str: + def get_rebuilt_subject( + self, eprefixes: Optional[List[str]] = None, presubject: Optional[str] = None + ) -> str: exclude = None if eprefixes and 'PATCH' in eprefixes: @@ -2972,9 +3341,12 @@ class LoreSubject: if self.revision > 1: _pfx.append(f'v{self.revision}') if self.expected > 1: - _pfx.append('%s/%s' % (str(self.counter).zfill(len(str(self.expected))), self.expected)) + _pfx.append( + '%s/%s' + % (str(self.counter).zfill(len(str(self.expected))), self.expected) + ) - subject = "" + subject = '' if len(_pfx): subject = '[' + ' '.join(_pfx) + '] ' + self.subject else: @@ -3077,15 +3449,27 @@ class LoreAttestor: if self.level == 'domain': if emlfrom.lower().endswith('@' + self.identity.lower()): - logger.debug('PASS : sig domain %s matches from identity %s', self.identity, emlfrom) + logger.debug( + 'PASS : sig domain %s matches from identity %s', + self.identity, + emlfrom, + ) return True - self.errors.append('signing domain %s does not match From: %s' % (self.identity, emlfrom)) + self.errors.append( + 'signing domain %s does not match From: %s' % (self.identity, emlfrom) + ) return False if emlfrom.lower() == self.identity.lower(): - logger.debug('PASS : sig identity %s matches from identity %s', self.identity, emlfrom) + logger.debug( + 'PASS : sig identity %s matches from identity %s', + self.identity, + emlfrom, + ) return True - self.errors.append('signing identity %s does not match From: %s' % (self.identity, emlfrom)) + self.errors.append( + 'signing identity %s does not match From: %s' % (self.identity, emlfrom) + ) return False @staticmethod @@ -3113,7 +3497,13 @@ class LoreAttestor: class LoreAttestorDKIM(LoreAttestor): - def __init__(self, passing: bool, identity: str, signtime: Optional[datetime.datetime], errors: List[str]) -> None: + def __init__( + self, + passing: bool, + identity: str, + signtime: Optional[datetime.datetime], + errors: List[str], + ) -> None: super().__init__() self.mode = 'DKIM' self.level = 'domain' @@ -3128,12 +3518,15 @@ class LoreAttestorDKIM(LoreAttestor): class LoreAttestorPatatt(LoreAttestor): - def __init__(self, result: int, - identity: Optional[str], - signtime: Optional[datetime.datetime], - keysrc: Optional[str], - keyalgo: Optional[str], - errors: List[str]) -> None: + def __init__( + self, + result: int, + identity: Optional[str], + signtime: Optional[datetime.datetime], + keysrc: Optional[str], + keyalgo: Optional[str], + errors: List[str], + ) -> None: super().__init__() self.mode = 'patatt' self.level = 'person' @@ -3149,8 +3542,9 @@ class LoreAttestorPatatt(LoreAttestor): self.have_key = True -def _run_command(cmdargs: List[str], stdin: Optional[bytes] = None, - rundir: Optional[str] = None) -> Tuple[int, bytes, bytes]: +def _run_command( + cmdargs: List[str], stdin: Optional[bytes] = None, rundir: Optional[str] = None +) -> Tuple[int, bytes, bytes]: if rundir: logger.debug('Changing dir to %s', rundir) curdir = os.getcwd() @@ -3159,7 +3553,9 @@ def _run_command(cmdargs: List[str], stdin: Optional[bytes] = None, curdir = None logger.debug('Running %s', ' '.join(cmdargs)) - sp = subprocess.Popen(cmdargs, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) + sp = subprocess.Popen( + cmdargs, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE + ) (output, error) = sp.communicate(input=stdin) if curdir: logger.debug('Changing back into %s', curdir) @@ -3168,7 +3564,9 @@ def _run_command(cmdargs: List[str], stdin: Optional[bytes] = None, return sp.returncode, output, error -def gpg_run_command(args: List[str], stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]: +def gpg_run_command( + args: List[str], stdin: Optional[bytes] = None +) -> Tuple[int, bytes, bytes]: config = get_main_config() gpgbin = config.get('gpgbin', 'gpg') if not isinstance(gpgbin, str): @@ -3183,22 +3581,38 @@ def gpg_run_command(args: List[str], stdin: Optional[bytes] = None) -> Tuple[int @overload -def git_run_command(gitdir: Optional[Union[str, Path]], args: List[str], stdin: Optional[bytes] = ..., - *, logstderr: bool = ..., decode: Literal[False], - rundir: Optional[str] = ...) -> Tuple[int, bytes]: - ... +def git_run_command( + gitdir: Optional[Union[str, Path]], + args: List[str], + stdin: Optional[bytes] = ..., + *, + logstderr: bool = ..., + decode: Literal[False], + rundir: Optional[str] = ..., +) -> Tuple[int, bytes]: ... @overload -def git_run_command(gitdir: Optional[Union[str, Path]], args: List[str], stdin: Optional[bytes] = ..., - *, logstderr: bool = ..., decode: Literal[True] = ..., - rundir: Optional[str] = ...) -> Tuple[int, str]: - ... - - -def git_run_command(gitdir: Optional[Union[str, Path]], args: List[str], stdin: Optional[bytes] = None, - *, logstderr: bool = False, decode: bool = True, - rundir: Optional[str] = None) -> Tuple[int, Union[str, bytes]]: +def git_run_command( + gitdir: Optional[Union[str, Path]], + args: List[str], + stdin: Optional[bytes] = ..., + *, + logstderr: bool = ..., + decode: Literal[True] = ..., + rundir: Optional[str] = ..., +) -> Tuple[int, str]: ... + + +def git_run_command( + gitdir: Optional[Union[str, Path]], + args: List[str], + stdin: Optional[bytes] = None, + *, + logstderr: bool = False, + decode: bool = True, + rundir: Optional[str] = None, +) -> Tuple[int, Union[str, bytes]]: cmdargs = ['git', '--no-pager'] if gitdir: if os.path.exists(os.path.join(gitdir, '.git')): @@ -3228,13 +3642,16 @@ def git_run_command(gitdir: Optional[Union[str, Path]], args: List[str], stdin: def git_check_minimal_version(min_version: str) -> bool: - _ecode, out = git_run_command(None, ["version"]) - current_version = re.sub(r"git version (\d+\.\d+)\..*", r"\1", out) - return tuple(map(int, current_version.split(".")[:2])) >= tuple(map(int, min_version.split(".")[:2])) - + _ecode, out = git_run_command(None, ['version']) + current_version = re.sub(r'git version (\d+\.\d+)\..*', r'\1', out) + return tuple(map(int, current_version.split('.')[:2])) >= tuple( + map(int, min_version.split('.')[:2]) + ) -def git_credential_fill(gitdir: Optional[str], protocol: str, host: str, username: str) -> Optional[str]: +def git_credential_fill( + gitdir: Optional[str], protocol: str, host: str, username: str +) -> Optional[str]: stdin = f'protocol={protocol}\nhost={host}\nusername={username}\n'.encode() ecode, out = git_run_command(gitdir, args=['credential', 'fill'], stdin=stdin) if ecode == 0: @@ -3258,7 +3675,9 @@ def git_get_command_lines(gitdir: Optional[str], args: List[str]) -> List[str]: return lines -def git_get_repo_status(gitdir: Optional[str] = None, untracked: bool = False) -> List[str]: +def git_get_repo_status( + gitdir: Optional[str] = None, untracked: bool = False +) -> List[str]: args = ['status', '--porcelain=v1'] if not untracked: args.append('--untracked-files=no') @@ -3266,7 +3685,9 @@ def git_get_repo_status(gitdir: Optional[str] = None, untracked: bool = False) - @contextmanager -def git_temp_worktree(gitdir: Optional[str] = None, commitish: Optional[str] = None) -> Generator[str, None, None]: +def git_temp_worktree( + gitdir: Optional[str] = None, commitish: Optional[str] = None +) -> Generator[str, None, None]: """Context manager that creates a temporary work tree and chdirs into it. The worktree is deleted when the contex manager is closed. Taken from gj_tools.""" dfn = None @@ -3323,7 +3744,9 @@ def setup_config(cmdargs: argparse.Namespace) -> None: _setup_sendemail_config(cmdargs) -def _cmdline_config_override(cmdargs: argparse.Namespace, config: Dict[str, Any], section: str) -> None: +def _cmdline_config_override( + cmdargs: argparse.Namespace, config: Dict[str, Any], section: str +) -> None: """Use cmdline.config to set and override config values for section.""" if not cmdargs.config: return @@ -3331,7 +3754,7 @@ def _cmdline_config_override(cmdargs: argparse.Namespace, config: Dict[str, Any] section += '.' config_override = { - key[len(section):]: val + key[len(section) :]: val for key, val in cmdargs.config.items() if key.startswith(section) } @@ -3339,14 +3762,20 @@ def _cmdline_config_override(cmdargs: argparse.Namespace, config: Dict[str, Any] config.update(config_override) -def git_set_config(fullpath: Optional[str], param: str, value: str, operation: str = '--replace-all') -> int: +def git_set_config( + fullpath: Optional[str], param: str, value: str, operation: str = '--replace-all' +) -> int: args = ['config', operation, param, value] ecode, _out = git_run_command(fullpath, args) return ecode -def get_config_from_git(regexp: str, defaults: Optional[Dict[str, Any]] = None, - multivals: Optional[List[str]] = None, source: Optional[str] = None) -> Dict[str, Any]: +def get_config_from_git( + regexp: str, + defaults: Optional[Dict[str, Any]] = None, + multivals: Optional[List[str]] = None, + source: Optional[str] = None, +) -> Dict[str, Any]: if multivals is None: multivals = list() args = ['config'] @@ -3396,10 +3825,23 @@ def _setup_main_config(cmdargs: Optional[argparse.Namespace] = None) -> None: # some options can be provided via the toplevel .b4-config file, # so load them up and use as defaults topdir = git_get_toplevel() - wtglobs = ['prep-*-check-cmd', 'review-*-check-cmd', 'send-*', '*mask', '*template*', 'trailer*', 'pw-*'] - multivals = ['keyringsrc', 'am-perpatch-check-cmd', 'prep-perpatch-check-cmd', - 'review-perpatch-check-cmd', 'review-series-check-cmd', - 'review-target-branch'] + wtglobs = [ + 'prep-*-check-cmd', + 'review-*-check-cmd', + 'send-*', + '*mask', + '*template*', + 'trailer*', + 'pw-*', + ] + multivals = [ + 'keyringsrc', + 'am-perpatch-check-cmd', + 'prep-perpatch-check-cmd', + 'review-perpatch-check-cmd', + 'review-series-check-cmd', + 'review-target-branch', + ] if topdir: wtcfg = os.path.join(topdir, '.b4-config') if os.access(wtcfg, os.R_OK): @@ -3427,10 +3869,13 @@ def _setup_main_config(cmdargs: Optional[argparse.Namespace] = None) -> None: # If we specify DNS resolvers, configure them now if config['attestation-dns-resolvers'] is not None: try: - resolvers = [x.strip() for x in config['attestation-dns-resolvers'].split(',')] + resolvers = [ + x.strip() for x in config['attestation-dns-resolvers'].split(',') + ] if resolvers: # Don't force this as an automatically discovered dependency import dns.resolver + dns.resolver.default_resolver = dns.resolver.Resolver(configure=False) dns.resolver.default_resolver.nameservers = resolvers except ImportError: @@ -3472,7 +3917,10 @@ def get_cache_dir(appname: str = 'b4') -> str: try: expmin = int(str(config['cache-expire'])) * 60 except ValueError: - logger.critical('ERROR: cache-expire must be an integer (minutes): %s', config['cache-expire']) + logger.critical( + 'ERROR: cache-expire must be an integer (minutes): %s', + config['cache-expire'], + ) expmin = 600 expage = time.time() - expmin # Expire anything else that is older than 30 days @@ -3503,7 +3951,9 @@ def get_cache_file(identifier: str, suffix: Optional[str] = None) -> str: return os.path.join(cachedir, cachefile) -def get_cache(identifier: str, suffix: Optional[str] = None, as_json: bool = False) -> Optional[Any]: +def get_cache( + identifier: str, suffix: Optional[str] = None, as_json: bool = False +) -> Optional[Any]: fullpath = get_cache_file(identifier, suffix=suffix) cachedata = None try: @@ -3530,7 +3980,9 @@ def clear_cache(identifier: str, suffix: Optional[str] = None) -> None: logger.debug('Removed cache %s for %s', fullpath, identifier) -def save_cache(contents: Any, identifier: str, suffix: Optional[str] = None, is_json: bool = False) -> None: +def save_cache( + contents: Any, identifier: str, suffix: Optional[str] = None, is_json: bool = False +) -> None: fullpath = get_cache_file(identifier, suffix=suffix) try: with open(fullpath, 'w') as fh: @@ -3554,7 +4006,7 @@ def _setup_user_config(cmdargs: argparse.Namespace) -> None: USER_CONFIG['name'] = os.environ['GIT_AUTHOR_NAME'] else: udata = pwd.getpwuid(os.getuid()) - USER_CONFIG['name'] = udata.pw_gecos.strip(",") + USER_CONFIG['name'] = udata.pw_gecos.strip(',') if 'email' not in USER_CONFIG: if 'GIT_COMMITTER_EMAIL' in os.environ: USER_CONFIG['email'] = os.environ['GIT_COMMITTER_EMAIL'] @@ -3613,8 +4065,9 @@ def get_lore_node() -> liblore.LoreNode: def get_msgid_from_stdin() -> Optional[str]: if not sys.stdin.isatty(): - message = email.parser.BytesParser(policy=emlpolicy, _class=EmailMessage).parsebytes( - sys.stdin.buffer.read(), headersonly=True) + message = email.parser.BytesParser( + policy=emlpolicy, _class=EmailMessage + ).parsebytes(sys.stdin.buffer.read(), headersonly=True) msgid = message.get('Message-ID', None) if msgid: return str(msgid) @@ -3626,7 +4079,9 @@ def parse_msgid(msgid: str) -> str: msgid = msgid.strip().strip('<>') # Handle the case when someone pastes a full URL to the message # Is this a patchwork URL? - matches = re.search(r'^https?://.*/project/.*/patch/([^/]+@[^/]+)', msgid, re.IGNORECASE) + matches = re.search( + r'^https?://.*/project/.*/patch/([^/]+@[^/]+)', msgid, re.IGNORECASE + ) if matches: logger.debug('Looks like a patchwork URL') chunks = matches.groups() @@ -3671,7 +4126,9 @@ def get_msgid(cmdargs: argparse.Namespace) -> Optional[str]: return parse_msgid(msgid) -def get_strict_thread(msgs: List[EmailMessage], msgid: str, noparent: bool = False) -> Optional[List[EmailMessage]]: +def get_strict_thread( + msgs: List[EmailMessage], msgid: str, noparent: bool = False +) -> Optional[List[EmailMessage]]: # Attempt to automatically recognize the situation when someone posts # a standalone patch or series in the middle of a large discussion for another series. # We recommend dealing with this using --no-parent, but we can also catch this @@ -3711,26 +4168,36 @@ def mailsplit_bytes(bmbox: bytes, pipesep: Optional[str] = None) -> List[EmailMe logger.debug('Mailsplitting using pipesep=%s', pipesep) if '\\' in pipesep: import codecs + pipesep = codecs.decode(pipesep.encode(), 'unicode_escape') msgs: List[EmailMessage] = [] for chunk in bmbox.split(pipesep.encode()): if chunk.strip(): - msgs.append(email.parser.BytesParser(policy=emlpolicy, - _class=EmailMessage).parsebytes(chunk)) + msgs.append( + email.parser.BytesParser( + policy=emlpolicy, _class=EmailMessage + ).parsebytes(chunk) + ) return msgs return liblore.utils.split_mbox(bmbox) -def get_pi_search_results(query: str, nocache: bool = False, message: Optional[str] = None, - full_threads: bool = True) -> Optional[List[EmailMessage]]: +def get_pi_search_results( + query: str, + nocache: bool = False, + message: Optional[str] = None, + full_threads: bool = True, +) -> Optional[List[EmailMessage]]: node = get_lore_node() if message is not None and len(message): logger.info(message, node.hostname) else: logger.info('Grabbing search results from %s', node.hostname) try: - t_mbox = node.get_mbox_by_query(query, full_threads=full_threads, nocache=nocache) + t_mbox = node.get_mbox_by_query( + query, full_threads=full_threads, nocache=nocache + ) except liblore.RemoteError: logger.info('Server returned an error.') return None @@ -3761,7 +4228,9 @@ def get_series_by_msgid(msgid: str, nocache: bool = False) -> Optional['LoreMail return lmbx -def get_series_by_change_id(change_id: str, nocache: bool = False) -> Optional['LoreMailbox']: +def get_series_by_change_id( + change_id: str, nocache: bool = False +) -> Optional['LoreMailbox']: q = f'nq:"change-id:{change_id}"' q_msgs = get_pi_search_results(q, nocache=nocache, full_threads=False) if not q_msgs: @@ -3770,11 +4239,15 @@ def get_series_by_change_id(change_id: str, nocache: bool = False) -> Optional[' for q_msg in q_msgs: body, _bcharset = LoreMessage.get_payload(q_msg) if not re.search(rf'^\s*change-id:\s*{change_id}$', body, flags=re.M | re.I): - logger.debug('No change-id match for %s', q_msg.get('Subject', '(no subject)')) + logger.debug( + 'No change-id match for %s', q_msg.get('Subject', '(no subject)') + ) continue q_msgid = LoreMessage.get_clean_msgid(q_msg) if q_msgid is None: - logger.debug('No message-id found, ignoring %s', q_msg.get('Subject', '(no subject)')) + logger.debug( + 'No message-id found, ignoring %s', q_msg.get('Subject', '(no subject)') + ) continue t_msgs = get_pi_thread_by_msgid(q_msgid, nocache=nocache) if t_msgs: @@ -3784,9 +4257,12 @@ def get_series_by_change_id(change_id: str, nocache: bool = False) -> Optional[' return lmbx -def get_msgs_by_patch_id(patch_id: str, extra_query: Optional[str] = None, - nocache: bool = False, full_threads: bool = False - ) -> Optional[List[EmailMessage]]: +def get_msgs_by_patch_id( + patch_id: str, + extra_query: Optional[str] = None, + nocache: bool = False, + full_threads: bool = False, +) -> Optional[List[EmailMessage]]: q = f'patchid:{patch_id}' if extra_query: q = f'{q} {extra_query}' @@ -3798,7 +4274,9 @@ def get_msgs_by_patch_id(patch_id: str, extra_query: Optional[str] = None, return q_msgs -def get_series_by_patch_id(patch_id: str, nocache: bool = False) -> Optional['LoreMailbox']: +def get_series_by_patch_id( + patch_id: str, nocache: bool = False +) -> Optional['LoreMailbox']: q_msgs = get_msgs_by_patch_id(patch_id, full_threads=True, nocache=nocache) if not q_msgs: return None @@ -3809,10 +4287,13 @@ def get_series_by_patch_id(patch_id: str, nocache: bool = False) -> Optional['Lo return lmbx -def get_pi_thread_by_msgid(msgid: str, nocache: bool = False, - onlymsgids: Optional[Set[str]] = None, - with_thread: bool = True, - quiet: bool = False) -> Optional[List[EmailMessage]]: +def get_pi_thread_by_msgid( + msgid: str, + nocache: bool = False, + onlymsgids: Optional[Set[str]] = None, + with_thread: bool = True, + quiet: bool = False, +) -> Optional[List[EmailMessage]]: if not quiet: logger.info('Looking up %s', msgid) node = get_lore_node() @@ -3847,16 +4328,20 @@ def get_pi_thread_by_msgid(msgid: str, nocache: bool = False, return strict -def git_range_to_patches(gitdir: Optional[str], start: str, end: str, - prefixes: Optional[List[str]] = None, - revision: Optional[int] = 1, - msgid_tpt: Optional[str] = None, - seriests: Optional[int] = None, - mailfrom: Optional[Tuple[str, str]] = None, - extrahdrs: Optional[List[Tuple[str, str]]] = None, - ignore_commits: Optional[Set[str]] = None, - limit_committer: Optional[str] = None, - presubject: Optional[str] = None) -> List[Tuple[str, EmailMessage]]: +def git_range_to_patches( + gitdir: Optional[str], + start: str, + end: str, + prefixes: Optional[List[str]] = None, + revision: Optional[int] = 1, + msgid_tpt: Optional[str] = None, + seriests: Optional[int] = None, + mailfrom: Optional[Tuple[str, str]] = None, + extrahdrs: Optional[List[Tuple[str, str]]] = None, + ignore_commits: Optional[Set[str]] = None, + limit_committer: Optional[str] = None, + presubject: Optional[str] = None, +) -> List[Tuple[str, EmailMessage]]: gitargs = ['rev-list', '--no-merges', '--reverse'] if limit_committer: gitargs += ['-F', f'--committer={limit_committer}'] @@ -3881,20 +4366,23 @@ def git_range_to_patches(gitdir: Optional[str], start: str, end: str, '--find-renames', ] - if git_check_minimal_version("2.40"): - showargs.append("--default-prefix") + if git_check_minimal_version('2.40'): + showargs.append('--default-prefix') smcfg = get_sendemail_config() if not get_git_bool(str(smcfg.get('mailmap', 'false'))): showargs.append('--no-mailmap') logger.debug('showargs=%s', showargs) ecode, out = git_run_command( - gitdir, ['show'] + showargs + [commit], + gitdir, + ['show'] + showargs + [commit], decode=False, ) if ecode > 0: raise RuntimeError(f'Could not get a patch out of {commit}') - msg = email.parser.BytesParser(policy=emlpolicy, _class=EmailMessage).parsebytes(out) + msg = email.parser.BytesParser( + policy=emlpolicy, _class=EmailMessage + ).parsebytes(out) patches.append((commit, msg)) fullcount = len(patches) @@ -3917,8 +4405,9 @@ def git_range_to_patches(gitdir: Optional[str], start: str, end: str, lsubject.expected = expected if revision is not None: lsubject.revision = revision - subject = lsubject.get_rebuilt_subject(eprefixes=prefixes, - presubject=presubject) + subject = lsubject.get_rebuilt_subject( + eprefixes=prefixes, presubject=presubject + ) logger.debug(' %s', subject) msg.replace_header('Subject', subject) @@ -3937,7 +4426,9 @@ def git_range_to_patches(gitdir: Optional[str], start: str, end: str, patchts = seriests + counter + 1 origdate = msg.get('Date') if origdate: - msg.replace_header('Date', email.utils.formatdate(patchts, localtime=True)) + msg.replace_header( + 'Date', email.utils.formatdate(patchts, localtime=True) + ) else: msg.add_header('Date', email.utils.formatdate(patchts, localtime=True)) @@ -3988,7 +4479,9 @@ def git_revparse_tag(gitdir: Optional[str], tagname: str) -> Optional[str]: return out.strip() -def git_branch_contains(gitdir: Optional[str], commit_id: str, checkall: bool = False) -> List[str]: +def git_branch_contains( + gitdir: Optional[str], commit_id: str, checkall: bool = False +) -> List[str]: gitargs = ['branch', '--format=%(refname:short)', '--contains', commit_id] if checkall: gitargs.append('--all') @@ -4031,8 +4524,9 @@ def git_get_common_dir(path: Optional[str] = None) -> Optional[str]: return None -def format_addrs(pairs: List[Tuple[str, str]], clean: bool = True, - header_safe: bool = True) -> str: +def format_addrs( + pairs: List[Tuple[str, str]], clean: bool = True, header_safe: bool = True +) -> str: addrs = list() for pair in pairs: if not pair[0] or pair[0] == pair[1]: @@ -4069,7 +4563,9 @@ def print_pretty_addrs(addrs: List[Tuple[str, str]], hdrname: str) -> None: def make_quote(body: str, maxlines: int = 5) -> str: - _headers, message, _trailers, _basement, _signature = LoreMessage.get_body_parts(body) + _headers, message, _trailers, _basement, _signature = LoreMessage.get_body_parts( + body + ) if not len(message): # Sometimes there is no message, just trailers return '> \n' @@ -4122,7 +4618,9 @@ def parse_int_range(intrange: str, upper: int) -> Iterator[int]: logger.critical('Unknown range value specified: %s', n) -def check_gpg_status(status: str) -> Tuple[bool, bool, bool, Optional[str], Optional[str]]: +def check_gpg_status( + status: str, +) -> Tuple[bool, bool, bool, Optional[str], Optional[str]]: good = False valid = False trusted = False @@ -4139,7 +4637,9 @@ def check_gpg_status(status: str) -> Tuple[bool, bool, bool, Optional[str], Opti if gs_matches: good = True keyid = gs_matches.groups()[0] - vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([\dA-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', status, flags=re.M) + vs_matches = re.search( + r'^\[GNUPG:] VALIDSIG ([\dA-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', status, flags=re.M + ) if vs_matches: valid = True signtime = vs_matches.groups()[2] @@ -4181,8 +4681,12 @@ def save_git_am_mbox(msgs: List[EmailMessage], dest: BinaryIO) -> None: dest.write(LoreMessage.get_msg_as_bytes(msg, headers='decode')) -def save_mboxrd_mbox(msgs: List[EmailMessage], dest: BinaryIO, mangle_from: bool = False) -> None: - gen = email.generator.BytesGenerator(dest, mangle_from_=mangle_from, policy=emlpolicy) +def save_mboxrd_mbox( + msgs: List[EmailMessage], dest: BinaryIO, mangle_from: bool = False +) -> None: + gen = email.generator.BytesGenerator( + dest, mangle_from_=mangle_from, policy=emlpolicy + ) for msg in msgs: dest.write(b'From mboxrd@z Thu Jan 1 00:00:00 1970\n') gen.flatten(msg) @@ -4198,13 +4702,20 @@ def save_maildir(msgs: List[EmailMessage], dest: str) -> None: for msg in msgs: # make a slug out of it lsubj = LoreSubject(msg.get('subject', '')) - slug = '%04d_%s' % (lsubj.counter, re.sub(r'\W+', '_', lsubj.subject).strip('_').lower()) + slug = '%04d_%s' % ( + lsubj.counter, + re.sub(r'\W+', '_', lsubj.subject).strip('_').lower(), + ) with open(os.path.join(d_tmp, f'{slug}.eml'), 'wb') as mfh: mfh.write(LoreMessage.get_msg_as_bytes(msg, headers='decode')) - os.rename(os.path.join(d_tmp, f'{slug}.eml'), os.path.join(d_new, f'{slug}.eml')) + os.rename( + os.path.join(d_tmp, f'{slug}.eml'), os.path.join(d_new, f'{slug}.eml') + ) -def get_mailinfo(bmsg: bytes, scissors: bool = False) -> Tuple[Dict[str, str], bytes, bytes]: +def get_mailinfo( + bmsg: bytes, scissors: bool = False +) -> Tuple[Dict[str, str], bytes, bytes]: with tempfile.TemporaryDirectory() as tfd: m_out = os.path.join(tfd, 'm') p_out = os.path.join(tfd, 'p') @@ -4258,10 +4769,16 @@ def _setup_sendemail_config(cmdargs: argparse.Namespace) -> None: identity = config.get('sendemail-identity') or _basecfg.get('identity') if identity: # Use this identity to override what we got from the default one - sconfig = get_config_from_git(rf'sendemail\.{identity}\..*', multivals=['smtpserveroption'], defaults=_basecfg) + sconfig = get_config_from_git( + rf'sendemail\.{identity}\..*', + multivals=['smtpserveroption'], + defaults=_basecfg, + ) sectname = f'sendemail.{identity}' if not len(sconfig): - raise smtplib.SMTPException('Unable to find %s settings in any applicable git config' % sectname) + raise smtplib.SMTPException( + 'Unable to find %s settings in any applicable git config' % sectname + ) else: sconfig = _basecfg sectname = 'sendemail' @@ -4277,7 +4794,9 @@ def get_sendemail_config() -> Dict[str, Optional[Union[str, List[str]]]]: return SENDEMAIL_CONFIG -def get_smtp(dryrun: bool = False) -> Tuple[Union[smtplib.SMTP, smtplib.SMTP_SSL, List[str], None], str]: +def get_smtp( + dryrun: bool = False, +) -> Tuple[Union[smtplib.SMTP, smtplib.SMTP_SSL, List[str], None], str]: sconfig = get_sendemail_config() # Limited support for smtp settings to begin with, but should cover the vast majority of cases fromaddr = sconfig.get('from') @@ -4292,7 +4811,9 @@ def get_smtp(dryrun: bool = False) -> Tuple[Union[smtplib.SMTP, smtplib.SMTP_SSL try: port = int(str(sconfig.get('smtpserverport', '0'))) except ValueError as exc: - raise smtplib.SMTPException('Invalid smtpport entry in config: %s' % sconfig.get('smtpserverport')) from exc + raise smtplib.SMTPException( + 'Invalid smtpport entry in config: %s' % sconfig.get('smtpserverport') + ) from exc # If server contains slashes, then it's a local command if '/' in server: @@ -4337,7 +4858,9 @@ def get_smtp(dryrun: bool = False) -> Tuple[Union[smtplib.SMTP, smtplib.SMTP_SSL # We do TLS from the get-go smtp = smtplib.SMTP_SSL(server, port) else: - raise smtplib.SMTPException('Unclear what to do with smtpencryption=%s' % encryption) + raise smtplib.SMTPException( + 'Unclear what to do with smtpencryption=%s' % encryption + ) # If we got to this point, we should do authentication, # unless smtpauth is set to a special "none" value @@ -4353,11 +4876,15 @@ def get_smtp(dryrun: bool = False) -> Tuple[Union[smtplib.SMTP, smtplib.SMTP_SSL gchost = f'{server}:{port}' else: gchost = server - gc_pass = git_credential_fill(None, protocol='smtp', host=gchost, username=auser) + gc_pass = git_credential_fill( + None, protocol='smtp', host=gchost, username=auser + ) if gc_pass: apass = gc_pass if not apass: - raise smtplib.SMTPException('No password specified for connecting to %s', server) + raise smtplib.SMTPException( + 'No password specified for connecting to %s', server + ) if auser and apass: # Let any exceptions bubble up if smtpauth in ('oauth', 'oauth2', 'xoauth2'): @@ -4374,10 +4901,12 @@ def get_smtp(dryrun: bool = False) -> Tuple[Union[smtplib.SMTP, smtplib.SMTP_SSL def get_patchwork_session(pwkey: str, pwurl: str) -> Tuple[requests.Session, str]: session = requests.session() - session.headers.update({ - 'User-Agent': 'b4/%s' % __VERSION__, - 'Authorization': f'Token {pwkey}', - }) + session.headers.update( + { + 'User-Agent': 'b4/%s' % __VERSION__, + 'Authorization': f'Token {pwkey}', + } + ) url = '/'.join((pwurl.rstrip('/'), 'api', PW_REST_API_VERSION)) logger.debug('pw url=%s', url) return session, url @@ -4390,7 +4919,9 @@ def patchwork_set_state(msgids: List[str], state: str) -> None: pwurl = str(config.get('pw-url', '')) pwproj = str(config.get('pw-project', '')) if not (pwkey and pwurl and pwproj): - logger.debug('Patchwork support requires pw-key, pw-url and pw-project settings') + logger.debug( + 'Patchwork support requires pw-key, pw-url and pw-project settings' + ) return pses, url = get_patchwork_session(pwkey, pwurl) patches_url = '/'.join((url, 'patches')) @@ -4432,11 +4963,17 @@ def patchwork_set_state(msgids: List[str], state: str) -> None: logger.debug('Patchwork REST error: %s', ex) -def send_mail(smtp: Union[smtplib.SMTP, smtplib.SMTP_SSL, List[str], None], msgs: Sequence[EmailMessage], - fromaddr: Optional[str], destaddrs: Optional[Union[Set[str], List[str]]] = None, - patatt_sign: bool = False, dryrun: bool = False, - output_dir: Optional[str] = None, web_endpoint: Optional[str] = None, - reflect: bool = False) -> Optional[int]: +def send_mail( + smtp: Union[smtplib.SMTP, smtplib.SMTP_SSL, List[str], None], + msgs: Sequence[EmailMessage], + fromaddr: Optional[str], + destaddrs: Optional[Union[Set[str], List[str]]] = None, + patatt_sign: bool = False, + dryrun: bool = False, + output_dir: Optional[str] = None, + web_endpoint: Optional[str] = None, + reflect: bool = False, +) -> Optional[int]: tosend = list() if output_dir is not None: dryrun = True @@ -4457,16 +4994,21 @@ def send_mail(smtp: Union[smtplib.SMTP, smtplib.SMTP_SSL, List[str], None], msgs ls = LoreSubject(subject) if patatt_sign: import patatt + # patatt.logger = logger try: bdata = patatt.rfc2822_sign(bdata) except patatt.NoKeyError as ex: logger.critical('CRITICAL: Error signing: no key configured') - logger.critical(' Run "patatt genkey" or configure "user.signingKey" to use PGP') + logger.critical( + ' Run "patatt genkey" or configure "user.signingKey" to use PGP' + ) logger.critical(' As a last resort, rerun with --no-sign') raise RuntimeError(str(ex)) from ex except patatt.SigningError as ex: - raise RuntimeError('Failure trying to patatt-sign: %s' % str(ex)) from ex + raise RuntimeError( + 'Failure trying to patatt-sign: %s' % str(ex) + ) from ex if dryrun: if output_dir: filen = '%s.eml' % ls.get_slug(sep='-') @@ -4481,7 +5023,9 @@ def send_mail(smtp: Union[smtplib.SMTP, smtplib.SMTP_SSL, List[str], None], msgs continue if not destaddrs: alldests = email.utils.getaddresses([str(x) for x in msg.get_all('to', [])]) - alldests += email.utils.getaddresses([str(x) for x in msg.get_all('cc', [])]) + alldests += email.utils.getaddresses( + [str(x) for x in msg.get_all('cc', [])] + ) myaddrs = {x[1] for x in alldests} else: myaddrs = set(destaddrs) @@ -4538,7 +5082,9 @@ def send_mail(smtp: Union[smtplib.SMTP, smtplib.SMTP_SSL, List[str], None], msgs cmdargs = list(smtp) + list(destaddrs) ecode, _out, err = _run_command(cmdargs, stdin=bdata) if ecode > 0: - raise RuntimeError('Error running %s: %s' % (' '.join(smtp), err.decode())) + raise RuntimeError( + 'Error running %s: %s' % (' '.join(smtp), err.decode()) + ) sent += 1 elif smtp: @@ -4555,7 +5101,9 @@ def send_mail(smtp: Union[smtplib.SMTP, smtplib.SMTP_SSL, List[str], None], msgs return sent -def git_get_current_branch(gitdir: Optional[str] = None, short: bool = True) -> Optional[str]: +def git_get_current_branch( + gitdir: Optional[str] = None, short: bool = True +) -> Optional[str]: gitargs = ['symbolic-ref', '-q', 'HEAD'] ecode, out = git_run_command(gitdir, gitargs) if ecode > 0: @@ -4578,13 +5126,14 @@ def get_excluded_addrs() -> Set[str]: return excludes -def cleanup_email_addrs(addresses: List[Tuple[str, str]], excludes: Set[str], - gitdir: Optional[str]) -> List[Tuple[str, str]]: +def cleanup_email_addrs( + addresses: List[Tuple[str, str]], excludes: Set[str], gitdir: Optional[str] +) -> List[Tuple[str, str]]: global ALIAS_INFO global MAILMAP_INFO # Translate aliases if support is available - if git_check_minimal_version("2.47"): + if git_check_minimal_version('2.47'): logger.debug('Translating aliases via git send-email') unqual_addrs: Set[str] = set() @@ -4598,23 +5147,27 @@ def cleanup_email_addrs(addresses: List[Tuple[str, str]], excludes: Set[str], tocheck = list(unqual_addrs) data = '\n'.join(tocheck).encode('utf-8') args = ['send-email', '--translate-aliases'] - ecode, out = git_run_command(gitdir, - ['send-email', '--translate-aliases'], - stdin=data) + ecode, out = git_run_command( + gitdir, ['send-email', '--translate-aliases'], stdin=data + ) if ecode == 0: translated_addrs = email.utils.getaddresses(out.strip().splitlines()) for alias, entry in zip(tocheck, translated_addrs): if alias != entry[1]: - logger.debug('Translated alias %s to qualified address %s', - alias, entry[1]) + logger.debug( + 'Translated alias %s to qualified address %s', + alias, + entry[1], + ) ALIAS_INFO[alias] = entry else: logger.debug('"%s" is not a known alias', alias) ALIAS_INFO[alias] = None else: - logger.debug('git send-email --translate-aliases failed with exit code %s', - ecode) + logger.debug( + 'git send-email --translate-aliases failed with exit code %s', ecode + ) def _replace_aliases(entry: Tuple[str, str]) -> Tuple[str, str]: if entry[1] in ALIAS_INFO: @@ -4646,7 +5199,9 @@ def cleanup_email_addrs(addresses: List[Tuple[str, str]], excludes: Set[str], replacement = MAILMAP_INFO[entry[1]] # If it's None, we don't want to replace it if replacement is not None: - logger.debug('Replaced %s with mailmap-updated %s', entry[1], replacement[1]) + logger.debug( + 'Replaced %s with mailmap-updated %s', entry[1], replacement[1] + ) addresses.remove(entry) addresses.append(replacement) continue @@ -4707,8 +5262,13 @@ def discover_rethread_series(msgid: str, nocache: bool = False) -> List[str]: seed_msg = seed_msgs[0] seed = LoreMessage(seed_msg) - logger.info('Seed: [%d/%d] %s (from %s)', - seed.counter, seed.expected, seed.subject, seed.fromemail) + logger.info( + 'Seed: [%d/%d] %s (from %s)', + seed.counter, + seed.expected, + seed.subject, + seed.fromemail, + ) # Build a 1-hour date window around the seed (30 min each way) # Convert to UTC since public-inbox dt: expects UTC timestamps @@ -4757,7 +5317,9 @@ def discover_rethread_series(msgid: str, nocache: bool = False) -> List[str]: if seed.counters_inferred: if not found_bare: - logger.warning('Could not find any matching patches, using seed message only') + logger.warning( + 'Could not find any matching patches, using seed message only' + ) return [msgid] logger.info('Discovered %d bare patches from the same author', len(found_bare)) return found_bare @@ -4774,15 +5336,18 @@ def discover_rethread_series(msgid: str, nocache: bool = False) -> List[str]: n_patches = sum(1 for c in found if c > 0) if n_patches < expected: missing = [str(i) for i in range(1, expected + 1) if i not in found] - logger.warning('Found %d/%d patches (missing: %s)', - n_patches, expected, ', '.join(missing)) + logger.warning( + 'Found %d/%d patches (missing: %s)', n_patches, expected, ', '.join(missing) + ) else: logger.info('Discovered %d/%d patches for the series', n_patches, expected) return msgids -def fetch_rethread_messages(msgids: List[str], nocache: bool = False) -> Tuple[List[str], List[EmailMessage]]: +def fetch_rethread_messages( + msgids: List[str], nocache: bool = False +) -> Tuple[List[str], List[EmailMessage]]: """Fetch messages for multiple msgids, deduplicating across threads. Returns (msgids, all_msgs) where msgids is the input list (for @@ -4810,7 +5375,9 @@ def fetch_rethread_messages(msgids: List[str], nocache: bool = False) -> Tuple[L return msgids, all_msgs -def retrieve_rethreaded_messages(cmdargs: argparse.Namespace) -> Tuple[str, List[EmailMessage]]: +def retrieve_rethreaded_messages( + cmdargs: argparse.Namespace, +) -> Tuple[str, List[EmailMessage]]: """Retrieve messages from multiple unthreaded msgids and rethread them into a series.""" raw_ids: List[str] = cmdargs.rethread @@ -4848,7 +5415,9 @@ def retrieve_rethreaded_messages(cmdargs: argparse.Namespace) -> Tuple[str, List return LoreSeries.rethread_series(msgids, all_msgs) -def retrieve_messages(cmdargs: argparse.Namespace) -> Tuple[Optional[str], Optional[List[EmailMessage]]]: +def retrieve_messages( + cmdargs: argparse.Namespace, +) -> Tuple[Optional[str], Optional[List[EmailMessage]]]: # Handle --rethread mode: fetch multiple unrelated messages and stitch them together if getattr(cmdargs, 'rethread', None): if not can_network: @@ -4875,14 +5444,24 @@ def retrieve_messages(cmdargs: argparse.Namespace) -> Tuple[Optional[str], Optio if ('cherrypick' in cmdargs and cmdargs.cherrypick == '_') or not with_thread: # Just that msgid, please pickings.add(msgid) - msgs = get_pi_thread_by_msgid(msgid, nocache=cmdargs.nocache, onlymsgids=pickings, with_thread=with_thread) or [] + msgs = ( + get_pi_thread_by_msgid( + msgid, + nocache=cmdargs.nocache, + onlymsgids=pickings, + with_thread=with_thread, + ) + or [] + ) if not msgs: logger.debug('No messages from the query') return None, msgs else: if cmdargs.localmbox == '-': # The entire mbox is passed via stdin, so mailsplit it and use the first message for our msgid - msgs = mailsplit_bytes(sys.stdin.buffer.read(), pipesep=cmdargs.stdin_pipe_sep) + msgs = mailsplit_bytes( + sys.stdin.buffer.read(), pipesep=cmdargs.stdin_pipe_sep + ) if not len(msgs): raise LookupError('Stdin did not contain any messages') @@ -4895,7 +5474,9 @@ def retrieve_messages(cmdargs: argparse.Namespace) -> Tuple[Optional[str], Optio if with_thread: msgs = get_strict_thread(mb_msgs, msgid) or [] if not msgs: - raise LookupError('Could not find %s in %s' % (msgid, cmdargs.localmbox)) + raise LookupError( + 'Could not find %s in %s' % (msgid, cmdargs.localmbox) + ) else: msgs = list() for msg in mb_msgs: @@ -4929,8 +5510,9 @@ def git_revparse_obj(gitobj: str, gitdir: Optional[str] = None) -> str: def _rewrite_fetch_head_origin(topdir: str, old_origin: str, new_origin: str) -> None: """Rewrite FETCH_HEAD to replace old_origin with a descriptive message.""" - ecode, fhf = git_run_command(topdir, ['rev-parse', '--git-path', 'FETCH_HEAD'], - logstderr=True) + ecode, fhf = git_run_command( + topdir, ['rev-parse', '--git-path', 'FETCH_HEAD'], logstderr=True + ) if ecode > 0: return fhf = fhf.rstrip() @@ -4943,9 +5525,14 @@ def _rewrite_fetch_head_origin(topdir: str, old_origin: str, new_origin: str) -> fhh.write(new_contents) -def git_fetch_am_into_repo(gitdir: Optional[str], ambytes: bytes, at_base: str = 'HEAD', - origin: Optional[str] = None, check_only: bool = False, - am_flags: Optional[List[str]] = None) -> None: +def git_fetch_am_into_repo( + gitdir: Optional[str], + ambytes: bytes, + at_base: str = 'HEAD', + origin: Optional[str] = None, + check_only: bool = False, + am_flags: Optional[List[str]] = None, +) -> None: if gitdir is None: gitdir = os.getcwd() topdir = git_get_toplevel(gitdir) @@ -4969,12 +5556,16 @@ def git_fetch_am_into_repo(gitdir: Optional[str], ambytes: bytes, at_base: str = cleanup = True try: logger.info('Magic: Preparing a sparse worktree') - ecode, out = git_run_command(gwt, ['sparse-checkout', 'set'], logstderr=True, rundir=gwt) + ecode, out = git_run_command( + gwt, ['sparse-checkout', 'set'], logstderr=True, rundir=gwt + ) if ecode > 0: logger.critical('Error running sparse-checkout set') logger.critical(out) raise RuntimeError - ecode, out = git_run_command(gwt, ['checkout', '-f'], logstderr=True, rundir=gwt) + ecode, out = git_run_command( + gwt, ['checkout', '-f'], logstderr=True, rundir=gwt + ) if ecode > 0: logger.critical('Error running checkout into sparse workdir') logger.critical(out) @@ -4982,7 +5573,9 @@ def git_fetch_am_into_repo(gitdir: Optional[str], ambytes: bytes, at_base: str = amargs = ['am'] if am_flags: amargs.extend(am_flags) - ecode, out = git_run_command(gwt, amargs, stdin=ambytes, logstderr=True, rundir=gwt) + ecode, out = git_run_command( + gwt, amargs, stdin=ambytes, logstderr=True, rundir=gwt + ) if ecode > 0: cleanup = False raise AmConflictError(gwt, out.strip()) @@ -5044,13 +5637,21 @@ def edit_in_editor(bdata: bytes, filehint: str = 'COMMIT_EDITMSG') -> bytes: write_branch = git_get_current_branch() if write_branch != read_branch: - with tempfile.NamedTemporaryFile(mode="wb", prefix=f"old-{read_branch}".replace("/", "-"), - delete=False) as save_file: + with tempfile.NamedTemporaryFile( + mode='wb', prefix=f'old-{read_branch}'.replace('/', '-'), delete=False + ) as save_file: save_file.write(bdata) - logger.critical('Editing started on branch %s, but current branch is %s.', - read_branch, write_branch) - logger.critical('To avoid a collision, your text was saved in %s', save_file.name) - raise RuntimeError(f"Branch changed during file editing, the temporary file was saved at {save_file.name}") + logger.critical( + 'Editing started on branch %s, but current branch is %s.', + read_branch, + write_branch, + ) + logger.critical( + 'To avoid a collision, your text was saved in %s', save_file.name + ) + raise RuntimeError( + f'Branch changed during file editing, the temporary file was saved at {save_file.name}' + ) return bdata @@ -5095,8 +5696,9 @@ def view_in_pager(bdata: bytes, filehint: str = 'b4-view.txt') -> None: spop.wait() -def map_codereview_trailers(qmsgs: List[EmailMessage], - ignore_msgids: Optional[Set[str]] = None) -> Dict[str, List['LoreMessage']]: +def map_codereview_trailers( + qmsgs: List[EmailMessage], ignore_msgids: Optional[Set[str]] = None +) -> Dict[str, List['LoreMessage']]: """ Map messages containing code-review trailers to patch-ids they were sent for. :param qmsgs: list of messages to process @@ -5150,9 +5752,14 @@ def map_codereview_trailers(qmsgs: List[EmailMessage], # Is it a patch? logger.debug(' subj: %s', _qmsg.full_subject) # Is it the cover letter? - if (_qmsg.counter == 0 and (not _qmsg.counters_inferred or _qmsg.has_diffstat) - and _qmsg.msgid in ref_map): - logger.debug(' stopping: found the cover letter for %s', qlmsg.full_subject) + if ( + _qmsg.counter == 0 + and (not _qmsg.counters_inferred or _qmsg.has_diffstat) + and _qmsg.msgid in ref_map + ): + logger.debug( + ' stopping: found the cover letter for %s', qlmsg.full_subject + ) if _qmsg.msgid not in covers: covers[_qmsg.msgid] = set() covers[_qmsg.msgid].add(qlmsg.msgid) @@ -5166,7 +5773,9 @@ def map_codereview_trailers(qmsgs: List[EmailMessage], patchid_map[pqpid] = list() if qlmsg not in patchid_map[pqpid]: patchid_map[pqpid].append(qlmsg) - logger.debug(' matched patch-id %s to %s', pqpid, qlmsg.full_subject) + logger.debug( + ' matched patch-id %s to %s', pqpid, qlmsg.full_subject + ) pfound = True break else: @@ -5187,7 +5796,9 @@ def map_codereview_trailers(qmsgs: List[EmailMessage], if qlmsg.in_reply_to == cmsgid and qlmsg.git_patch_id: pqpid = qlmsg.git_patch_id for fwmsgid in fwmsgids: - logger.debug('Adding cover follow-up %s to patch-id %s', fwmsgid, pqpid) + logger.debug( + 'Adding cover follow-up %s to patch-id %s', fwmsgid, pqpid + ) if pqpid not in patchid_map: patchid_map[pqpid] = list() patchid_map[pqpid].append(qmid_map[fwmsgid]) @@ -5215,7 +5826,7 @@ def get_msgs_from_mailbox_or_maildir(mbmd: str) -> List[EmailMessage]: return [x[1] for x in in_mdr.items()] # type: ignore[misc] in_mbx = mailbox.mbox(mbmd, factory=mailbox_email_factory) # type: ignore[arg-type] - return[x[1] for x in in_mbx.items()] # type: ignore[misc] + return [x[1] for x in in_mbx.items()] # type: ignore[misc] def get_mailfrom() -> Tuple[str, str]: @@ -5234,6 +5845,8 @@ def make_msgid(idstring: Optional[str] = None, domain: str = 'b4') -> str: def is_maildir(dest: str) -> bool: - return (os.path.isdir(os.path.join(dest, 'new')) - and os.path.isdir(os.path.join(dest, 'cur')) - and os.path.isdir(os.path.join(dest, 'tmp'))) + return ( + os.path.isdir(os.path.join(dest, 'new')) + and os.path.isdir(os.path.join(dest, 'cur')) + and os.path.isdir(os.path.join(dest, 'tmp')) + ) diff --git a/src/b4/bugs/__init__.py b/src/b4/bugs/__init__.py index cb21611..1cd2db4 100644 --- a/src/b4/bugs/__init__.py +++ b/src/b4/bugs/__init__.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: GPL-2.0-or-later # Copyright (C) 2020 by the Linux Foundation """b4 bugs: manage bug reports from mailing list threads.""" + import argparse import json import logging @@ -46,14 +47,21 @@ def _ensure_identity(topdir: str) -> bool: git_email = git_email.strip() if ecode_e == 0 else '' for user in users: if user.get('email', '') == git_email: - ecode, _out, _err = git_bug_cli(topdir, ['user', 'adopt', user['id']]) + ecode, _out, _err = git_bug_cli( + topdir, ['user', 'adopt', user['id']] + ) if ecode == 0: - logger.info('Adopted existing git-bug identity: %s', user.get('name', '')) + logger.info( + 'Adopted existing git-bug identity: %s', + user.get('name', ''), + ) return True # No email match -- adopt the first one ecode, _out, _err = git_bug_cli(topdir, ['user', 'adopt', users[0]['id']]) if ecode == 0: - logger.info('Adopted existing git-bug identity: %s', users[0].get('name', '')) + logger.info( + 'Adopted existing git-bug identity: %s', users[0].get('name', '') + ) return True # No identities at all -- create from git config after confirmation @@ -62,7 +70,9 @@ def _ensure_identity(topdir: str) -> bool: git_name = git_name.strip() if ecode_n == 0 else '' git_email = git_email.strip() if ecode_e == 0 else '' if not git_name or not git_email: - logger.critical('Cannot create git-bug identity: git user.name/user.email not configured') + logger.critical( + 'Cannot create git-bug identity: git user.name/user.email not configured' + ) return False logger.info('No git-bug identity found for this repository.') @@ -74,9 +84,18 @@ def _ensure_identity(topdir: str) -> bool: if answer and answer != 'y': return False - ecode, out, err = git_bug_cli(topdir, [ - 'user', 'new', '-n', git_name, '-e', git_email, '--non-interactive', - ]) + ecode, out, err = git_bug_cli( + topdir, + [ + 'user', + 'new', + '-n', + git_name, + '-e', + git_email, + '--non-interactive', + ], + ) if ecode != 0: logger.critical('Failed to create git-bug identity: %s', err.strip()) return False @@ -115,8 +134,9 @@ def cmd_import(cmdargs: argparse.Namespace) -> None: except RuntimeError as exc: logger.critical('Import failed: %s', exc) sys.exit(1) - logger.info('Created bug %s: %s (%d comments)', - bug.id[:7], bug.title, len(bug.comments)) + logger.info( + 'Created bug %s: %s (%d comments)', bug.id[:7], bug.title, len(bug.comments) + ) def cmd_refresh(cmdargs: argparse.Namespace) -> None: @@ -140,8 +160,7 @@ def cmd_refresh(cmdargs: argparse.Namespace) -> None: if count: logger.info('Bug %s: %d new comment(s)', bug.id[:7], count) total += count - logger.info('Refreshed %d bug(s), %d new comment(s) total', - len(bugs), total) + logger.info('Refreshed %d bug(s), %d new comment(s) total', len(bugs), total) def cmd_list(cmdargs: argparse.Namespace) -> None: @@ -161,8 +180,7 @@ def cmd_list(cmdargs: argparse.Namespace) -> None: for bug in bugs: icon = '\u25cf' if bug.status == Status.OPEN else '\u25cb' labels = ' '.join(f'[{label}]' for label in sorted(bug.labels)) - logger.info('%s %s %s %s', - icon, bug.id[:7], bug.title, labels) + logger.info('%s %s %s %s', icon, bug.id[:7], bug.title, labels) def cmd_delete(cmdargs: argparse.Namespace) -> None: diff --git a/src/b4/bugs/_import.py b/src/b4/bugs/_import.py index 954ebec..9114f70 100644 --- a/src/b4/bugs/_import.py +++ b/src/b4/bugs/_import.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: GPL-2.0-or-later # Copyright (C) 2020 by the Linux Foundation """Thread import engine: lore.kernel.org -> git-bug via ezgb.""" + import email.utils import logging import re @@ -101,6 +102,7 @@ def _get_clean_msgid(msg: EmailMessage) -> Optional[str]: def _sort_by_date(msgs: list[EmailMessage]) -> list[EmailMessage]: """Sort messages by their Date header, oldest first.""" + def _date_key(msg: EmailMessage) -> float: raw = msg.get('Date') if raw: @@ -109,11 +111,14 @@ def _sort_by_date(msgs: list[EmailMessage]) -> list[EmailMessage]: ) return parsed.timestamp() return 0.0 + return sorted(msgs, key=_date_key) def import_thread( - repo: GitBugRepo, msgid: str, noparent: bool = False, + repo: GitBugRepo, + msgid: str, + noparent: bool = False, ) -> Bug: """Import a lore.kernel.org thread as a new git-bug bug. @@ -132,9 +137,7 @@ def import_thread( if noparent: filtered = b4.get_strict_thread(msgs, msgid, noparent=True) if not filtered: - raise RuntimeError( - f'No messages in sub-thread for {msgid}' - ) + raise RuntimeError(f'No messages in sub-thread for {msgid}') msgs = filtered msgs = b4.mbox.minimize_thread(msgs) @@ -181,7 +184,8 @@ def import_thread( subject = '(no subject)' scope = 'no-parent' if noparent else '' bug = repo.create_bug( - title=subject, body=format_comment(root, scope=scope), + title=subject, + body=format_comment(root, scope=scope), ) # Add follow-up messages as comments, sorted by date diff --git a/src/b4/bugs/_tui.py b/src/b4/bugs/_tui.py index 998e6bb..aaf079f 100644 --- a/src/b4/bugs/_tui.py +++ b/src/b4/bugs/_tui.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: GPL-2.0-or-later # Copyright (C) 2020 by the Linux Foundation """Textual TUI for b4 bugs.""" + import email.message import email.utils import hashlib @@ -59,25 +60,25 @@ logger = logging.getLogger('b4') # Material UI colors used by git-bug for deterministic label coloring. # See entities/common/label.go in git-bug. _LABEL_COLORS = [ - (244, 67, 54), # red - (233, 30, 99), # pink - (156, 39, 176), # purple - (103, 58, 183), # deepPurple - (63, 81, 181), # indigo - (33, 150, 243), # blue - (3, 169, 244), # lightBlue - (0, 188, 212), # cyan - (0, 150, 136), # teal - (76, 175, 80), # green - (139, 195, 74), # lightGreen - (205, 220, 57), # lime - (255, 235, 59), # yellow - (255, 193, 7), # amber - (255, 152, 0), # orange - (255, 87, 34), # deepOrange - (121, 85, 72), # brown + (244, 67, 54), # red + (233, 30, 99), # pink + (156, 39, 176), # purple + (103, 58, 183), # deepPurple + (63, 81, 181), # indigo + (33, 150, 243), # blue + (3, 169, 244), # lightBlue + (0, 188, 212), # cyan + (0, 150, 136), # teal + (76, 175, 80), # green + (139, 195, 74), # lightGreen + (205, 220, 57), # lime + (255, 235, 59), # yellow + (255, 193, 7), # amber + (255, 152, 0), # orange + (255, 87, 34), # deepOrange + (121, 85, 72), # brown (158, 158, 158), # grey - (96, 125, 139), # blueGrey + (96, 125, 139), # blueGrey ] @@ -94,13 +95,13 @@ def label_color(label: str) -> str: _LIFECYCLE_SYMBOLS: dict[str, str] = { - 'new': '\u2605', # ★ black star - 'confirmed': '\u00a4', # ¤ currency sign (bug-like) + 'new': '\u2605', # ★ black star + 'confirmed': '\u00a4', # ¤ currency sign (bug-like) 'worksforme': '\u00f8', # ø latin small letter o with stroke - 'needinfo': '\u203d', # ‽ interrobang - 'wontfix': '\u2260', # ≠ not equal to - 'fixed': '\u2713', # ✓ check mark - 'duplicate': '\u2261', # ≡ identical to + 'needinfo': '\u203d', # ‽ interrobang + 'wontfix': '\u2260', # ≠ not equal to + 'fixed': '\u2713', # ✓ check mark + 'duplicate': '\u2261', # ≡ identical to } @@ -109,13 +110,13 @@ _LIFECYCLE_SYMBOLS: dict[str, str] = { # 1 = waiting (pending external input) # 2 = resolved (no action needed) _LIFECYCLE_TIER: dict[str, int] = { - 'new': 0, - 'confirmed': 0, - 'needinfo': 1, + 'new': 0, + 'confirmed': 0, + 'needinfo': 1, 'worksforme': 2, - 'wontfix': 2, - 'fixed': 2, - 'duplicate': 2, + 'wontfix': 2, + 'fixed': 2, + 'duplicate': 2, } @@ -125,7 +126,7 @@ def _bug_tier(bug: BugLike) -> int: return 2 for lb in bug.labels: if lb.startswith('lifecycle:'): - state = lb[len('lifecycle:'):] + state = lb[len('lifecycle:') :] return _LIFECYCLE_TIER.get(state, 0) return 0 @@ -148,7 +149,7 @@ def _bug_lifecycle(bug: BugLike) -> str: """ for lb in bug.labels: if lb.startswith('lifecycle:'): - state = lb[len('lifecycle:'):] + state = lb[len('lifecycle:') :] return _LIFECYCLE_SYMBOLS.get(state, '?') if bug.status == Status.CLOSED: return '\u00d7' # × multiplication sign @@ -193,8 +194,7 @@ def _relative_time(dt: datetime) -> str: return f'{years}y ago' -def _render_comment(viewer: RichLog, text: str, - ts: dict[str, str]) -> None: +def _render_comment(viewer: RichLog, text: str, ts: dict[str, str]) -> None: """Render an RFC 2822 formatted comment into a RichLog.""" if '\n\n' in text: header_block, body = text.split('\n\n', 1) @@ -206,8 +206,8 @@ def _render_comment(viewer: RichLog, text: str, colon = line.find(':') if colon > 0: hdr_text = Text() - hdr_text.append(line[:colon + 1], style='bold') - hdr_text.append(line[colon + 1:]) + hdr_text.append(line[: colon + 1], style='bold') + hdr_text.append(line[colon + 1 :]) viewer.write(hdr_text) else: viewer.write(Text(line)) @@ -225,6 +225,7 @@ def _render_comment(viewer: RichLog, text: str, # -- List item widget -------------------------------------------------------- + def _bug_submitter(bug: Bug) -> str: """Get the submitter name from the first comment's From header.""" if bug.comments: @@ -253,8 +254,7 @@ class BugListItem(ListItem): count = bug.comment_count else: submitter = _bug_submitter(bug) - count = sum(1 for c in bug.comments - if not is_comment_removed(c.text)) + count = sum(1 for c in bug.comments if not is_comment_removed(c.text)) if display_width(submitter) > 20: while display_width(submitter) > 19: submitter = submitter[:-1] @@ -276,10 +276,11 @@ class BugListItem(ListItem): # -- Modal screens ----------------------------------------------------------- + class ImportScreen(ModalScreen[Optional[str]]): """Modal for importing a lore thread by message-id.""" - DEFAULT_CSS = ''' + DEFAULT_CSS = """ ImportScreen { align: center middle; } @@ -295,7 +296,7 @@ class ImportScreen(ModalScreen[Optional[str]]): height: auto; margin-top: 1; } - ''' + """ BINDINGS = [ Binding('escape', 'cancel', 'cancel'), @@ -305,8 +306,7 @@ class ImportScreen(ModalScreen[Optional[str]]): with Vertical(id='import-dialog') as dialog: dialog.border_title = 'Import thread from lore' yield Input(placeholder='Message-ID or lore URL', id='import-msgid') - yield Checkbox('Ignore parent messages in thread', - id='import-noparent') + yield Checkbox('Ignore parent messages in thread', id='import-noparent') yield Static('', id='import-status') def on_input_submitted(self, event: Input.Submitted) -> None: @@ -325,11 +325,14 @@ class ImportScreen(ModalScreen[Optional[str]]): status.update('Importing...') self.run_worker( lambda: self._do_import(msgid, noparent), - name='import', thread=True, exit_on_error=False, + name='import', + thread=True, + exit_on_error=False, ) def _do_import(self, msgid: str, noparent: bool) -> str: from b4.bugs._import import import_thread + app = self.app if not isinstance(app, BugListApp): raise RuntimeError('ImportScreen must be used with BugListApp') @@ -337,9 +340,7 @@ class ImportScreen(ModalScreen[Optional[str]]): with _quiet_worker(): bug = import_thread(app.repo, msgid, noparent=noparent) except RuntimeError: - raise RuntimeError( - 'Could not retrieve message thread' - ) from None + raise RuntimeError('Could not retrieve message thread') from None return bug.id async def on_worker_state_changed(self, event: Worker.StateChanged) -> None: @@ -366,6 +367,7 @@ class CommentItem(ListItem): def compose(self) -> ComposeResult: from textual.widgets import Label + st = Label(f' {self._display_name}', markup=False) st.styles.text_style = 'dim' yield st @@ -374,7 +376,7 @@ class CommentItem(ListItem): class BugDetailScreen(ModalScreen[None]): """Full-screen bug detail view with left pane navigation.""" - DEFAULT_CSS = ''' + DEFAULT_CSS = """ BugDetailScreen { background: $surface; } @@ -419,7 +421,7 @@ class BugDetailScreen(ModalScreen[None]): #detail-log { width: 3fr; } - ''' + """ BINDINGS = [ Binding('a', 'bug_action', 'action'), @@ -469,8 +471,7 @@ class BugDetailScreen(ModalScreen[None]): with Horizontal(id='detail-body'): with Vertical(id='comment-list-pane'): yield ListView(id='comment-list') - yield RichLog(id='detail-log', wrap=True, markup=False, - auto_scroll=False) + yield RichLog(id='detail-log', wrap=True, markup=False, auto_scroll=False) yield SeparatedFooter() def on_mount(self) -> None: @@ -479,10 +480,17 @@ class BugDetailScreen(ModalScreen[None]): self._ts = app._ts # Build colour map for commenters - palette = reviewer_colours(self._ts) if self._ts else [ - 'dark_goldenrod', 'dark_cyan', 'dark_magenta', - 'dark_red', 'dark_blue', - ] + palette = ( + reviewer_colours(self._ts) + if self._ts + else [ + 'dark_goldenrod', + 'dark_cyan', + 'dark_magenta', + 'dark_red', + 'dark_blue', + ] + ) emails: list[str] = [] for comment in self.bug.comments: addr = '' @@ -509,15 +517,15 @@ class BugDetailScreen(ModalScreen[None]): if parent_idx is not None: parent_depth = self._comment_depths.get(parent_idx, 0) self._comment_depths[i] = min( - parent_depth + 1, self._MAX_DEPTH, + parent_depth + 1, + self._MAX_DEPTH, ) continue self._comment_depths[i] = 0 # Build visible comment indices (skip removed) self._visible_indices = [ - i for i, c in enumerate(self.bug.comments) - if not is_comment_removed(c.text) + i for i, c in enumerate(self.bug.comments) if not is_comment_removed(c.text) ] # Populate left pane @@ -530,6 +538,7 @@ class BugDetailScreen(ModalScreen[None]): def _initial_populate() -> None: self._populate_richlog() self.query_one('#comment-list', ListView).focus() + self.call_after_refresh(_initial_populate) def _build_comment_items(self) -> list[CommentItem]: @@ -571,8 +580,7 @@ class BugDetailScreen(ModalScreen[None]): subsequent appends, causing stale child counts. """ self._visible_indices = [ - i for i, c in enumerate(self.bug.comments) - if not is_comment_removed(c.text) + i for i, c in enumerate(self.bug.comments) if not is_comment_removed(c.text) ] # Replace the ListView widget and rebuild the RichLog in a # single batch so the screen doesn't flicker mid-rebuild. @@ -600,7 +608,10 @@ class BugDetailScreen(ModalScreen[None]): return self._ts.get('accent', 'cyan') def _render_comment_panel( - self, viewer: RichLog, comment: Comment, idx: int, + self, + viewer: RichLog, + comment: Comment, + idx: int, depth: int = 0, ) -> None: """Render a comment as a bordered panel in the review app style.""" @@ -611,7 +622,7 @@ class BugDetailScreen(ModalScreen[None]): header_block, body = text, '' colour = self._get_comment_colour(comment) - bg = f"on {self._ts['panel']}" if self._ts.get('panel') else 'on grey11' + bg = f'on {self._ts["panel"]}' if self._ts.get('panel') else 'on grey11' # Extract sender name for panel title from_hdr = parse_comment_header(text, 'From') @@ -633,7 +644,7 @@ class BugDetailScreen(ModalScreen[None]): colon = line.find(':') if colon > 0: hdr_name = line[:colon] - hdr_val = line[colon + 1:].strip() + hdr_val = line[colon + 1 :].strip() if hdr_name == 'In-Reply-To': continue if hdr_name == 'Message-ID': @@ -672,6 +683,7 @@ class BugDetailScreen(ModalScreen[None]): ) if depth > 0: from rich.padding import Padding + viewer.write(Padding(panel, pad=(0, 0, 0, depth * 2))) else: viewer.write(panel) @@ -802,13 +814,15 @@ class BugDetailScreen(ModalScreen[None]): with self.app.suspend(): try: result = b4.edit_in_editor( - template.encode(), filehint='bug-comment.md', + template.encode(), + filehint='bug-comment.md', ) except Exception as exc: logger.critical('Editor error: %s', exc) return # Strip HTML comments and check if anything remains import re + text = result.decode(errors='replace') text = re.sub(r'', '', text, flags=re.DOTALL).strip() if not text: @@ -825,7 +839,8 @@ class BugDetailScreen(ModalScreen[None]): """Return the currently selected comment, or None.""" lv = self.query_one('#comment-list', ListView) if lv.highlighted_child is not None and isinstance( - lv.highlighted_child, CommentItem, + lv.highlighted_child, + CommentItem, ): idx = lv.highlighted_child.comment_idx if idx < len(self.bug.comments): @@ -841,8 +856,9 @@ class BugDetailScreen(ModalScreen[None]): # Get message-id from comment — required for reply msgid = parse_comment_header(comment.text, 'Message-ID') if not msgid: - self.notify('No Message-ID in this comment, cannot reply', - severity='warning') + self.notify( + 'No Message-ID in this comment, cannot reply', severity='warning' + ) return # Fetch the original message from lore @@ -857,10 +873,12 @@ class BugDetailScreen(ModalScreen[None]): """Fetch the original message and compose a reply.""" # Determine if this bug uses --no-parent scope scope = parse_comment_header( - self.bug.comments[0].text, 'X-B4-Bug-Scope', + self.bug.comments[0].text, + 'X-B4-Bug-Scope', ) root_msgid = parse_comment_header( - self.bug.comments[0].text, 'Message-ID', + self.bug.comments[0].text, + 'Message-ID', ) if not root_msgid: root_msgid = msgid @@ -871,7 +889,8 @@ class BugDetailScreen(ModalScreen[None]): msgs = b4.get_pi_thread_by_msgid(fetch_id) if not msgs: self.app.call_from_thread( - self.notify, 'Could not retrieve thread from lore', + self.notify, + 'Could not retrieve thread from lore', severity='error', ) return @@ -879,7 +898,9 @@ class BugDetailScreen(ModalScreen[None]): # Apply --no-parent filter if applicable if scope == 'no-parent': filtered = b4.get_strict_thread( - msgs, fetch_id, noparent=True, + msgs, + fetch_id, + noparent=True, ) if filtered: msgs = filtered @@ -896,7 +917,8 @@ class BugDetailScreen(ModalScreen[None]): if target_msg is None: self.app.call_from_thread( - self.notify, f'Message {msgid} not found in thread', + self.notify, + f'Message {msgid} not found in thread', severity='error', ) return @@ -922,21 +944,23 @@ class BugDetailScreen(ModalScreen[None]): # Schedule the editor open on the main thread self.app.call_from_thread( - self._open_reply_editor, lmsg, reply_text, + self._open_reply_editor, + lmsg, + reply_text, ) - def _open_reply_editor(self, lmsg: 'b4.LoreMessage', - reply_text: str) -> None: + def _open_reply_editor(self, lmsg: 'b4.LoreMessage', reply_text: str) -> None: """Open editor and show preview (runs on main thread).""" self._reply_edit_loop(lmsg, reply_text) - def _reply_edit_loop(self, lmsg: 'b4.LoreMessage', - reply_text: str, - is_reedit: bool = False) -> None: + def _reply_edit_loop( + self, lmsg: 'b4.LoreMessage', reply_text: str, is_reedit: bool = False + ) -> None: with self.app.suspend(): try: result = b4.edit_in_editor( - reply_text.encode(), filehint='reply.eml', + reply_text.encode(), + filehint='reply.eml', ) except Exception as exc: logger.critical('Editor error: %s', exc) @@ -960,7 +984,8 @@ class BugDetailScreen(ModalScreen[None]): self._reply_edit_loop(lmsg, final_text, is_reedit=True) self.app.push_screen( - ReplyPreviewScreen(reply_msg), callback=_on_preview, + ReplyPreviewScreen(reply_msg), + callback=_on_preview, ) def _send_reply(self, msg: 'email.message.EmailMessage') -> None: @@ -973,7 +998,8 @@ class BugDetailScreen(ModalScreen[None]): try: smtp, fromaddr = b4.get_smtp(dryrun=dryrun) sent = b4.send_mail( - smtp, [msg], + smtp, + [msg], fromaddr=fromaddr, patatt_sign=patatt_sign, dryrun=dryrun, @@ -990,6 +1016,7 @@ class BugDetailScreen(ModalScreen[None]): self.notify('Reply sent') # Record the reply as a comment on the bug from b4.bugs._import import format_comment + comment_text = format_comment(msg) app.repo.add_comment(self.bug.id, comment_text) app.repo.invalidate(self.bug.id) @@ -1021,8 +1048,9 @@ class BugDetailScreen(ModalScreen[None]): if not isinstance(app, BugListApp): return usercfg = b4.get_user_config() - identity = (f'{usercfg.get("name", "Unknown")} ' - f'<{usercfg.get("email", "unknown")}>') + identity = ( + f'{usercfg.get("name", "Unknown")} <{usercfg.get("email", "unknown")}>' + ) tombstone = make_tombstone(comment.text, identity) app.repo.edit_comment(self.bug.id, comment.id, tombstone) self._refresh_bug_view() @@ -1032,8 +1060,10 @@ class BugDetailScreen(ModalScreen[None]): self.app.push_screen( ConfirmScreen( title='Remove comment?', - body=[f'From: {sender}', - 'The comment body will be permanently removed.'], + body=[ + f'From: {sender}', + 'The comment body will be permanently removed.', + ], border='$warning', ), callback=_on_confirm, @@ -1041,6 +1071,7 @@ class BugDetailScreen(ModalScreen[None]): def action_edit_title(self) -> None: """Edit the bug title.""" + def _on_result(new_title: Optional[str]) -> None: if not new_title: return @@ -1057,7 +1088,8 @@ class BugDetailScreen(ModalScreen[None]): self.query_one('#detail-header', Static).update(header) self.app.push_screen( - EditTitleScreen(self.bug.title), callback=_on_result, + EditTitleScreen(self.bug.title), + callback=_on_result, ) def action_add_label(self) -> None: @@ -1093,12 +1125,14 @@ class BugDetailScreen(ModalScreen[None]): return bid = self.bug.id if action == 'delete': + def _on_delete(confirmed: bool | None) -> None: if not confirmed: return app.repo.remove_bug(bid) app.repo.invalidate() self.dismiss(None) + self.app.push_screen( ConfirmScreen( title='Delete bug?', @@ -1110,14 +1144,14 @@ class BugDetailScreen(ModalScreen[None]): ) return if action == 'duplicate': + def _on_dup(target_id: Optional[str]) -> None: if not target_id: return target = app.repo.get_bug(target_id) app.repo.add_comment( bid, - f'Closing as duplicate of {target.id[:7]}: ' - f'{target.title}', + f'Closing as duplicate of {target.id[:7]}: {target.title}', ) for lb in self.bug.labels: if lb.startswith('lifecycle:'): @@ -1126,6 +1160,7 @@ class BugDetailScreen(ModalScreen[None]): app.repo.set_status(bid, Status.CLOSED) app.repo.invalidate() self.dismiss(None) + self.app.push_screen( DuplicateScreen(app.repo, self.bug), callback=_on_dup, @@ -1151,8 +1186,8 @@ class BugDetailScreen(ModalScreen[None]): self._refresh_bug_view() self.app.push_screen( - ActionScreen(actions, shortcuts=_ACTION_SHORTCUTS), - callback=_on_result) + ActionScreen(actions, shortcuts=_ACTION_SHORTCUTS), callback=_on_result + ) def action_back(self) -> None: self.dismiss(None) @@ -1164,7 +1199,7 @@ class ReplyPreviewScreen(ModalScreen[Optional[str]]): Returns 'send' to send, 'edit' to re-edit, or None to abandon. """ - DEFAULT_CSS = ''' + DEFAULT_CSS = """ ReplyPreviewScreen { background: $surface; } @@ -1183,7 +1218,7 @@ class ReplyPreviewScreen(ModalScreen[Optional[str]]): padding: 0 1; color: $text-muted; } - ''' + """ BINDINGS = [ Binding('S', 'send', 'Send'), @@ -1242,6 +1277,7 @@ class ReplyPreviewScreen(ModalScreen[Optional[str]]): def action_edit_tocc(self) -> None: from b4.tui import ToCcScreen + to_val = b4.LoreMessage.clean_header(self._msg.get('To', '')) cc_val = b4.LoreMessage.clean_header(self._msg.get('Cc', '')) screen = ToCcScreen(to_val, cc_val, '', show_apply_all=False) @@ -1281,7 +1317,6 @@ class ReplyPreviewScreen(ModalScreen[Optional[str]]): self.query_one('#reply-preview-log', RichLog).scroll_page_up() - class LabelOption(ListItem): """A toggleable label option in the label selection dialog.""" @@ -1292,6 +1327,7 @@ class LabelOption(ListItem): def compose(self) -> ComposeResult: from textual.widgets import Label + mark = 'x' if self.selected else ' ' text = Text() text.append(f'[{mark}] ') @@ -1302,6 +1338,7 @@ class LabelOption(ListItem): def toggle(self) -> None: self.selected = not self.selected from textual.widgets import Label + mark = 'x' if self.selected else ' ' text = Text() text.append(f'[{mark}] ') @@ -1322,7 +1359,7 @@ class LabelScreen(JKListNavMixin, ModalScreen[Optional[dict[str, list[str]]]]): _list_id = '#label-list' - DEFAULT_CSS = ''' + DEFAULT_CSS = """ LabelScreen { align: center middle; } @@ -1342,7 +1379,7 @@ class LabelScreen(JKListNavMixin, ModalScreen[Optional[dict[str, list[str]]]]): margin-top: 1; color: $text-muted; } - ''' + """ BINDINGS = [ Binding('space', 'toggle_item', 'Toggle', show=False), @@ -1355,7 +1392,8 @@ class LabelScreen(JKListNavMixin, ModalScreen[Optional[dict[str, list[str]]]]): ] def __init__( - self, current_labels: set[str] | frozenset[str], + self, + current_labels: set[str] | frozenset[str], suggestions: list[str], ) -> None: super().__init__() @@ -1365,10 +1403,7 @@ class LabelScreen(JKListNavMixin, ModalScreen[Optional[dict[str, list[str]]]]): self._suggestions = suggestions def compose(self) -> ComposeResult: - items = [ - LabelOption(lb, initially_selected=True) - for lb in self._current - ] + items = [LabelOption(lb, initially_selected=True) for lb in self._current] with Vertical(id='label-dialog') as dialog: dialog.border_title = 'Select labels' if not items: @@ -1381,7 +1416,9 @@ class LabelScreen(JKListNavMixin, ModalScreen[Optional[dict[str, list[str]]]]): else: yield ListView(*items, id='label-list') yield Static( - Text('space toggle | [a] add new | Enter save | Escape cancel'), + Text( + 'space toggle | [a] add new | Enter save | Escape cancel' + ), id='label-hint', ) @@ -1391,7 +1428,8 @@ class LabelScreen(JKListNavMixin, ModalScreen[Optional[dict[str, list[str]]]]): def action_toggle_item(self) -> None: lv = self.query_one('#label-list', ListView) if lv.highlighted_child is not None and isinstance( - lv.highlighted_child, LabelOption, + lv.highlighted_child, + LabelOption, ): lv.highlighted_child.toggle() @@ -1405,7 +1443,9 @@ class LabelScreen(JKListNavMixin, ModalScreen[Optional[dict[str, list[str]]]]): has_items = len(lv.children) > 0 hint = self.query_one('#label-hint', Static) if has_items: - hint.update(Text('space toggle | [a] add new | Enter save | Escape cancel')) + hint.update( + Text('space toggle | [a] add new | Enter save | Escape cancel') + ) else: hint.update(Text('[a] add new | Escape cancel')) @@ -1425,6 +1465,7 @@ class LabelScreen(JKListNavMixin, ModalScreen[Optional[dict[str, list[str]]]]): lv.index = len(lv.children) - 1 lv.focus() self._update_hint() + self.app.push_screen( AddLabelScreen(self._suggestions), callback=_on_added, @@ -1457,7 +1498,7 @@ class LabelScreen(JKListNavMixin, ModalScreen[Optional[dict[str, list[str]]]]): class AddLabelScreen(ModalScreen[Optional[str]]): """Text input for adding a brand-new label, with suggestions.""" - DEFAULT_CSS = ''' + DEFAULT_CSS = """ AddLabelScreen { align: center middle; } @@ -1468,7 +1509,7 @@ class AddLabelScreen(ModalScreen[Optional[str]]): background: $surface; padding: 1 2; } - ''' + """ BINDINGS = [ Binding('escape', 'cancel', 'cancel'), @@ -1481,7 +1522,8 @@ class AddLabelScreen(ModalScreen[Optional[str]]): def compose(self) -> ComposeResult: suggester = ( SuggestFromList(self._suggestions, case_sensitive=False) - if self._suggestions else None + if self._suggestions + else None ) with Vertical(id='addlabel-dialog') as dialog: dialog.border_title = 'Add label' @@ -1505,21 +1547,21 @@ class AddLabelScreen(ModalScreen[Optional[str]]): # Shortcut keys for the bug action selector. _ACTION_SHORTCUTS: dict[str, str] = { - 'confirmed': 'c', - 'needinfo': 'n', + 'confirmed': 'c', + 'needinfo': 'n', 'worksforme': 'w', - 'wontfix': 'x', - 'fixed': 'f', - 'duplicate': 'd', - 'reopen': 'r', - 'delete': 'D', + 'wontfix': 'x', + 'fixed': 'f', + 'duplicate': 'd', + 'reopen': 'r', + 'delete': 'D', } class UpdateBugsScreen(ModalScreen[Optional[dict[str, int]]]): """Modal showing progress while updating bugs from lore.""" - DEFAULT_CSS = ''' + DEFAULT_CSS = """ UpdateBugsScreen { align: center middle; } @@ -1537,7 +1579,7 @@ class UpdateBugsScreen(ModalScreen[Optional[dict[str, int]]]): margin-top: 1; color: $text-muted; } - ''' + """ BINDINGS = [ Binding('escape', 'cancel', 'cancel'), @@ -1550,11 +1592,14 @@ class UpdateBugsScreen(ModalScreen[Optional[dict[str, int]]]): self._repo = repo self._cancelled = False self._result: dict[str, int] = { - 'checked': 0, 'updated': 0, 'new_comments': 0, + 'checked': 0, + 'updated': 0, + 'new_comments': 0, } def compose(self) -> ComposeResult: from textual.widgets import Label, ProgressBar + count = len(self._bugs) title = f'Updating {count} bug(s)' if count > 1 else 'Updating bug' with Vertical(id='update-dialog') as dialog: @@ -1565,16 +1610,21 @@ class UpdateBugsScreen(ModalScreen[Optional[dict[str, int]]]): ) yield Label('', id='update-bug', markup=False) yield ProgressBar( - total=count, show_eta=False, id='update-progress', + total=count, + show_eta=False, + id='update-progress', ) def on_mount(self) -> None: self.run_worker( - self._do_updates, name='_do_updates', thread=True, + self._do_updates, + name='_do_updates', + thread=True, ) def _update_progress(self, completed: int, title: str) -> None: from textual.widgets import Label, ProgressBar + count = len(self._bugs) self.query_one('#update-status', Label).update( f'Checking {completed}/{count} bugs...', @@ -1584,12 +1634,15 @@ class UpdateBugsScreen(ModalScreen[Optional[dict[str, int]]]): def _do_updates(self) -> dict[str, int]: from b4.bugs._import import refresh_bug + with _quiet_worker(): for i, bug in enumerate(self._bugs): if self._cancelled: break self.app.call_from_thread( - self._update_progress, i, bug.title, + self._update_progress, + i, + bug.title, ) try: count = refresh_bug(self._repo, bug.id) @@ -1600,12 +1653,15 @@ class UpdateBugsScreen(ModalScreen[Optional[dict[str, int]]]): self._result['updated'] += 1 self._result['new_comments'] += count self.app.call_from_thread( - self._update_progress, i + 1, bug.title, + self._update_progress, + i + 1, + bug.title, ) return self._result async def on_worker_state_changed( - self, event: Worker.StateChanged, + self, + event: Worker.StateChanged, ) -> None: if event.worker.name != '_do_updates': return @@ -1624,7 +1680,7 @@ class DuplicateScreen(ModalScreen[Optional[str]]): Returns the resolved bug ID on confirm, or None on cancel. """ - DEFAULT_CSS = ''' + DEFAULT_CSS = """ DuplicateScreen { align: center middle; } @@ -1640,7 +1696,7 @@ class DuplicateScreen(ModalScreen[Optional[str]]): margin-top: 1; color: $text-muted; } - ''' + """ BINDINGS = [ Binding('escape', 'cancel', 'cancel'), @@ -1655,8 +1711,7 @@ class DuplicateScreen(ModalScreen[Optional[str]]): with Vertical(id='dup-dialog') as dialog: dialog.border_title = 'Close as duplicate of' yield Input(placeholder='Bug ID', id='dup-input') - yield Static('Enter confirm | Escape cancel', - id='dup-status') + yield Static('Enter confirm | Escape cancel', id='dup-status') def on_mount(self) -> None: self.query_one('#dup-input', Input).focus() @@ -1685,7 +1740,7 @@ class DuplicateScreen(ModalScreen[Optional[str]]): class EditTitleScreen(ModalScreen[Optional[str]]): """Edit a bug's title.""" - DEFAULT_CSS = ''' + DEFAULT_CSS = """ EditTitleScreen { align: center middle; } @@ -1700,7 +1755,7 @@ class EditTitleScreen(ModalScreen[Optional[str]]): margin-top: 1; color: $text-muted; } - ''' + """ BINDINGS = [ Binding('escape', 'cancel', 'cancel'), @@ -1714,8 +1769,7 @@ class EditTitleScreen(ModalScreen[Optional[str]]): with Vertical(id='edit-title-dialog') as dialog: dialog.border_title = 'Edit title' yield Input(value=self._current_title, id='edit-title-input') - yield Static('Enter save | Escape cancel', - id='edit-title-hint') + yield Static('Enter save | Escape cancel', id='edit-title-hint') def on_mount(self) -> None: self.query_one('#edit-title-input', Input).focus() @@ -1733,6 +1787,7 @@ class EditTitleScreen(ModalScreen[Optional[str]]): # -- Main app ---------------------------------------------------------------- + class BugListApp(JKListNavMixin, App[None]): """Bug management TUI backed by git-bug via ezgb.""" @@ -1740,7 +1795,7 @@ class BugListApp(JKListNavMixin, App[None]): _list_id = '#bug-list' - DEFAULT_CSS = ''' + DEFAULT_CSS = """ BugListApp { layout: vertical; } @@ -1780,7 +1835,7 @@ class BugListApp(JKListNavMixin, App[None]): width: 14; text-style: bold; } - ''' + """ BINDINGS = [ Binding('j', 'cursor_down', 'down', show=False), @@ -1812,9 +1867,9 @@ class BugListApp(JKListNavMixin, App[None]): 'action_quit': 'global', } - def __init__(self, repo: GitBugRepo, *, - email_dryrun: bool = False, - no_sign: bool = False) -> None: + def __init__( + self, repo: GitBugRepo, *, email_dryrun: bool = False, no_sign: bool = False + ) -> None: super().__init__() self.repo = repo self.email_dryrun = email_dryrun @@ -1832,7 +1887,9 @@ class BugListApp(JKListNavMixin, App[None]): def compose(self) -> ComposeResult: yield Static('b4 bugs', id='title-bar') - header_text = f'{"ID":<7s} {"Submitter":<20s} {"Msgs":>4s} {"S"} {"Subject"}' + header_text = ( + f'{"ID":<7s} {"Submitter":<20s} {"Msgs":>4s} {"S"} {"Subject"}' + ) yield Static(header_text, id='column-header') yield ListView(id='bug-list') with Vertical(id='details-panel'): @@ -1882,8 +1939,7 @@ class BugListApp(JKListNavMixin, App[None]): if current_mtime != self._cache_mtime: self._cache_mtime = current_mtime self._save_focus() - self.run_worker(self._load_bugs, name='load_bugs', - thread=True) + self.run_worker(self._load_bugs, name='load_bugs', thread=True) def on_mount(self) -> None: self._ts = resolve_styles(self) @@ -1947,20 +2003,18 @@ class BugListApp(JKListNavMixin, App[None]): # or the limit pattern has an explicit s: token has_explicit_status = self._has_status_token(self._limit_pattern) if not self._show_closed and not has_explicit_status: - display_bugs = [ - b for b in display_bugs if b.status == Status.OPEN - ] + display_bugs = [b for b in display_bugs if b.status == Status.OPEN] if self._limit_pattern: display_bugs = [ - b for b in display_bugs - if self._matches_limit(b, self._limit_pattern) + b for b in display_bugs if self._matches_limit(b, self._limit_pattern) ] # Sort: by last activity (newest first) within each tier, # then by tier (active → waiting → resolved). display_bugs.sort( - key=_bug_last_activity, reverse=True, + key=_bug_last_activity, + reverse=True, ) display_bugs.sort(key=_bug_tier) @@ -1968,7 +2022,9 @@ class BugListApp(JKListNavMixin, App[None]): items: list[BugListItem] = [] for bug in display_bugs: - count = bug.comment_count if isinstance(bug, BugSummary) else len(bug.comments) + count = ( + bug.comment_count if isinstance(bug, BugSummary) else len(bug.comments) + ) seen = self._seen_counts.get(bug.id, count) unseen = max(0, count - seen) items.append(BugListItem(bug, unseen=unseen)) @@ -1999,7 +2055,11 @@ class BugListApp(JKListNavMixin, App[None]): # new comments that arrived since the baseline. for bug in self._all_bugs: if bug.id not in self._seen_counts: - count = bug.comment_count if isinstance(bug, BugSummary) else len(bug.comments) + count = ( + bug.comment_count + if isinstance(bug, BugSummary) + else len(bug.comments) + ) self._seen_counts[bug.id] = count # Collect all known labels across all bugs all_labels: set[str] = set() @@ -2023,7 +2083,7 @@ class BugListApp(JKListNavMixin, App[None]): lifecycle = '' for lb in bug.labels: if lb.startswith('lifecycle:'): - lifecycle = lb[len('lifecycle:'):] + lifecycle = lb[len('lifecycle:') :] break git_status = 'open' if bug.status == Status.OPEN else 'closed' if lifecycle: @@ -2050,15 +2110,13 @@ class BugListApp(JKListNavMixin, App[None]): self.query_one('#detail-labels', Static).update('none') created_str = ( - f'{bug.created_at:%Y-%m-%d %H:%M} ' - f'({_relative_time(bug.created_at)})' + f'{bug.created_at:%Y-%m-%d %H:%M} ({_relative_time(bug.created_at)})' ) self.query_one('#detail-created', Static).update(created_str) if isinstance(bug, BugSummary): edited_str = ( - f'{bug.edited_at:%Y-%m-%d %H:%M} ' - f'({_relative_time(bug.edited_at)})' + f'{bug.edited_at:%Y-%m-%d %H:%M} ({_relative_time(bug.edited_at)})' ) self.query_one('#detail-last-activity', Static).update(edited_str) self.query_one('#detail-comments', Static).update( @@ -2072,8 +2130,7 @@ class BugListApp(JKListNavMixin, App[None]): f'by {last.author.name}' ) self.query_one('#detail-last-activity', Static).update(last_str) - visible = sum(1 for c in bug.comments - if not is_comment_removed(c.text)) + visible = sum(1 for c in bug.comments if not is_comment_removed(c.text)) self.query_one('#detail-comments', Static).update( str(visible), ) @@ -2090,7 +2147,11 @@ class BugListApp(JKListNavMixin, App[None]): # Mark as seen so the badge clears on return bug_id = event.item.bug.id bug_obj = event.item.bug - count = bug_obj.comment_count if isinstance(bug_obj, BugSummary) else len(bug_obj.comments) + count = ( + bug_obj.comment_count + if isinstance(bug_obj, BugSummary) + else len(bug_obj.comments) + ) self._seen_counts[bug_id] = count # Load full Bug on demand (CachedBug doesn't have comments) bug = self.repo.get_bug(bug_id) @@ -2098,17 +2159,17 @@ class BugListApp(JKListNavMixin, App[None]): def _on_dismiss(_result: None) -> None: self.repo.invalidate() self._save_focus() - self.run_worker(self._load_bugs, name='load_bugs', - thread=True) + self.run_worker(self._load_bugs, name='load_bugs', thread=True) - self.push_screen(BugDetailScreen(bug), - callback=_on_dismiss) + self.push_screen(BugDetailScreen(bug), callback=_on_dismiss) # -- Actions ------------------------------------------------------------- def _get_selected_bug(self) -> Optional[BugLike]: lv = self.query_one('#bug-list', ListView) - if lv.highlighted_child is not None and isinstance(lv.highlighted_child, BugListItem): + if lv.highlighted_child is not None and isinstance( + lv.highlighted_child, BugListItem + ): return lv.highlighted_child.bug return None @@ -2125,9 +2186,11 @@ class BugListApp(JKListNavMixin, App[None]): def action_limit(self) -> None: self.push_screen( - LimitScreen(self._limit_pattern, - hint='Prefixes: s: l: