All of lore.kernel.org
 help / color / mirror / Atom feed
From: Tamir Duberstein <tamird@kernel.org>
To: "Kernel.org Tools" <tools@kernel.org>
Cc: Konstantin Ryabitsev <konstantin@linuxfoundation.org>,
	 Tamir Duberstein <tamird@kernel.org>
Subject: [PATCH patatt 3/7] Add Ruff format check
Date: Sun, 19 Apr 2026 21:22:23 -0400	[thread overview]
Message-ID: <20260419-stronger-type-checking-v1-3-5c108048d2c7@kernel.org> (raw)
In-Reply-To: <20260419-stronger-type-checking-v1-0-5c108048d2c7@kernel.org>

Run Ruff's formatter as part of the local CI helper and configure it to
preserve the project's single-quote style.

Apply the one-time formatting pass so the new check starts from a green
baseline.

Signed-off-by: Tamir Duberstein <tamird@kernel.org>
---
 ci.sh                               |   1 +
 conftest.py                         |   3 +-
 pyproject.toml                      |   3 +
 src/patatt/__init__.py              | 345 +++++++++++++++++++++++++++---------
 tests/conftest.py                   |  10 +-
 tests/test_validation.py            |  16 +-
 tests/unit/test_byhash.py           |   2 -
 tests/unit/test_devsig_header.py    |  17 +-
 tests/unit/test_get_algo_keydata.py |   9 +-
 tests/unit/test_patatt_message.py   |  20 ++-
 10 files changed, 307 insertions(+), 119 deletions(-)

diff --git a/ci.sh b/ci.sh
index 3001db2..ec0baf8 100755
--- a/ci.sh
+++ b/ci.sh
@@ -2,6 +2,7 @@
 
 set -eu
 
+uv run ruff format --check
 uv run ruff check
 uv run mypy .
 uv run pytest --durations=0
diff --git a/conftest.py b/conftest.py
index c106f53..33276ab 100644
--- a/conftest.py
+++ b/conftest.py
@@ -3,7 +3,6 @@ from pathlib import Path
 
 # Add the src directory to the Python path
 project_root = Path(__file__).parent
-src_path = str(project_root / "src")
+src_path = str(project_root / 'src')
 if src_path not in sys.path:
     sys.path.insert(0, src_path)
-
diff --git a/pyproject.toml b/pyproject.toml
index 7583cd4..d56d828 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -58,6 +58,9 @@ extend-select = [
 ]
 flake8-quotes.inline-quotes = "single"
 
+[tool.ruff.format]
+quote-style = "single"
+
 [tool.pyright]
 typeCheckingMode = "off"
 
diff --git a/src/patatt/__init__.py b/src/patatt/__init__.py
index 935da55..24ec933 100644
--- a/src/patatt/__init__.py
+++ b/src/patatt/__init__.py
@@ -55,6 +55,7 @@ CONFIGCACHE: Dict[str, GitConfigType] = dict()
 __VERSION__ = '0.7.1'
 MAX_SUPPORTED_FORMAT_VERSION = 1
 
+
 class Error(Exception):
     """Base exception for patatt errors.
 
@@ -171,8 +172,11 @@ class DevsigHeader:
         .. deprecated::
             Use :meth:`get_field_as_bytes` or :meth:`get_field_as_str` instead.
         """
-        warnings.warn('get_field() is deprecated, use get_field_as_bytes() or get_field_as_str() instead',
-                      DeprecationWarning, stacklevel=2)
+        warnings.warn(
+            'get_field() is deprecated, use get_field_as_bytes() or get_field_as_str() instead',
+            DeprecationWarning,
+            stacklevel=2,
+        )
         value = self.hdata.get(field)
         if isinstance(value, bytes) and decode:
             return value.decode()
@@ -247,8 +251,10 @@ class DevsigHeader:
         if mode == 'sign':
             # Make sure REQ_HDRS is a subset of allhdrs
             if not reqset.issubset(allhdrs):
-                raise SigningError('The following required headers not present: %s'
-                                   % (b', '.join(reqset.difference(allhdrs)).decode()))
+                raise SigningError(
+                    'The following required headers not present: %s'
+                    % (b', '.join(reqset.difference(allhdrs)).decode())
+                )
             # Add optional headers that are actually present
             optpresent = list(allhdrs.intersection(optset))
             signlist = REQ_HDRS + sorted(optpresent)
@@ -261,8 +267,10 @@ class DevsigHeader:
             signlist = [x.strip() for x in hfield.split(b':')]
             # Make sure REQ_HEADERS are in this set
             if not reqset.issubset(set(signlist)):
-                raise ValidationError('The following required headers not signed: %s'
-                                      % (b', '.join(reqset.difference(set(signlist))).decode()))
+                raise ValidationError(
+                    'The following required headers not signed: %s'
+                    % (b', '.join(reqset.difference(set(signlist))).decode())
+                )
         else:
             raise RuntimeError('Unknown set_header mode: %s' % mode)
 
@@ -276,7 +284,9 @@ class DevsigHeader:
             at = 0
             for hname, rawval in list(parsed):
                 if hname == shname:
-                    self._headervals.append(hname + b':' + DevsigHeader._dkim_canonicalize_header(rawval))
+                    self._headervals.append(
+                        hname + b':' + DevsigHeader._dkim_canonicalize_header(rawval)
+                    )
                     parsed.pop(at)
                     break
                 at += 1
@@ -335,7 +345,9 @@ class DevsigHeader:
                 pubkey = keyinfo.encode()
             else:
                 pubkey = keyinfo
