From: Tamir Duberstein <tamird@kernel.org>
To: "Kernel.org Tools" <tools@kernel.org>
Cc: Konstantin Ryabitsev <konstantin@linuxfoundation.org>,
Tamir Duberstein <tamird@kernel.org>, test@example.com
Subject: [PATCH patatt 3/7] Add Ruff format check
Date: Sun, 19 Apr 2026 21:22:23 -0400 [thread overview]
Message-ID: <20260419-stronger-type-checking-v1-3-5c108048d2c7@kernel.org> (raw)
In-Reply-To: <20260419-stronger-type-checking-v1-0-5c108048d2c7@kernel.org>
Run Ruff's formatter as part of the local CI helper and configure it to
preserve the project's single-quote style.
Apply the one-time formatting pass so the new check starts from a green
baseline.
Signed-off-by: Tamir Duberstein <tamird@kernel.org>
---
ci.sh | 1 +
conftest.py | 3 +-
pyproject.toml | 3 +
src/patatt/__init__.py | 345 +++++++++++++++++++++++++++---------
tests/conftest.py | 10 +-
tests/test_validation.py | 16 +-
tests/unit/test_byhash.py | 2 -
tests/unit/test_devsig_header.py | 17 +-
tests/unit/test_get_algo_keydata.py | 9 +-
tests/unit/test_patatt_message.py | 20 ++-
10 files changed, 307 insertions(+), 119 deletions(-)
diff --git a/ci.sh b/ci.sh
index 3001db2..ec0baf8 100755
--- a/ci.sh
+++ b/ci.sh
@@ -2,6 +2,7 @@
set -eu
+uv run ruff format --check
uv run ruff check
uv run mypy .
uv run pytest --durations=0
diff --git a/conftest.py b/conftest.py
index c106f53..33276ab 100644
--- a/conftest.py
+++ b/conftest.py
@@ -3,7 +3,6 @@ from pathlib import Path
# Add the src directory to the Python path
project_root = Path(__file__).parent
-src_path = str(project_root / "src")
+src_path = str(project_root / 'src')
if src_path not in sys.path:
sys.path.insert(0, src_path)
-
diff --git a/pyproject.toml b/pyproject.toml
index 7583cd4..d56d828 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -58,6 +58,9 @@ extend-select = [
]
flake8-quotes.inline-quotes = "single"
+[tool.ruff.format]
+quote-style = "single"
+
[tool.pyright]
typeCheckingMode = "off"
diff --git a/src/patatt/__init__.py b/src/patatt/__init__.py
index 935da55..24ec933 100644
--- a/src/patatt/__init__.py
+++ b/src/patatt/__init__.py
@@ -55,6 +55,7 @@ CONFIGCACHE: Dict[str, GitConfigType] = dict()
__VERSION__ = '0.7.1'
MAX_SUPPORTED_FORMAT_VERSION = 1
+
class Error(Exception):
"""Base exception for patatt errors.
@@ -171,8 +172,11 @@ class DevsigHeader:
.. deprecated::
Use :meth:`get_field_as_bytes` or :meth:`get_field_as_str` instead.
"""
- warnings.warn('get_field() is deprecated, use get_field_as_bytes() or get_field_as_str() instead',
- DeprecationWarning, stacklevel=2)
+ warnings.warn(
+ 'get_field() is deprecated, use get_field_as_bytes() or get_field_as_str() instead',
+ DeprecationWarning,
+ stacklevel=2,
+ )
value = self.hdata.get(field)
if isinstance(value, bytes) and decode:
return value.decode()
@@ -247,8 +251,10 @@ class DevsigHeader:
if mode == 'sign':
# Make sure REQ_HDRS is a subset of allhdrs
if not reqset.issubset(allhdrs):
- raise SigningError('The following required headers not present: %s'
- % (b', '.join(reqset.difference(allhdrs)).decode()))
+ raise SigningError(
+ 'The following required headers not present: %s'
+ % (b', '.join(reqset.difference(allhdrs)).decode())
+ )
# Add optional headers that are actually present
optpresent = list(allhdrs.intersection(optset))
signlist = REQ_HDRS + sorted(optpresent)
@@ -261,8 +267,10 @@ class DevsigHeader:
signlist = [x.strip() for x in hfield.split(b':')]
# Make sure REQ_HEADERS are in this set
if not reqset.issubset(set(signlist)):
- raise ValidationError('The following required headers not signed: %s'
- % (b', '.join(reqset.difference(set(signlist))).decode()))
+ raise ValidationError(
+ 'The following required headers not signed: %s'
+ % (b', '.join(reqset.difference(set(signlist))).decode())
+ )
else:
raise RuntimeError('Unknown set_header mode: %s' % mode)
@@ -276,7 +284,9 @@ class DevsigHeader:
at = 0
for hname, rawval in list(parsed):
if hname == shname:
- self._headervals.append(hname + b':' + DevsigHeader._dkim_canonicalize_header(rawval))
+ self._headervals.append(
+ hname + b':' + DevsigHeader._dkim_canonicalize_header(rawval)
+ )
parsed.pop(at)
break
at += 1
@@ -335,7 +345,9 @@ class DevsigHeader:
pubkey = keyinfo.encode()
else:
pubkey = keyinfo
- sdigest, (_, _, _, signkey, signtime) = DevsigHeader._validate_openpgp(bdata, pubkey)
+ sdigest, (_, _, _, signkey, signtime) = DevsigHeader._validate_openpgp(
+ bdata, pubkey
+ )
if sdigest != vdigest:
raise ValidationError('Header validation failed')
return signkey, signtime
@@ -349,7 +361,9 @@ class DevsigHeader:
bkeyinfo = keyinfo.encode()
else:
# This cannot be None for any of the algorithms we support other than openpgp, done above
- raise RuntimeError('keyinfo must be a string or bytes, not %s' % type(keyinfo).__name__)
+ raise RuntimeError(
+ 'keyinfo must be a string or bytes, not %s' % type(keyinfo).__name__
+ )
if algo.startswith('ed25519'):
sdigest = DevsigHeader._validate_ed25519(bdata, bkeyinfo)
@@ -372,7 +386,9 @@ class DevsigHeader:
return signkey, signtime
- def sign(self, keyinfo: Union[str, bytes], split: bool = True) -> Tuple[bytes, bytes]:
+ def sign(
+ self, keyinfo: Union[str, bytes], split: bool = True
+ ) -> Tuple[bytes, bytes]:
"""Sign the message and generate signature header value.
Args:
@@ -399,7 +415,9 @@ class DevsigHeader:
skeyinfo = keyinfo
bkeyinfo = keyinfo.encode()
else:
- raise RuntimeError('keyinfo must be a string or bytes, not %s' % type(keyinfo).__name__)
+ raise RuntimeError(
+ 'keyinfo must be a string or bytes, not %s' % type(keyinfo).__name__
+ )
hparts = list()
for fn in self._order:
@@ -474,7 +492,9 @@ class DevsigHeader:
sshkargs = ['-Y', 'sign', '-n', 'patatt', '-f', keypath]
ecode, out, err = sshk_run_command(sshkargs, payload)
if ecode > 0:
- raise SigningError('Running ssh-keygen failed', errors=err.decode().strip().split('\n'))
+ raise SigningError(
+ 'Running ssh-keygen failed', errors=err.decode().strip().split('\n')
+ )
# Remove the header/footer
sigdata = b''
for bline in out.split(b'\n'):
@@ -486,7 +506,9 @@ class DevsigHeader:
sshkargs = ['-l', '-f', keypath]
ecode, out, err = sshk_run_command(sshkargs, payload)
if ecode > 0:
- raise SigningError('Running ssh-keygen failed', errors=err.decode().split('\n'))
+ raise SigningError(
+ 'Running ssh-keygen failed', errors=err.decode().split('\n')
+ )
chunks = out.split()
keyfp = chunks[1]
KEYCACHE[keypath] = keyfp
@@ -503,17 +525,41 @@ class DevsigHeader:
spath = os.path.join(td, 'sigdata')
with open(fpath, 'wb') as fh:
chunks = keydata.split()
- bcont = b'patatter@local namespaces="patatt" ' + chunks[0] + b' ' + chunks[1] + b'\n'
+ bcont = (
+ b'patatter@local namespaces="patatt" '
+ + chunks[0]
+ + b' '
+ + chunks[1]
+ + b'\n'
+ )
logger.debug('allowed-signers: %s', bcont)
fh.write(bcont)
with open(spath, 'wb') as fh:
- bcont = b'-----BEGIN SSH SIGNATURE-----\n' + sigdata + b'\n-----END SSH SIGNATURE-----\n'
+ bcont = (
+ b'-----BEGIN SSH SIGNATURE-----\n'
+ + sigdata
+ + b'\n-----END SSH SIGNATURE-----\n'
+ )
logger.debug('sigdata: %s', bcont)
fh.write(bcont)
- sshkargs = ['-Y', 'verify', '-n', 'patatt', '-I', 'patatter@local', '-f', fpath, '-s', spath]
+ sshkargs = [
+ '-Y',
+ 'verify',
+ '-n',
+ 'patatt',
+ '-I',
+ 'patatter@local',
+ '-f',
+ fpath,
+ '-s',
+ spath,
+ ]
ecode, out, err = sshk_run_command(sshkargs, payload)
if ecode > 0:
- raise ValidationError('Failed to validate openssh signature', errors=err.decode().split('\n'))
+ raise ValidationError(
+ 'Failed to validate openssh signature',
+ errors=err.decode().split('\n'),
+ )
@staticmethod
def _sign_openpgp(payload: bytes, keyid: str) -> Tuple[bytes, bytes]:
@@ -528,7 +574,9 @@ class DevsigHeader:
gpgargs = ['--with-colons', '--fingerprint', keyid]
ecode, out, err = gpg_run_command(gpgargs)
if ecode > 0:
- raise SigningError('Running gpg failed', errors=err.decode().split('\n'))
+ raise SigningError(
+ 'Running gpg failed', errors=err.decode().split('\n')
+ )
pkid = None
keyfp = None
for line in out.split(b'\n'):
@@ -550,13 +598,21 @@ class DevsigHeader:
return bdata, keyfp
@staticmethod
- def _validate_openpgp(sigdata: bytes, pubkey: Optional[bytes]) -> Tuple[bytes, Tuple[bool, bool, bool, str, str]]:
+ def _validate_openpgp(
+ sigdata: bytes, pubkey: Optional[bytes]
+ ) -> Tuple[bytes, Tuple[bool, bool, bool, str, str]]:
global KEYCACHE
bsigdata = base64.b64decode(sigdata)
vrfyargs = ['--verify', '--output', '-', '--status-fd=2']
if pubkey:
with tempfile.TemporaryDirectory(suffix='.patatt.gnupg') as td:
- keyringargs = ['--homedir', td, '--no-default-keyring', '--keyring', 'pub']
+ keyringargs = [
+ '--homedir',
+ td,
+ '--no-default-keyring',
+ '--keyring',
+ 'pub',
+ ]
if pubkey in KEYCACHE:
logger.debug('Reusing cached keyring')
with open(os.path.join(td, 'pub'), 'wb') as kfh:
@@ -596,10 +652,16 @@ class DevsigHeader:
signtime = ''
signkey = ''
- logger.debug('GNUPG status:\n\t%s', status.decode().strip().replace('\n', '\n\t'))
+ logger.debug(
+ 'GNUPG status:\n\t%s', status.decode().strip().replace('\n', '\n\t')
+ )
if re.search(rb'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+(.*)$', status, flags=re.M):
good = True
- if (vs_matches := re.search(rb'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', status, flags=re.M)):
+ if vs_matches := re.search(
+ rb'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)',
+ status,
+ flags=re.M,
+ ):
valid = True
signkey = vs_matches.groups()[0].decode()
signtime = vs_matches.groups()[2].decode()
@@ -698,7 +760,9 @@ class PatattMessage:
# Generate a new payload using m and p and canonicalize with \r\n endings,
# trimming any excess blank lines ("simple" DKIM canonicalization).
- m, p, i = PatattMessage._get_git_mailinfo(b''.join(self.headers) + self.lf + self.body)
+ m, p, i = PatattMessage._get_git_mailinfo(
+ b''.join(self.headers) + self.lf + self.body
+ )
self.canon_body = b''
for line in re.sub(rb'[\r\n]*$', b'', m + p).split(b'\n'):
self.canon_body += re.sub(rb'[\r\n]*$', b'', line) + b'\r\n'
@@ -718,14 +782,26 @@ class PatattMessage:
left, right = header.split(b':', 1)
lleft = left.lower()
if lleft == b'from':
- right = b' ' + idata.get(b'author', b'') + b' <' + idata.get(b'email', b'') + b'>'
+ right = (
+ b' '
+ + idata.get(b'author', b'')
+ + b' <'
+ + idata.get(b'email', b'')
+ + b'>'
+ )
elif lleft == b'subject':
right = b' ' + idata.get(b'subject', b'')
self.canon_headers.append(left + b':' + right)
except ValueError:
self.canon_headers.append(header)
- def sign(self, algo: str, keyinfo: Union[str, bytes], identity: Optional[str], selector: Optional[str]) -> None:
+ def sign(
+ self,
+ algo: str,
+ keyinfo: Union[str, bytes],
+ identity: Optional[str],
+ selector: Optional[str],
+ ) -> None:
"""Sign the message and add signature headers.
Args:
@@ -751,7 +827,9 @@ class PatattMessage:
ds.set_field('l', str(len(self.canon_body)))
if not identity:
if not self.canon_identity:
- raise SigningError('No identity provided and no canonical identity available')
+ raise SigningError(
+ 'No identity provided and no canonical identity available'
+ )
identity = self.canon_identity
ds.set_field('i', identity)
if selector:
@@ -766,7 +844,9 @@ class PatattMessage:
ds.set_field('t', str(int(time.time())))
hv, pkinfo = ds.sign(keyinfo)
- dshdr = email.header.make_header([(DEVSIG_HDR + b': ' + hv, 'us-ascii')], maxlinelen=78)
+ dshdr = email.header.make_header(
+ [(DEVSIG_HDR + b': ' + hv, 'us-ascii')], maxlinelen=78
+ )
self.headers.append(dshdr.encode().encode() + self.lf)
# Make informational header about the key used
@@ -781,10 +861,14 @@ class PatattMessage:
else:
idata.append(b'pk=%s' % pkinfo)
- dkhdr = email.header.make_header([(DEVKEY_HDR + b': ' + b'; '.join(idata), 'us-ascii')], maxlinelen=78)
+ dkhdr = email.header.make_header(
+ [(DEVKEY_HDR + b': ' + b'; '.join(idata), 'us-ascii')], maxlinelen=78
+ )
self.headers.append(dkhdr.encode().encode() + self.lf)
- def validate(self, identity: str, pkey: Union[bytes, str, None], trim_body: bool = False) -> Tuple[str, str]:
+ def validate(
+ self, identity: str, pkey: Union[bytes, str, None], trim_body: bool = False
+ ) -> Tuple[str, str]:
"""Validate the signature for a specific identity.
Args:
@@ -863,7 +947,7 @@ class PatattMessage:
break
# is it a wrapped header?
- if line[0] in ("\x09", "\x20", 0x09, 0x20):
+ if line[0] in ('\x09', '\x20', 0x09, 0x20):
if not len(self.headers):
raise RuntimeError('Not a valid RFC2822 message')
# attach it to the previous header
@@ -950,19 +1034,23 @@ def get_data_dir() -> Path:
return datadir
-def _run_command(cmdargs: List[str],
- stdin: Optional[bytes] = None,
- env: Optional[Dict[str, str]] = None) -> Tuple[int, bytes, bytes]:
+def _run_command(
+ cmdargs: List[str],
+ stdin: Optional[bytes] = None,
+ env: Optional[Dict[str, str]] = None,
+) -> Tuple[int, bytes, bytes]:
logger.debug('Running %s', ' '.join(cmdargs))
cp = subprocess.run(cmdargs, input=stdin, env=env, capture_output=True, text=False)
logger.debug('Completed %s', repr(cp))
return cp.returncode, cp.stdout, cp.stderr
-def git_run_command(gitdir: Optional[str],
- args: List[str],
- stdin: Optional[bytes] = None,
- env: Optional[Dict[str, str]] = None) -> Tuple[int, bytes, bytes]:
+def git_run_command(
+ gitdir: Optional[str],
+ args: List[str],
+ stdin: Optional[bytes] = None,
+ env: Optional[Dict[str, str]] = None,
+) -> Tuple[int, bytes, bytes]:
if gitdir:
args = ['git', '--git-dir', gitdir, '--no-pager'] + args
else:
@@ -970,10 +1058,12 @@ def git_run_command(gitdir: Optional[str],
return _run_command(args, stdin=stdin, env=env)
-def get_config_from_git(regexp: str,
- section: Optional[str] = None,
- defaults: Optional[Dict[str, Union[str, List[str]]]] = None,
- multivals: Optional[List[str]] = None) -> GitConfigType:
+def get_config_from_git(
+ regexp: str,
+ section: Optional[str] = None,
+ defaults: Optional[Dict[str, Union[str, List[str]]]] = None,
+ multivals: Optional[List[str]] = None,
+) -> GitConfigType:
if multivals is None:
multivals = list()
@@ -1027,13 +1117,22 @@ def get_config_from_git(regexp: str,
return gitconfig
-def gpg_run_command(cmdargs: List[str], stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]:
+def gpg_run_command(
+ cmdargs: List[str], stdin: Optional[bytes] = None
+) -> Tuple[int, bytes, bytes]:
gpgbin, _ = set_bin_paths(None)
- cmdargs = [gpgbin, '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb'] + cmdargs
+ cmdargs = [
+ gpgbin,
+ '--batch',
+ '--no-auto-key-retrieve',
+ '--no-auto-check-trustdb',
+ ] + cmdargs
return _run_command(cmdargs, stdin)
-def sshk_run_command(cmdargs: List[str], stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]:
+def sshk_run_command(
+ cmdargs: List[str], stdin: Optional[bytes] = None
+) -> Tuple[int, bytes, bytes]:
_, sshkbin = set_bin_paths(None)
cmdargs = [sshkbin] + cmdargs
return _run_command(cmdargs, stdin)
@@ -1079,8 +1178,12 @@ def make_pkey_path(keytype: str, identity: str, selector: str) -> Path:
domain = chunks[1].lower()
selector = selector.lower()
# urlencode all potentially untrusted bits to make sure nobody tries path-based badness
- return Path(urllib.parse.quote_plus(keytype), urllib.parse.quote_plus(domain),
- urllib.parse.quote_plus(local), urllib.parse.quote_plus(selector))
+ return Path(
+ urllib.parse.quote_plus(keytype),
+ urllib.parse.quote_plus(domain),
+ urllib.parse.quote_plus(local),
+ urllib.parse.quote_plus(selector),
+ )
def make_byhash_path(keytype: str, identity: str, selector: str) -> Path:
@@ -1105,7 +1208,9 @@ def make_byhash_path(keytype: str, identity: str, selector: str) -> Path:
return Path('by-hash', prefix, remainder)
-def get_public_key(source: str, keytype: str, identity: str, selector: str) -> Tuple[bytes, str]:
+def get_public_key(
+ source: str, keytype: str, identity: str, selector: str
+) -> Tuple[bytes, str]:
"""Look up a public key from a keyring source.
Searches for the key at the standard path first, then falls back to
@@ -1133,7 +1238,9 @@ def get_public_key(source: str, keytype: str, identity: str, selector: str) -> T
# split by :
parts = source.split(':', 4)
if len(parts) < 4:
- raise ConfigurationError('Invalid ref, must have at least 3 colons: %s' % source)
+ raise ConfigurationError(
+ 'Invalid ref, must have at least 3 colons: %s' % source
+ )
gitrepo = parts[1]
gitref = parts[2]
gitsub = parts[3]
@@ -1170,7 +1277,9 @@ def get_public_key(source: str, keytype: str, identity: str, selector: str) -> T
# Handle one level of symlinks
if out.find(b'\n') < 0 < out.find(b'/'):
# Check this path as well
- linktgt = os.path.normpath(os.path.join(os.path.dirname(subpath), out.decode()))
+ linktgt = os.path.normpath(
+ os.path.join(os.path.dirname(subpath), out.decode())
+ )
keysrc = f'{gitref}:{linktgt}'
cmdargs = ['show', keysrc]
ecode, out, err = git_run_command(gittop, cmdargs)
@@ -1230,6 +1339,7 @@ def get_public_key(source: str, keytype: str, identity: str, selector: str) -> T
def _load_messages(cmdargs: argparse.Namespace) -> Dict[str, bytes]:
import sys
+
if len(cmdargs.msgfile):
# Load all message from the files passed to make sure they all parse correctly
messages = dict()
@@ -1239,17 +1349,21 @@ def _load_messages(cmdargs: argparse.Namespace) -> Dict[str, bytes]:
elif not sys.stdin.isatty():
messages = {'-': sys.stdin.buffer.read()}
else:
- logger.critical('E: Pipe a message to sign or pass filenames with individual messages')
+ logger.critical(
+ 'E: Pipe a message to sign or pass filenames with individual messages'
+ )
raise RuntimeError('Nothing to do')
return messages
-def sign_message(msgdata: bytes,
- algo: str,
- keyinfo: Union[str, bytes],
- identity: Optional[str],
- selector: Optional[str]) -> bytes:
+def sign_message(
+ msgdata: bytes,
+ algo: str,
+ keyinfo: Union[str, bytes],
+ identity: Optional[str],
+ selector: Optional[str],
+) -> bytes:
"""Sign an RFC2822 message and return the signed message bytes.
Args:
@@ -1284,7 +1398,9 @@ def set_bin_paths(config: Optional[GitConfigType]) -> Tuple[str, str]:
_sshkbin = config.get('ssh-keygen-bin')
assert isinstance(_sshkbin, str), 'ssh-keygen-bin must be a string'
SSHKBIN = _sshkbin
- elif (_sshkbin := get_config_from_git(r'gpg\..*', section='ssh').get('program')) is not None:
+ elif (
+ _sshkbin := get_config_from_git(r'gpg\..*', section='ssh').get('program')
+ ) is not None:
assert isinstance(_sshkbin, str), 'program must be a string'
SSHKBIN = _sshkbin
else:
@@ -1302,7 +1418,9 @@ def get_algo_keydata(config: GitConfigType) -> Tuple[str, str]:
# Do we have this already looked up?
identity = config.get('identity')
if not isinstance(identity, str):
- raise ConfigurationError('Identity must be a string, got %s' % type(identity).__name__)
+ raise ConfigurationError(
+ 'Identity must be a string, got %s' % type(identity).__name__
+ )
if identity in KEYCACHE:
algo, keydata = KEYCACHE[identity]
@@ -1313,7 +1431,11 @@ def get_algo_keydata(config: GitConfigType) -> Tuple[str, str]:
if user_signingkey:
gpg_format = get_config_from_git(r'gpg\..*').get('format', 'gpg')
key_algo = 'openssh' if gpg_format == 'ssh' else 'openpgp'
- logger.info('N: Using %s key %s defined by user.signingkey', key_algo, user_signingkey)
+ logger.info(
+ 'N: Using %s key %s defined by user.signingkey',
+ key_algo,
+ user_signingkey,
+ )
logger.info('N: Override by setting patatt.signingkey')
config['signingkey'] = '%s:%s' % (key_algo, user_signingkey)
else:
@@ -1323,7 +1445,9 @@ def get_algo_keydata(config: GitConfigType) -> Tuple[str, str]:
sk = config.get('signingkey')
if not isinstance(sk, str):
- raise ConfigurationError('Signing key must be a string, got %s' % type(sk).__name__)
+ raise ConfigurationError(
+ 'Signing key must be a string, got %s' % type(sk).__name__
+ )
if sk.startswith('ed25519:'):
algo = 'ed25519'
identifier = sk[8:]
@@ -1338,7 +1462,7 @@ def get_algo_keydata(config: GitConfigType) -> Tuple[str, str]:
keysrc = str(skey)
else:
# finally, try .git/%s.key
- if (gtdir := get_git_toplevel()):
+ if gtdir := get_git_toplevel():
skey = Path(gtdir) / '.git' / f'{identifier}.key'
if skey.exists():
keysrc = str(skey)
@@ -1397,12 +1521,18 @@ def get_main_config(section: Optional[str] = None) -> GitConfigType:
csection = 'default'
if csection in CONFIGCACHE:
return CONFIGCACHE[csection]
- config = get_config_from_git(r'patatt\..*', section=section, multivals=['keyringsrc'])
+ config = get_config_from_git(
+ r'patatt\..*', section=section, multivals=['keyringsrc']
+ )
# Append some extra keyring locations
if 'keyringsrc' not in config or not isinstance(config['keyringsrc'], list):
config['keyringsrc'] = list()
- assert isinstance(config['keyringsrc'], list) # Just to make lint checkers shut up
- config['keyringsrc'] += ['ref:::.keys', 'ref:::.local-keys', 'ref::refs/meta/keyring:']
+ assert isinstance(config['keyringsrc'], list) # Just to make lint checkers shut up
+ config['keyringsrc'] += [
+ 'ref:::.keys',
+ 'ref:::.local-keys',
+ 'ref::refs/meta/keyring:',
+ ]
set_bin_paths(config)
logger.debug('config: %s', config)
CONFIGCACHE[csection] = config
@@ -1442,10 +1572,11 @@ def cmd_sign(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
sys.exit(1)
-def validate_message(msgdata: bytes,
- sources: List[str],
- trim_body: bool = False
- ) -> List[Tuple[int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]]]:
+def validate_message(
+ msgdata: bytes, sources: List[str], trim_body: bool = False
+) -> List[
+ Tuple[int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]]
+]:
"""Validate all signatures in an RFC2822 message.
Args:
@@ -1459,11 +1590,17 @@ def validate_message(msgdata: bytes,
Result codes: RES_VALID, RES_BADSIG, RES_NOKEY, RES_NOSIG, RES_ERROR
"""
- attestations: List[Tuple[int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]]] = list()
+ attestations: List[
+ Tuple[
+ int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]
+ ]
+ ] = list()
pm = PatattMessage(msgdata)
if not pm.signed:
logger.debug('message is not signed')
- attestations.append((RES_NOSIG, None, None, None, None, ['no signatures found']))
+ attestations.append(
+ (RES_NOSIG, None, None, None, None, ['no signatures found'])
+ )
return attestations
# Find all identities for which we have public keys
@@ -1530,6 +1667,7 @@ def validate_message(msgdata: bytes,
def cmd_validate(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
import mailbox
+
if len(cmdargs.msgfile) == 1:
# Try to open as an mbox file
try:
@@ -1699,36 +1837,73 @@ def command() -> None:
parser = argparse.ArgumentParser(
prog='patatt',
description='Cryptographically attest patches before sending out',
- formatter_class=argparse.ArgumentDefaultsHelpFormatter
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+ )
+ parser.add_argument(
+ '-v',
+ '--verbose',
+ action='store_true',
+ default=False,
+ help='Be a bit more verbose',
+ )
+ parser.add_argument(
+ '-d',
+ '--debug',
+ action='store_true',
+ default=False,
+ help='Show debugging output',
+ )
+ parser.add_argument(
+ '-s',
+ '--section',
+ dest='section',
+ default=None,
+ help='Use config section [patatt "sectionname"]',
)
- parser.add_argument('-v', '--verbose', action='store_true', default=False,
- help='Be a bit more verbose')
- parser.add_argument('-d', '--debug', action='store_true', default=False,
- help='Show debugging output')
- parser.add_argument('-s', '--section', dest='section', default=None,
- help='Use config section [patatt "sectionname"]')
parser.add_argument('--version', action='version', version=__VERSION__)
subparsers = parser.add_subparsers(help='sub-command help', dest='subcmd')
- sp_sign = subparsers.add_parser('sign', help='Cryptographically attest an RFC2822 message')
- sp_sign.add_argument('--hook', dest='hookmode', action='store_true', default=False,
- help='Git hook mode')
+ sp_sign = subparsers.add_parser(
+ 'sign', help='Cryptographically attest an RFC2822 message'
+ )
+ sp_sign.add_argument(
+ '--hook',
+ dest='hookmode',
+ action='store_true',
+ default=False,
+ help='Git hook mode',
+ )
sp_sign.add_argument('msgfile', nargs='*', help='RFC2822 message files to sign')
sp_sign.set_defaults(func=cmd_sign)
sp_val = subparsers.add_parser('validate', help='Validate a devsig-signed message')
- sp_val.add_argument('msgfile', nargs='*', help='Individual signed message files to validate or an mbox')
+ sp_val.add_argument(
+ 'msgfile',
+ nargs='*',
+ help='Individual signed message files to validate or an mbox',
+ )
sp_val.set_defaults(func=cmd_validate)
sp_gen = subparsers.add_parser('genkey', help='Generate a new ed25519 keypair')
- sp_gen.add_argument('-n', '--keyname', default=None,
- help='Name to use for the key, e.g. "workstation", or "default"')
- sp_gen.add_argument('-f', '--force', action='store_true', default=False,
- help='Overwrite any existing keys, if found')
+ sp_gen.add_argument(
+ '-n',
+ '--keyname',
+ default=None,
+ help='Name to use for the key, e.g. "workstation", or "default"',
+ )
+ sp_gen.add_argument(
+ '-f',
+ '--force',
+ action='store_true',
+ default=False,
+ help='Overwrite any existing keys, if found',
+ )
sp_gen.set_defaults(func=cmd_genkey)
- sp_install = subparsers.add_parser('install-hook', help='Install sendmail-validate hook into the current repo')
+ sp_install = subparsers.add_parser(
+ 'install-hook', help='Install sendmail-validate hook into the current repo'
+ )
sp_install.set_defaults(func=cmd_install_hook)
_args = parser.parse_args()
diff --git a/tests/conftest.py b/tests/conftest.py
index 8bb6064..a2d2124 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -18,6 +18,7 @@ Message-ID: <12345@example.com>
This is a test email body.
"""
+
@pytest.fixture
def temp_data_dir() -> Generator[str, None, None]:
"""Create a temporary data directory structure for patatt."""
@@ -32,23 +33,26 @@ def temp_data_dir() -> Generator[str, None, None]:
# Return path to the temp directory
yield tmpdirname
+
@pytest.fixture
def devsig_header() -> DevsigHeader:
"""Create a basic DevsigHeader instance."""
return DevsigHeader()
+
@pytest.fixture
def patatt_message(sample_email_bytes: bytes) -> PatattMessage:
"""Create a PatattMessage from a sample email."""
return PatattMessage(sample_email_bytes)
+
@pytest.fixture
def sample_ed25519_key_pair() -> Dict[str, bytes]:
"""Generate a sample ed25519 key pair for testing."""
try:
from nacl.signing import SigningKey
except ImportError:
- pytest.skip("PyNaCl not installed, skipping ed25519 tests")
+ pytest.skip('PyNaCl not installed, skipping ed25519 tests')
# Generate a key pair
private_key = SigningKey.generate()
@@ -57,5 +61,5 @@ def sample_ed25519_key_pair() -> Dict[str, bytes]:
# Return base64 encoded keys
return {
'private': base64.b64encode(bytes(private_key)),
- 'public': base64.b64encode(public_key.encode())
- }
\ No newline at end of file
+ 'public': base64.b64encode(public_key.encode()),
+ }
diff --git a/tests/test_validation.py b/tests/test_validation.py
index d51be81..1477fbe 100644
--- a/tests/test_validation.py
+++ b/tests/test_validation.py
@@ -6,11 +6,9 @@ import pytest
from patatt import RES_VALID, validate_message
-@pytest.mark.parametrize("sample_file", [
- "ed25519-signed.txt",
- "pgp-signed.txt",
- "openssh-signed.txt"
-])
+@pytest.mark.parametrize(
+ 'sample_file', ['ed25519-signed.txt', 'pgp-signed.txt', 'openssh-signed.txt']
+)
def test_validate(sample_file: str) -> None:
"""Test validation of an ed25519 signed message from samples directory."""
# Path to the sample file
@@ -29,14 +27,14 @@ def test_validate(sample_file: str) -> None:
results = validate_message(signed_data, [sources_path])
# Check validation results
- assert results, "Validation should return results"
+ assert results, 'Validation should return results'
# At least one valid signature should be found
valid_signatures = [r for r in results if r[0] == RES_VALID]
- assert valid_signatures, "Should find at least one valid signature"
+ assert valid_signatures, 'Should find at least one valid signature'
# Print validation details for debugging
- print(f"Found {len(valid_signatures)} valid signatures:")
+ print(f'Found {len(valid_signatures)} valid signatures:')
for result in valid_signatures:
status, algo, keytype, identity, selector, errors = result
- print(f" - {keytype} signature by {identity} ({selector})")
+ print(f' - {keytype} signature by {identity} ({selector})')
diff --git a/tests/unit/test_byhash.py b/tests/unit/test_byhash.py
index c3fe4d2..e90ee82 100644
--- a/tests/unit/test_byhash.py
+++ b/tests/unit/test_byhash.py
@@ -8,7 +8,6 @@ from patatt import get_public_key, make_byhash_path, make_pkey_path
class TestMakeByhashPath:
-
def test_basic_hash_computation(self) -> None:
"""Test that make_byhash_path computes the correct hash."""
keytype = 'openssh'
@@ -42,7 +41,6 @@ class TestMakeByhashPath:
class TestGetPublicKeyByHash:
-
def test_filesystem_byhash_lookup(self) -> None:
"""Test that get_public_key finds a key via by-hash fallback."""
with tempfile.TemporaryDirectory() as tmpdir:
diff --git a/tests/unit/test_devsig_header.py b/tests/unit/test_devsig_header.py
index 547593d..7b6e461 100644
--- a/tests/unit/test_devsig_header.py
+++ b/tests/unit/test_devsig_header.py
@@ -8,7 +8,6 @@ from patatt import DevsigHeader
class TestDevsigHeader:
-
def test_initialization(self) -> None:
"""Test that DevsigHeader initializes correctly."""
header = DevsigHeader()
@@ -17,7 +16,9 @@ class TestDevsigHeader:
def test_from_bytes(self) -> None:
"""Test parsing a header from bytes."""
- header_bytes = b'v=1; a=ed25519-sha256; t=1623456789; i=test@example.com; bh=abcd1234'
+ header_bytes = (
+ b'v=1; a=ed25519-sha256; t=1623456789; i=test@example.com; bh=abcd1234'
+ )
header = DevsigHeader(header_bytes)
assert header.get_field_as_str('v') == '1'
@@ -41,7 +42,7 @@ class TestDevsigHeader:
def test_set_body(self) -> None:
"""Test setting the body and calculating the body hash."""
header = DevsigHeader()
- body = b"This is a test body"
+ body = b'This is a test body'
header.set_body(body)
@@ -55,7 +56,7 @@ class TestDevsigHeader:
def test_set_body_with_maxlen(self) -> None:
"""Test setting the body with a maxlen parameter."""
header = DevsigHeader()
- body = b"This is a test body"
+ body = b'This is a test body'
maxlen = 10
header.set_body(body, maxlen=maxlen)
@@ -95,17 +96,17 @@ class TestDevsigHeader:
"""Test that sanity_check fails if required fields are not set."""
header = DevsigHeader()
- with pytest.raises(RuntimeError, match="Must set \"a\" field first"):
+ with pytest.raises(RuntimeError, match='Must set "a" field first'):
header.sanity_check()
header.set_field('a', 'ed25519-sha256')
- with pytest.raises(RuntimeError, match="Must use set_body first"):
+ with pytest.raises(RuntimeError, match='Must use set_body first'):
header.sanity_check()
- header.set_body(b"Test body")
+ header.set_body(b'Test body')
- with pytest.raises(RuntimeError, match="Must use set_headers first"):
+ with pytest.raises(RuntimeError, match='Must use set_headers first'):
header.sanity_check()
# @pytest.mark.skipif(True, reason="Requires actual ed25519 keys")
diff --git a/tests/unit/test_get_algo_keydata.py b/tests/unit/test_get_algo_keydata.py
index 545f051..c4ccc8d 100644
--- a/tests/unit/test_get_algo_keydata.py
+++ b/tests/unit/test_get_algo_keydata.py
@@ -41,7 +41,10 @@ class TestGetAlgoKeydataSSHSigningKey:
def test_ssh_format_uses_openssh(self, mock_gcfg: MagicMock) -> None:
"""When gpg.format=ssh, user.signingkey should get the openssh: prefix."""
mock_gcfg.side_effect = _make_mock_get_config(
- usercfg={'email': 'test@example.com', 'signingkey': '/home/user/.ssh/id_ed25519.pub'},
+ usercfg={
+ 'email': 'test@example.com',
+ 'signingkey': '/home/user/.ssh/id_ed25519.pub',
+ },
gpgcfg={'format': 'ssh'},
)
config: GitConfigType = {'identity': 'test@example.com'}
@@ -91,7 +94,9 @@ class TestGetAlgoKeydataSSHSigningKey:
get_algo_keydata(config)
@patch('patatt.get_config_from_git')
- def test_patatt_signingkey_skips_user_signingkey(self, mock_gcfg: MagicMock) -> None:
+ def test_patatt_signingkey_skips_user_signingkey(
+ self, mock_gcfg: MagicMock
+ ) -> None:
"""When patatt.signingkey is already set, user.signingkey is not consulted."""
mock_gcfg.side_effect = _make_mock_get_config(
usercfg={'email': 'test@example.com', 'signingkey': 'SHOULD_NOT_BE_USED'},
diff --git a/tests/unit/test_patatt_message.py b/tests/unit/test_patatt_message.py
index 58338f4..17d4a30 100644
--- a/tests/unit/test_patatt_message.py
+++ b/tests/unit/test_patatt_message.py
@@ -6,7 +6,6 @@ from patatt import PatattMessage
class TestPatattMessage:
-
def test_initialization(self, sample_email_bytes: bytes) -> None:
"""Test initialization of PatattMessage with sample email."""
message = PatattMessage(sample_email_bytes)
@@ -25,7 +24,7 @@ This is a test body.
message = PatattMessage(email_bytes)
assert len(message.headers) == 2
- assert message.body == b"This is a test body.\n"
+ assert message.body == b'This is a test body.\n'
def test_as_bytes(self, sample_email_bytes: bytes) -> None:
"""Test converting message back to bytes."""
@@ -44,17 +43,22 @@ This is a test body.
assert isinstance(output, str)
assert output == sample_email_bytes.decode()
- def test_git_canonicalize(self, monkeypatch: pytest.MonkeyPatch, sample_email_bytes: bytes) -> None:
+ def test_git_canonicalize(
+ self, monkeypatch: pytest.MonkeyPatch, sample_email_bytes: bytes
+ ) -> None:
"""Test git canonicalization of message."""
+
# Mock _get_git_mailinfo to avoid actual git command execution
def mock_get_git_mailinfo(payload: bytes) -> Tuple[bytes, bytes, bytes]:
# Return mock metadata, patch, and info
- metadata = b"Author: Test User\nEmail: test@example.com\nSubject: Test email\n"
- patch = b"This is a test body.\n"
- info = b"email: test@example.com\nauthor: Test User\nsubject: Test email\n"
+ metadata = (
+ b'Author: Test User\nEmail: test@example.com\nSubject: Test email\n'
+ )
+ patch = b'This is a test body.\n'
+ info = b'email: test@example.com\nauthor: Test User\nsubject: Test email\n'
return metadata, patch, info
- monkeypatch.setattr(PatattMessage, "_get_git_mailinfo", mock_get_git_mailinfo)
+ monkeypatch.setattr(PatattMessage, '_get_git_mailinfo', mock_get_git_mailinfo)
message = PatattMessage(sample_email_bytes)
message.git_canonicalize()
@@ -62,7 +66,7 @@ This is a test body.
# Check that the message was canonicalized
assert message.canon_body is not None
assert message.canon_headers is not None
- assert message.canon_identity == "test@example.com"
+ assert message.canon_identity == 'test@example.com'
# @pytest.mark.skipif(True, reason="Requires actual signing setup")
# def test_sign(self) -> None:
--
2.53.0
next prev parent reply other threads: [~2026-04-20 1:23 UTC | newest]
Thread overview: 9+ messages / expand [flat|nested] mbox.gz Atom feed top
2026-04-20 1:22 [PATCH patatt 0/7] Harden local checks Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 1/7] Add local CI script Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 2/7] Add Ruff import checks Tamir Duberstein
2026-04-20 1:22 ` Tamir Duberstein [this message]
2026-04-20 1:22 ` [PATCH patatt 4/7] Add pyright strict checks Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 5/7] Add ty checks Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 6/7] Reduce dictionary lookups Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 7/7] Import PyNaCl unconditionally Tamir Duberstein
2026-04-27 20:20 ` [PATCH patatt 0/7] Harden local checks Tamir Duberstein
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260419-stronger-type-checking-v1-3-5c108048d2c7@kernel.org \
--to=tamird@kernel.org \
--cc=test@example.com \
--cc=konstantin@linuxfoundation.org \
--cc=tools@kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.