-            sdigest, (_, _, _, signkey, signtime) = DevsigHeader._validate_openpgp(bdata, pubkey)
+            sdigest, (_, _, _, signkey, signtime) = DevsigHeader._validate_openpgp(
+                bdata, pubkey
+            )
             if sdigest != vdigest:
                 raise ValidationError('Header validation failed')
             return signkey, signtime
@@ -349,7 +361,9 @@ class DevsigHeader:
             bkeyinfo = keyinfo.encode()
         else:
             # This cannot be None for any of the algorithms we support other than openpgp, done above
-            raise RuntimeError('keyinfo must be a string or bytes, not %s' % type(keyinfo).__name__)
+            raise RuntimeError(
+                'keyinfo must be a string or bytes, not %s' % type(keyinfo).__name__
+            )
 
         if algo.startswith('ed25519'):
             sdigest = DevsigHeader._validate_ed25519(bdata, bkeyinfo)
@@ -372,7 +386,9 @@ class DevsigHeader:
 
         return signkey, signtime
 
-    def sign(self, keyinfo: Union[str, bytes], split: bool = True) -> Tuple[bytes, bytes]:
+    def sign(
+        self, keyinfo: Union[str, bytes], split: bool = True
+    ) -> Tuple[bytes, bytes]:
         """Sign the message and generate signature header value.
 
         Args:
@@ -399,7 +415,9 @@ class DevsigHeader:
             skeyinfo = keyinfo
             bkeyinfo = keyinfo.encode()
         else:
-            raise RuntimeError('keyinfo must be a string or bytes, not %s' % type(keyinfo).__name__)
+            raise RuntimeError(
+                'keyinfo must be a string or bytes, not %s' % type(keyinfo).__name__
+            )
 
         hparts = list()
         for fn in self._order:
@@ -474,7 +492,9 @@ class DevsigHeader:
         sshkargs = ['-Y', 'sign', '-n', 'patatt', '-f', keypath]
         ecode, out, err = sshk_run_command(sshkargs, payload)
         if ecode > 0:
-            raise SigningError('Running ssh-keygen failed', errors=err.decode().strip().split('\n'))
+            raise SigningError(
+                'Running ssh-keygen failed', errors=err.decode().strip().split('\n')
+            )
         # Remove the header/footer
         sigdata = b''
         for bline in out.split(b'\n'):
@@ -486,7 +506,9 @@ class DevsigHeader:
             sshkargs = ['-l', '-f', keypath]
             ecode, out, err = sshk_run_command(sshkargs, payload)
             if ecode > 0:
-                raise SigningError('Running ssh-keygen failed', errors=err.decode().split('\n'))
+                raise SigningError(
+                    'Running ssh-keygen failed', errors=err.decode().split('\n')
+                )
             chunks = out.split()
             keyfp = chunks[1]
             KEYCACHE[keypath] = keyfp
@@ -503,17 +525,41 @@ class DevsigHeader:
             spath = os.path.join(td, 'sigdata')
             with open(fpath, 'wb') as fh:
                 chunks = keydata.split()
-                bcont = b'patatter@local namespaces="patatt" ' + chunks[0] + b' ' + chunks[1] + b'\n'
+                bcont = (
+                    b'patatter@local namespaces="patatt" '
+                    + chunks[0]
+                    + b' '
+                    + chunks[1]
+                    + b'\n'
+                )
                 logger.debug('allowed-signers: %s', bcont)
                 fh.write(bcont)
             with open(spath, 'wb') as fh:
-                bcont = b'-----BEGIN SSH SIGNATURE-----\n' + sigdata + b'\n-----END SSH SIGNATURE-----\n'
+                bcont = (
+                    b'-----BEGIN SSH SIGNATURE-----\n'
+                    + sigdata
+                    + b'\n-----END SSH SIGNATURE-----\n'
+                )
                 logger.debug('sigdata: %s', bcont)
                 fh.write(bcont)
-            sshkargs = ['-Y', 'verify', '-n', 'patatt', '-I', 'patatter@local', '-f', fpath, '-s', spath]
+            sshkargs = [
+                '-Y',
+                'verify',
+                '-n',
+                'patatt',
+                '-I',
+                'patatter@local',
+                '-f',
+                fpath,
+                '-s',
+                spath,
+            ]
             ecode, out, err = sshk_run_command(sshkargs, payload)
             if ecode > 0:
-                raise ValidationError('Failed to validate openssh signature', errors=err.decode().split('\n'))
+                raise ValidationError(
+                    'Failed to validate openssh signature',
+                    errors=err.decode().split('\n'),
+                )
 
     @staticmethod
     def _sign_openpgp(payload: bytes, keyid: str) -> Tuple[bytes, bytes]:
@@ -528,7 +574,9 @@ class DevsigHeader:
             gpgargs = ['--with-colons', '--fingerprint', keyid]
             ecode, out, err = gpg_run_command(gpgargs)
             if ecode > 0:
-                raise SigningError('Running gpg failed', errors=err.decode().split('\n'))
+                raise SigningError(
+                    'Running gpg failed', errors=err.decode().split('\n')
+                )
             pkid = None
             keyfp = None
             for line in out.split(b'\n'):
@@ -550,13 +598,21 @@ class DevsigHeader:
         return bdata, keyfp
 
     @staticmethod
-    def _validate_openpgp(sigdata: bytes, pubkey: Optional[bytes]) -> Tuple[bytes, Tuple[bool, bool, bool, str, str]]:
+    def _validate_openpgp(
+        sigdata: bytes, pubkey: Optional[bytes]
+    ) -> Tuple[bytes, Tuple[bool, bool, bool, str, str]]:
         global KEYCACHE
         bsigdata = base64.b64decode(sigdata)
         vrfyargs = ['--verify', '--output', '-', '--status-fd=2']
         if pubkey:
             with tempfile.TemporaryDirectory(suffix='.patatt.gnupg') as td:
-                keyringargs = ['--homedir', td, '--no-default-keyring', '--keyring', 'pub']
+                keyringargs = [
+                    '--homedir',
+                    td,
+                    '--no-default-keyring',
+                    '--keyring',
+                    'pub',
+                ]
                 if pubkey in KEYCACHE:
                     logger.debug('Reusing cached keyring')
                     with open(os.path.join(td, 'pub'), 'wb') as kfh:
@@ -596,10 +652,16 @@ class DevsigHeader:
         signtime = ''
         signkey = ''
 
-        logger.debug('GNUPG status:\n\t%s', status.decode().strip().replace('\n', '\n\t'))
+        logger.debug(
+            'GNUPG status:\n\t%s', status.decode().strip().replace('\n', '\n\t')
+        )
         if re.search(rb'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+(.*)$', status, flags=re.M):
             good = True
-        if (vs_matches := re.search(rb'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', status, flags=re.M)):
+        if vs_matches := re.search(
+            rb'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)',
+            status,
+            flags=re.M,
+        ):
             valid = True
             signkey = vs_matches.groups()[0].decode()
             signtime = vs_matches.groups()[2].decode()
@@ -698,7 +760,9 @@ class PatattMessage:
 
         # Generate a new payload using m and p and canonicalize with \r\n endings,
         # trimming any excess blank lines ("simple" DKIM canonicalization).
-        m, p, i = PatattMessage._get_git_mailinfo(b''.join(self.headers) + self.lf + self.body)
+        m, p, i = PatattMessage._get_git_mailinfo(
+            b''.join(self.headers) + self.lf + self.body
+        )
         self.canon_body = b''
         for line in re.sub(rb'[\r\n]*$', b'', m + p).split(b'\n'):
             self.canon_body += re.sub(rb'[\r\n]*$', b'', line) + b'\r\n'
@@ -718,14 +782,26 @@ class PatattMessage:
                 left, right = header.split(b':', 1)
                 lleft = left.lower()
                 if lleft == b'from':
-                    right = b' ' + idata.get(b'author', b'') + b' <' + idata.get(b'email', b'') + b'>'
+                    right = (
+                        b' '
+                        + idata.get(b'author', b'')
+                        + b' <'
+                        + idata.get(b'email', b'')
+                        + b'>'
+                    )
                 elif lleft == b'subject':
                     right = b' ' + idata.get(b'subject', b'')
                 self.canon_headers.append(left + b':' + right)
             except ValueError:
                 self.canon_headers.append(header)
 
-    def sign(self, algo: str, keyinfo: Union[str, bytes], identity: Optional[str], selector: Optional[str]) -> None:
+    def sign(
+        self,
+        algo: str,
+        keyinfo: Union[str, bytes],
+        identity: Optional[str],
+        selector: Optional[str],
+    ) -> None:
         """Sign the message and add signature headers.
 
         Args:
@@ -751,7 +827,9 @@ class PatattMessage:
         ds.set_field('l', str(len(self.canon_body)))
         if not identity:
             if not self.canon_identity:
-                raise SigningError('No identity provided and no canonical identity available')
+                raise SigningError(
+                    'No identity provided and no canonical identity available'
+                )
             identity = self.canon_identity
         ds.set_field('i', identity)
         if selector:
@@ -766,7 +844,9 @@ class PatattMessage:
             ds.set_field('t', str(int(time.time())))
         hv, pkinfo = ds.sign(keyinfo)
 
-        dshdr = email.header.make_header([(DEVSIG_HDR + b': ' + hv, 'us-ascii')], maxlinelen=78)
+        dshdr = email.header.make_header(
+            [(DEVSIG_HDR + b': ' + hv, 'us-ascii')], maxlinelen=78
+        )
         self.headers.append(dshdr.encode().encode() + self.lf)
 
         # Make informational header about the key used
@@ -781,10 +861,14 @@ class PatattMessage:
         else:
             idata.append(b'pk=%s' % pkinfo)
 
-        dkhdr = email.header.make_header([(DEVKEY_HDR + b': ' + b'; '.join(idata), 'us-ascii')], maxlinelen=78)
+        dkhdr = email.header.make_header(
+            [(DEVKEY_HDR + b': ' + b'; '.join(idata), 'us-ascii')], maxlinelen=78
+        )
         self.headers.append(dkhdr.encode().encode() + self.lf)
 
-    def validate(self, identity: str, pkey: Union[bytes, str, None], trim_body: bool = False) -> Tuple[str, str]:
+    def validate(
+        self, identity: str, pkey: Union[bytes, str, None], trim_body: bool = False
+    ) -> Tuple[str, str]:
         """Validate the signature for a specific identity.
 
         Args:
@@ -863,7 +947,7 @@ class PatattMessage:
                     break
 
                 # is it a wrapped header?
-                if line[0] in ("\x09", "\x20", 0x09, 0x20):
+                if line[0] in ('\x09', '\x20', 0x09, 0x20):
                     if not len(self.headers):
                         raise RuntimeError('Not a valid RFC2822 message')
                     # attach it to the previous header
@@ -950,19 +1034,23 @@ def get_data_dir() -> Path:
     return datadir
 
 
-def _run_command(cmdargs: List[str],
-                 stdin: Optional[bytes] = None,
-                 env: Optional[Dict[str, str]] = None) -> Tuple[int, bytes, bytes]:
+def _run_command(
+    cmdargs: List[str],
+    stdin: Optional[bytes] = None,
+    env: Optional[Dict[str, str]] = None,
+) -> Tuple[int, bytes, bytes]:
     logger.debug('Running %s', ' '.join(cmdargs))
     cp = subprocess.run(cmdargs, input=stdin, env=env, capture_output=True, text=False)
     logger.debug('Completed %s', repr(cp))
     return cp.returncode, cp.stdout, cp.stderr
 
 
-def git_run_command(gitdir: Optional[str],
-                    args: List[str],
-                    stdin: Optional[bytes] = None,
-                    env: Optional[Dict[str, str]] = None) -> Tuple[int, bytes, bytes]:
+def git_run_command(
+    gitdir: Optional[str],
+    args: List[str],
+    stdin: Optional[bytes] = None,
+    env: Optional[Dict[str, str]] = None,
+) -> Tuple[int, bytes, bytes]:
     if gitdir:
         args = ['git', '--git-dir', gitdir, '--no-pager'] + args
     else:
@@ -970,10 +1058,12 @@ def git_run_command(gitdir: Optional[str],
     return _run_command(args, stdin=stdin, env=env)
 
 
-def get_config_from_git(regexp: str,
-                        section: Optional[str] = None,
-                        defaults: Optional[Dict[str, Union[str, List[str]]]] = None,
-                        multivals: Optional[List[str]] = None) -> GitConfigType:
+def get_config_from_git(
+    regexp: str,
+    section: Optional[str] = None,
+    defaults: Optional[Dict[str, Union[str, List[str]]]] = None,
+    multivals: Optional[List[str]] = None,
+) -> GitConfigType:
     if multivals is None:
         multivals = list()
 
@@ -1027,13 +1117,22 @@ def get_config_from_git(regexp: str,
     return gitconfig
 
 
-def gpg_run_command(cmdargs: List[str], stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]:
+def gpg_run_command(
+    cmdargs: List[str], stdin: Optional[bytes] = None
+) -> Tuple[int, bytes, bytes]:
     gpgbin, _ = set_bin_paths(None)
-    cmdargs = [gpgbin, '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb'] + cmdargs
+    cmdargs = [
+        gpgbin,
+        '--batch',
+        '--no-auto-key-retrieve',
+        '--no-auto-check-trustdb',
+    ] + cmdargs
     return _run_command(cmdargs, stdin)
 
 
-def sshk_run_command(cmdargs: List[str], stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]:
+def sshk_run_command(
+    cmdargs: List[str], stdin: Optional[bytes] = None
+) -> Tuple[int, bytes, bytes]:
     _, sshkbin = set_bin_paths(None)
     cmdargs = [sshkbin] + cmdargs
     return _run_command(cmdargs, stdin)
@@ -1079,8 +1178,12 @@ def make_pkey_path(keytype: str, identity: str, selector: str) -> Path:
     domain = chunks[1].lower()
     selector = selector.lower()
     # urlencode all potentially untrusted bits to make sure nobody tries path-based badness
-    return Path(urllib.parse.quote_plus(keytype), urllib.parse.quote_plus(domain),
-                urllib.parse.quote_plus(local), urllib.parse.quote_plus(selector))
+    return Path(
+        urllib.parse.quote_plus(keytype),
+        urllib.parse.quote_plus(domain),
+        urllib.parse.quote_plus(local),
+        urllib.parse.quote_plus(selector),
+    )
 
 
 def make_byhash_path(keytype: str, identity: str, selector: str) -> Path:
@@ -1105,7 +1208,9 @@ def make_byhash_path(keytype: str, identity: str, selector: str) -> Path:
     return Path('by-hash', prefix, remainder)
 
 
-def get_public_key(source: str, keytype: str, identity: str, selector: str) -> Tuple[bytes, str]:
+def get_public_key(
+    source: str, keytype: str, identity: str, selector: str
+) -> Tuple[bytes, str]:
     """Look up a public key from a keyring source.
 
     Searches for the key at the standard path first, then falls back to
@@ -1133,7 +1238,9 @@ def get_public_key(source: str, keytype: str, identity: str, selector: str) -> T
         # split by :
         parts = source.split(':', 4)
         if len(parts) < 4:
-            raise ConfigurationError('Invalid ref, must have at least 3 colons: %s' % source)
+            raise ConfigurationError(
+                'Invalid ref, must have at least 3 colons: %s' % source
+            )
         gitrepo = parts[1]
         gitref = parts[2]
         gitsub = parts[3]
@@ -1170,7 +1277,9 @@ def get_public_key(source: str, keytype: str, identity: str, selector: str) -> T
             # Handle one level of symlinks
             if out.find(b'\n') < 0 < out.find(b'/'):
                 # Check this path as well
-                linktgt = os.path.normpath(os.path.join(os.path.dirname(subpath), out.decode()))
+                linktgt = os.path.normpath(
+                    os.path.join(os.path.dirname(subpath), out.decode())
+                )
                 keysrc = f'{gitref}:{linktgt}'
                 cmdargs = ['show', keysrc]
                 ecode, out, err = git_run_command(gittop, cmdargs)
@@ -1230,6 +1339,7 @@ def get_public_key(source: str, keytype: str, identity: str, selector: str) -> T
 
 def _load_messages(cmdargs: argparse.Namespace) -> Dict[str, bytes]:
     import sys
+
     if len(cmdargs.msgfile):
         # Load all message from the files passed to make sure they all parse correctly
         messages = dict()
@@ -1239,17 +1349,21 @@ def _load_messages(cmdargs: argparse.Namespace) -> Dict[str, bytes]:
     elif not sys.stdin.isatty():
         messages = {'-': sys.stdin.buffer.read()}
     else:
-        logger.critical('E: Pipe a message to sign or pass filenames with individual messages')
+        logger.critical(
+            'E: Pipe a message to sign or pass filenames with individual messages'
+        )
         raise RuntimeError('Nothing to do')
 
     return messages
 
 
-def sign_message(msgdata: bytes,
-                 algo: str,
-                 keyinfo: Union[str, bytes],
-                 identity: Optional[str],
-                 selector: Optional[str]) -> bytes:
+def sign_message(
+    msgdata: bytes,
+    algo: str,
+    keyinfo: Union[str, bytes],
+    identity: Optional[str],
+    selector: Optional[str],
+) -> bytes:
     """Sign an RFC2822 message and return the signed message bytes.
 
     Args:
@@ -1284,7 +1398,9 @@ def set_bin_paths(config: Optional[GitConfigType]) -> Tuple[str, str]:
             _sshkbin = config.get('ssh-keygen-bin')
             assert isinstance(_sshkbin, str), 'ssh-keygen-bin must be a string'
             SSHKBIN = _sshkbin
-        elif (_sshkbin := get_config_from_git(r'gpg\..*', section='ssh').get('program')) is not None:
+        elif (
+            _sshkbin := get_config_from_git(r'gpg\..*', section='ssh').get('program')
+        ) is not None:
             assert isinstance(_sshkbin, str), 'program must be a string'
             SSHKBIN = _sshkbin
         else:
@@ -1302,7 +1418,9 @@ def get_algo_keydata(config: GitConfigType) -> Tuple[str, str]:
     # Do we have this already looked up?
     identity = config.get('identity')
     if not isinstance(identity, str):
-        raise ConfigurationError('Identity must be a string, got %s' % type(identity).__name__)
+        raise ConfigurationError(
+            'Identity must be a string, got %s' % type(identity).__name__
+        )
 
     if identity in KEYCACHE:
         algo, keydata = KEYCACHE[identity]
@@ -1313,7 +1431,11 @@ def get_algo_keydata(config: GitConfigType) -> Tuple[str, str]:
         if user_signingkey:
             gpg_format = get_config_from_git(r'gpg\..*').get('format', 'gpg')
             key_algo = 'openssh' if gpg_format == 'ssh' else 'openpgp'
-            logger.info('N: Using %s key %s defined by user.signingkey', key_algo, user_signingkey)
+            logger.info(
+                'N: Using %s key %s defined by user.signingkey',
+                key_algo,
+                user_signingkey,
+            )
             logger.info('N: Override by setting patatt.signingkey')
             config['signingkey'] = '%s:%s' % (key_algo, user_signingkey)
         else:
@@ -1323,7 +1445,9 @@ def get_algo_keydata(config: GitConfigType) -> Tuple[str, str]:
 
     sk = config.get('signingkey')
     if not isinstance(sk, str):
-        raise ConfigurationError('Signing key must be a string, got %s' % type(sk).__name__)
+        raise ConfigurationError(
+            'Signing key must be a string, got %s' % type(sk).__name__
+        )
     if sk.startswith('ed25519:'):
         algo = 'ed25519'
         identifier = sk[8:]
@@ -1338,7 +1462,7 @@ def get_algo_keydata(config: GitConfigType) -> Tuple[str, str]:
                 keysrc = str(skey)
             else:
                 # finally, try .git/%s.key
-                if (gtdir := get_git_toplevel()):
+                if gtdir := get_git_toplevel():
                     skey = Path(gtdir) / '.git' / f'{identifier}.key'
                     if skey.exists():
                         keysrc = str(skey)
@@ -1397,12 +1521,18 @@ def get_main_config(section: Optional[str] = None) -> GitConfigType:
         csection = 'default'
     if csection in CONFIGCACHE:
         return CONFIGCACHE[csection]
-    config = get_config_from_git(r'patatt\..*', section=section, multivals=['keyringsrc'])
+    config = get_config_from_git(
+        r'patatt\..*', section=section, multivals=['keyringsrc']
+    )
     # Append some extra keyring locations
     if 'keyringsrc' not in config or not isinstance(config['keyringsrc'], list):
         config['keyringsrc'] = list()
-    assert isinstance(config['keyringsrc'], list) # Just to make lint checkers shut up
-    config['keyringsrc'] += ['ref:::.keys', 'ref:::.local-keys', 'ref::refs/meta/keyring:']
+    assert isinstance(config['keyringsrc'], list)  # Just to make lint checkers shut up
+    config['keyringsrc'] += [
+        'ref:::.keys',
+        'ref:::.local-keys',
+        'ref::refs/meta/keyring:',
+    ]
     set_bin_paths(config)
     logger.debug('config: %s', config)
     CONFIGCACHE[csection] = config
@@ -1442,10 +1572,11 @@ def cmd_sign(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
             sys.exit(1)
 
 
-def validate_message(msgdata: bytes,
-                     sources: List[str],
-                     trim_body: bool = False
-                     ) -> List[Tuple[int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]]]:
+def validate_message(
+    msgdata: bytes, sources: List[str], trim_body: bool = False
+) -> List[
+    Tuple[int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]]
+]:
     """Validate all signatures in an RFC2822 message.
 
     Args:
@@ -1459,11 +1590,17 @@ def validate_message(msgdata: bytes,
 
         Result codes: RES_VALID, RES_BADSIG, RES_NOKEY, RES_NOSIG, RES_ERROR
     """
-    attestations: List[Tuple[int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]]] = list()
+    attestations: List[
+        Tuple[
+            int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]
+        ]
+    ] = list()
     pm = PatattMessage(msgdata)
     if not pm.signed:
         logger.debug('message is not signed')
-        attestations.append((RES_NOSIG, None, None, None, None, ['no signatures found']))
+        attestations.append(
+            (RES_NOSIG, None, None, None, None, ['no signatures found'])
+        )
         return attestations
 
     # Find all identities for which we have public keys
@@ -1530,6 +1667,7 @@ def validate_message(msgdata: bytes,
 
 def cmd_validate(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
     import mailbox
+
     if len(cmdargs.msgfile) == 1:
         # Try to open as an mbox file
         try:
@@ -1699,36 +1837,73 @@ def command() -> None:
     parser = argparse.ArgumentParser(
         prog='patatt',
         description='Cryptographically attest patches before sending out',
-        formatter_class=argparse.ArgumentDefaultsHelpFormatter
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+    )
+    parser.add_argument(
+        '-v',
+        '--verbose',
+        action='store_true',
+        default=False,
+        help='Be a bit more verbose',
+    )
+    parser.add_argument(
+        '-d',
+        '--debug',
+        action='store_true',
+        default=False,
+        help='Show debugging output',
+    )
+    parser.add_argument(
+        '-s',
+        '--section',
+        dest='section',
+        default=None,
+        help='Use config section [patatt "sectionname"]',
     )
-    parser.add_argument('-v', '--verbose', action='store_true', default=False,
-                        help='Be a bit more verbose')
-    parser.add_argument('-d', '--debug', action='store_true', default=False,
-                        help='Show debugging output')
-    parser.add_argument('-s', '--section', dest='section', default=None,
-                        help='Use config section [patatt "sectionname"]')
     parser.add_argument('--version', action='version', version=__VERSION__)
 
     subparsers = parser.add_subparsers(help='sub-command help', dest='subcmd')
 
-    sp_sign = subparsers.add_parser('sign', help='Cryptographically attest an RFC2822 message')
-    sp_sign.add_argument('--hook', dest='hookmode', action='store_true', default=False,
-                         help='Git hook mode')
+    sp_sign = subparsers.add_parser(
+        'sign', help='Cryptographically attest an RFC2822 message'
+    )
+    sp_sign.add_argument(
+        '--hook',
+        dest='hookmode',
+        action='store_true',
+        default=False,
+        help='Git hook mode',
+    )
     sp_sign.add_argument('msgfile', nargs='*', help='RFC2822 message files to sign')
     sp_sign.set_defaults(func=cmd_sign)
 
     sp_val = subparsers.add_parser('validate', help='Validate a devsig-signed message')
-    sp_val.add_argument('msgfile', nargs='*', help='Individual signed message files to validate or an mbox')
+    sp_val.add_argument(
+        'msgfile',
+        nargs='*',
+        help='Individual signed message files to validate or an mbox',
+    )
     sp_val.set_defaults(func=cmd_validate)
 
     sp_gen = subparsers.add_parser('genkey', help='Generate a new ed25519 keypair')
-    sp_gen.add_argument('-n', '--keyname', default=None,
-                        help='Name to use for the key, e.g. "workstation", or "default"')
-    sp_gen.add_argument('-f', '--force', action='store_true', default=False,
-                        help='Overwrite any existing keys, if found')
+    sp_gen.add_argument(
+        '-n',
+        '--keyname',
+        default=None,
+        help='Name to use for the key, e.g. "workstation", or "default"',
+    )
+    sp_gen.add_argument(
+        '-f',
+        '--force',
+        action='store_true',
+        default=False,
+        help='Overwrite any existing keys, if found',
+    )
     sp_gen.set_defaults(func=cmd_genkey)
 
-    sp_install = subparsers.add_parser('install-hook', help='Install sendmail-validate hook into the current repo')
+    sp_install = subparsers.add_parser(
+        'install-hook', help='Install sendmail-validate hook into the current repo'
+    )
     sp_install.set_defaults(func=cmd_install_hook)
 
     _args = parser.parse_args()
diff --git a/tests/conftest.py b/tests/conftest.py
index 8bb6064..a2d2124 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -18,6 +18,7 @@ Message-ID: <12345@example.com>
 This is a test email body.
 """
 
+
 @pytest.fixture
 def temp_data_dir() -> Generator[str, None, None]:
     """Create a temporary data directory structure for patatt."""
@@ -32,23 +33,26 @@ def temp_data_dir() -> Generator[str, None, None]:
         # Return path to the temp directory
         yield tmpdirname
 
+
 @pytest.fixture
 def devsig_header() -> DevsigHeader:
     """Create a basic DevsigHeader instance."""
     return DevsigHeader()
 
+
 @pytest.fixture
 def patatt_message(sample_email_bytes: bytes) -> PatattMessage:
     """Create a PatattMessage from a sample email."""
     return PatattMessage(sample_email_bytes)
 
+
 @pytest.fixture
 def sample_ed25519_key_pair() -> Dict[str, bytes]:
     """Generate a sample ed25519 key pair for testing."""
     try:
         from nacl.signing import SigningKey
     except ImportError:
-        pytest.skip("PyNaCl not installed, skipping ed25519 tests")
+        pytest.skip('PyNaCl not installed, skipping ed25519 tests')
 
     # Generate a key pair
     private_key = SigningKey.generate()
@@ -57,5 +61,5 @@ def sample_ed25519_key_pair() -> Dict[str, bytes]:
     # Return base64 encoded keys
     return {
         'private': base64.b64encode(bytes(private_key)),
-        'public': base64.b64encode(public_key.encode())
-    }
\ No newline at end of file
+        'public': base64.b64encode(public_key.encode()),
+    }
diff --git a/tests/test_validation.py b/tests/test_validation.py
index d51be81..1477fbe 100644
--- a/tests/test_validation.py
+++ b/tests/test_validation.py
@@ -6,11 +6,9 @@ import pytest
 from patatt import RES_VALID, validate_message
 
 
-@pytest.mark.parametrize("sample_file", [
-    "ed25519-signed.txt",
-    "pgp-signed.txt",
-    "openssh-signed.txt"
-])
+@pytest.mark.parametrize(
+    'sample_file', ['ed25519-signed.txt', 'pgp-signed.txt', 'openssh-signed.txt']
+)
 def test_validate(sample_file: str) -> None:
     """Test validation of an ed25519 signed message from samples directory."""
     # Path to the sample file
@@ -29,14 +27,14 @@ def test_validate(sample_file: str) -> None:
     results = validate_message(signed_data, [sources_path])
 
     # Check validation results
-    assert results, "Validation should return results"
+    assert results, 'Validation should return results'
 
     # At least one valid signature should be found
     valid_signatures = [r for r in results if r[0] == RES_VALID]
-    assert valid_signatures, "Should find at least one valid signature"
+    assert valid_signatures, 'Should find at least one valid signature'
 
     # Print validation details for debugging
-    print(f"Found {len(valid_signatures)} valid signatures:")
+    print(f'Found {len(valid_signatures)} valid signatures:')
     for result in valid_signatures:
         status, algo, keytype, identity, selector, errors = result
-        print(f"  - {keytype} signature by {identity} ({selector})")
+        print(f'  - {keytype} signature by {identity} ({selector})')
diff --git a/tests/unit/test_byhash.py b/tests/unit/test_byhash.py
index c3fe4d2..e90ee82 100644
--- a/tests/unit/test_byhash.py
+++ b/tests/unit/test_byhash.py
@@ -8,7 +8,6 @@ from patatt import get_public_key, make_byhash_path, make_pkey_path
 
 
 class TestMakeByhashPath:
-
     def test_basic_hash_computation(self) -> None:
         """Test that make_byhash_path computes the correct hash."""
         keytype = 'openssh'
@@ -42,7 +41,6 @@ class TestMakeByhashPath:
 
 
 class TestGetPublicKeyByHash:
-
     def test_filesystem_byhash_lookup(self) -> None:
         """Test that get_public_key finds a key via by-hash fallback."""
         with tempfile.TemporaryDirectory() as tmpdir:
diff --git a/tests/unit/test_devsig_header.py b/tests/unit/test_devsig_header.py
index 547593d..7b6e461 100644
--- a/tests/unit/test_devsig_header.py
+++ b/tests/unit/test_devsig_header.py
@@ -8,7 +8,6 @@ from patatt import DevsigHeader
 
 
 class TestDevsigHeader:
-
     def test_initialization(self) -> None:
         """Test that DevsigHeader initializes correctly."""
         header = DevsigHeader()
@@ -17,7 +16,9 @@ class TestDevsigHeader:
 
     def test_from_bytes(self) -> None:
         """Test parsing a header from bytes."""
-        header_bytes = b'v=1; a=ed25519-sha256; t=1623456789; i=test@example.com; bh=abcd1234'
+        header_bytes = (
+            b'v=1; a=ed25519-sha256; t=1623456789; i=test@example.com; bh=abcd1234'
+        )
         header = DevsigHeader(header_bytes)
 
         assert header.get_field_as_str('v') == '1'
@@ -41,7 +42,7 @@ class TestDevsigHeader:
     def test_set_body(self) -> None:
         """Test setting the body and calculating the body hash."""
         header = DevsigHeader()
-        body = b"This is a test body"
+        body = b'This is a test body'
 
         header.set_body(body)
 
@@ -55,7 +56,7 @@ class TestDevsigHeader:
     def test_set_body_with_maxlen(self) -> None:
         """Test setting the body with a maxlen parameter."""
         header = DevsigHeader()
-        body = b"This is a test body"
+        body = b'This is a test body'
         maxlen = 10
 
         header.set_body(body, maxlen=maxlen)
@@ -95,17 +96,17 @@ class TestDevsigHeader:
         """Test that sanity_check fails if required fields are not set."""
         header = DevsigHeader()
 
-        with pytest.raises(RuntimeError, match="Must set \"a\" field first"):
+        with pytest.raises(RuntimeError, match='Must set "a" field first'):
             header.sanity_check()
 
         header.set_field('a', 'ed25519-sha256')
 
-        with pytest.raises(RuntimeError, match="Must use set_body first"):
+        with pytest.raises(RuntimeError, match='Must use set_body first'):
             header.sanity_check()
 
-        header.set_body(b"Test body")
+        header.set_body(b'Test body')
 
-        with pytest.raises(RuntimeError, match="Must use set_headers first"):
+        with pytest.raises(RuntimeError, match='Must use set_headers first'):
             header.sanity_check()
 
     # @pytest.mark.skipif(True, reason="Requires actual ed25519 keys")
diff --git a/tests/unit/test_get_algo_keydata.py b/tests/unit/test_get_algo_keydata.py
index 545f051..c4ccc8d 100644
--- a/tests/unit/test_get_algo_keydata.py
+++ b/tests/unit/test_get_algo_keydata.py
@@ -41,7 +41,10 @@ class TestGetAlgoKeydataSSHSigningKey:
     def test_ssh_format_uses_openssh(self, mock_gcfg: MagicMock) -> None:
         """When gpg.format=ssh, user.signingkey should get the openssh: prefix."""
         mock_gcfg.side_effect = _make_mock_get_config(
-            usercfg={'email': 'test@example.com', 'signingkey': '/home/user/.ssh/id_ed25519.pub'},
+            usercfg={
+                'email': 'test@example.com',
+                'signingkey': '/home/user/.ssh/id_ed25519.pub',
+            },
             gpgcfg={'format': 'ssh'},
         )
         config: GitConfigType = {'identity': 'test@example.com'}
@@ -91,7 +94,9 @@ class TestGetAlgoKeydataSSHSigningKey:
             get_algo_keydata(config)
 
     @patch('patatt.get_config_from_git')
-    def test_patatt_signingkey_skips_user_signingkey(self, mock_gcfg: MagicMock) -> None:
+    def test_patatt_signingkey_skips_user_signingkey(
+        self, mock_gcfg: MagicMock
+    ) -> None:
         """When patatt.signingkey is already set, user.signingkey is not consulted."""
         mock_gcfg.side_effect = _make_mock_get_config(
             usercfg={'email': 'test@example.com', 'signingkey': 'SHOULD_NOT_BE_USED'},
diff --git a/tests/unit/test_patatt_message.py b/tests/unit/test_patatt_message.py
index 58338f4..17d4a30 100644
--- a/tests/unit/test_patatt_message.py
+++ b/tests/unit/test_patatt_message.py
@@ -6,7 +6,6 @@ from patatt import PatattMessage
 
 
 class TestPatattMessage:
-
     def test_initialization(self, sample_email_bytes: bytes) -> None:
         """Test initialization of PatattMessage with sample email."""
         message = PatattMessage(sample_email_bytes)
@@ -25,7 +24,7 @@ This is a test body.
         message = PatattMessage(email_bytes)
 
         assert len(message.headers) == 2
-        assert message.body == b"This is a test body.\n"
+        assert message.body == b'This is a test body.\n'
 
     def test_as_bytes(self, sample_email_bytes: bytes) -> None:
         """Test converting message back to bytes."""
@@ -44,17 +43,22 @@ This is a test body.
         assert isinstance(output, str)
         assert output == sample_email_bytes.decode()
 
-    def test_git_canonicalize(self, monkeypatch: pytest.MonkeyPatch, sample_email_bytes: bytes) -> None:
+    def test_git_canonicalize(
+        self, monkeypatch: pytest.MonkeyPatch, sample_email_bytes: bytes
+    ) -> None:
         """Test git canonicalization of message."""
+
         # Mock _get_git_mailinfo to avoid actual git command execution
         def mock_get_git_mailinfo(payload: bytes) -> Tuple[bytes, bytes, bytes]:
             # Return mock metadata, patch, and info
-            metadata = b"Author: Test User\nEmail: test@example.com\nSubject: Test email\n"
-            patch = b"This is a test body.\n"
-            info = b"email: test@example.com\nauthor: Test User\nsubject: Test email\n"
+            metadata = (
+                b'Author: Test User\nEmail: test@example.com\nSubject: Test email\n'
+            )
+            patch = b'This is a test body.\n'
+            info = b'email: test@example.com\nauthor: Test User\nsubject: Test email\n'
             return metadata, patch, info
 
-        monkeypatch.setattr(PatattMessage, "_get_git_mailinfo", mock_get_git_mailinfo)
+        monkeypatch.setattr(PatattMessage, '_get_git_mailinfo', mock_get_git_mailinfo)
 
         message = PatattMessage(sample_email_bytes)
         message.git_canonicalize()
@@ -62,7 +66,7 @@ This is a test body.
         # Check that the message was canonicalized
         assert message.canon_body is not None
         assert message.canon_headers is not None
-        assert message.canon_identity == "test@example.com"
+        assert message.canon_identity == 'test@example.com'
 
     # @pytest.mark.skipif(True, reason="Requires actual signing setup")
     # def test_sign(self) -> None:

-- 
2.53.0


  parent reply	other threads:[~2026-04-20  1:23 UTC|newest]

Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-04-20  1:22 [PATCH patatt 0/7] Harden local checks Tamir Duberstein
2026-04-20  1:22 ` [PATCH patatt 1/7] Add local CI script Tamir Duberstein
2026-04-20  1:22 ` [PATCH patatt 2/7] Add Ruff import checks Tamir Duberstein
2026-04-20  1:22 ` Tamir Duberstein [this message]
2026-04-20  1:22 ` [PATCH patatt 4/7] Add pyright strict checks Tamir Duberstein
2026-04-20  1:22 ` [PATCH patatt 5/7] Add ty checks Tamir Duberstein
2026-04-20  1:22 ` [PATCH patatt 6/7] Reduce dictionary lookups Tamir Duberstein
2026-04-20  1:22 ` [PATCH patatt 7/7] Import PyNaCl unconditionally Tamir Duberstein
2026-04-27 20:20 ` [PATCH patatt 0/7] Harden local checks Tamir Duberstein

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260419-stronger-type-checking-v1-3-5c108048d2c7@kernel.org \
    --to=tamird@kernel.org \
    --cc=test@example.com \
    --cc=konstantin@linuxfoundation.org \
    --cc=tools@kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.

Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.