From: Tamir Duberstein <tamird@kernel.org>
To: "Kernel.org Tools" <tools@kernel.org>
Cc: Konstantin Ryabitsev <konstantin@linuxfoundation.org>,
Tamir Duberstein <tamird@kernel.org>, 'test@example.com'}
Subject: [PATCH patatt 3/7] Add Ruff format check
Date: Sun, 19 Apr 2026 21:22:23 -0400 [thread overview]
Message-ID: <20260419-stronger-type-checking-v1-3-5c108048d2c7@kernel.org> (raw)
In-Reply-To: <20260419-stronger-type-checking-v1-0-5c108048d2c7@kernel.org>
Run Ruff's formatter as part of the local CI helper and configure it to
preserve the project's single-quote style.
Apply the one-time formatting pass so the new check starts from a green
baseline.
Signed-off-by: Tamir Duberstein <tamird@kernel.org>
---
ci.sh | 1 +
conftest.py | 3 +-
pyproject.toml | 3 +
src/patatt/__init__.py | 345 +++++++++++++++++++++++++++---------
tests/conftest.py | 10 +-
tests/test_validation.py | 16 +-
tests/unit/test_byhash.py | 2 -
| 17 +-
tests/unit/test_get_algo_keydata.py | 9 +-
tests/unit/test_patatt_message.py | 20 ++-
10 files changed, 307 insertions(+), 119 deletions(-)
diff --git a/ci.sh b/ci.sh
index 3001db2..ec0baf8 100755
--- a/ci.sh
+++ b/ci.sh
@@ -2,6 +2,7 @@
set -eu
+uv run ruff format --check
uv run ruff check
uv run mypy .
uv run pytest --durations=0
diff --git a/conftest.py b/conftest.py
index c106f53..33276ab 100644
--- a/conftest.py
+++ b/conftest.py
@@ -3,7 +3,6 @@ from pathlib import Path
# Add the src directory to the Python path
project_root = Path(__file__).parent
-src_path = str(project_root / "src")
+src_path = str(project_root / 'src')
if src_path not in sys.path:
sys.path.insert(0, src_path)
-
diff --git a/pyproject.toml b/pyproject.toml
index 7583cd4..d56d828 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -58,6 +58,9 @@ extend-select = [
]
flake8-quotes.inline-quotes = "single"
+[tool.ruff.format]
+quote-style = "single"
+
[tool.pyright]
typeCheckingMode = "off"
diff --git a/src/patatt/__init__.py b/src/patatt/__init__.py
index 935da55..24ec933 100644
--- a/src/patatt/__init__.py
+++ b/src/patatt/__init__.py
@@ -55,6 +55,7 @@ CONFIGCACHE: Dict[str, GitConfigType] = dict()
__VERSION__ = '0.7.1'
MAX_SUPPORTED_FORMAT_VERSION = 1
+
class Error(Exception):
"""Base exception for patatt errors.
@@ -171,8 +172,11 @@ class DevsigHeader:
.. deprecated::
Use :meth:`get_field_as_bytes` or :meth:`get_field_as_str` instead.
"""
- warnings.warn('get_field() is deprecated, use get_field_as_bytes() or get_field_as_str() instead',
- DeprecationWarning, stacklevel=2)
+ warnings.warn(
+ 'get_field() is deprecated, use get_field_as_bytes() or get_field_as_str() instead',
+ DeprecationWarning,
+ stacklevel=2,
+ )
value = self.hdata.get(field)
if isinstance(value, bytes) and decode:
return value.decode()
@@ -247,8 +251,10 @@ class DevsigHeader:
if mode == 'sign':
# Make sure REQ_HDRS is a subset of allhdrs
if not reqset.issubset(allhdrs):
- raise SigningError('The following required headers not present: %s'
- % (b', '.join(reqset.difference(allhdrs)).decode()))
+ raise SigningError(
+ 'The following required headers not present: %s'
+ % (b', '.join(reqset.difference(allhdrs)).decode())
+ )
# Add optional headers that are actually present
optpresent = list(allhdrs.intersection(optset))
signlist = REQ_HDRS + sorted(optpresent)
@@ -261,8 +267,10 @@ class DevsigHeader:
signlist = [x.strip() for x in hfield.split(b':')]
# Make sure REQ_HEADERS are in this set
if not reqset.issubset(set(signlist)):
- raise ValidationError('The following required headers not signed: %s'
- % (b', '.join(reqset.difference(set(signlist))).decode()))
+ raise ValidationError(
+ 'The following required headers not signed: %s'
+ % (b', '.join(reqset.difference(set(signlist))).decode())
+ )
else:
raise RuntimeError('Unknown set_header mode: %s' % mode)
@@ -276,7 +284,9 @@ class DevsigHeader:
at = 0
for hname, rawval in list(parsed):
if hname == shname:
- self._headervals.append(hname + b':' + DevsigHeader._dkim_canonicalize_header(rawval))
+ self._headervals.append(
+ hname + b':' + DevsigHeader._dkim_canonicalize_header(rawval)
+ )
parsed.pop(at)
break
at += 1
@@ -335,7 +345,9 @@ class DevsigHeader:
pubkey = keyinfo.encode()
else:
pubkey = keyinfo
- sdigest, (_, _, _, signkey, signtime) = DevsigHeader._validate_openpgp(bdata, pubkey)
+ sdigest, (_, _, _, signkey, signtime) = DevsigHeader._validate_openpgp(
+ bdata, pubkey
+ )
if sdigest != vdigest:
raise ValidationError('Header validation failed')
return signkey, signtime
@@ -349,7 +361,9 @@ class DevsigHeader:
bkeyinfo = keyinfo.encode()
else:
# This cannot be None for any of the algorithms we support other than openpgp, done above
- raise RuntimeError('keyinfo must be a string or bytes, not %s' % type(keyinfo).__name__)
+ raise RuntimeError(
+ 'keyinfo must be a string or bytes, not %s' % type(keyinfo).__name__
+ )
if algo.startswith('ed25519'):
sdigest = DevsigHeader._validate_ed25519(bdata, bkeyinfo)
@@ -372,7 +386,9 @@ class DevsigHeader:
return signkey, signtime
- def sign(self, keyinfo: Union[str, bytes], split: bool = True) -> Tuple[bytes, bytes]:
+ def sign(
+ self, keyinfo: Union[str, bytes], split: bool = True
+ ) -> Tuple[bytes, bytes]:
"""Sign the message and generate signature header value.
Args:
@@ -399,7 +415,9 @@ class DevsigHeader:
skeyinfo = keyinfo
bkeyinfo = keyinfo.encode()
else:
- raise RuntimeError('keyinfo must be a string or bytes, not %s' % type(keyinfo).__name__)
+ raise RuntimeError(
+ 'keyinfo must be a string or bytes, not %s' % type(keyinfo).__name__
+ )
hparts = list()
for fn in self._order:
@@ -474,7 +492,9 @@ class DevsigHeader:
sshkargs = ['-Y', 'sign', '-n', 'patatt', '-f', keypath]
ecode, out, err = sshk_run_command(sshkargs, payload)
if ecode > 0:
- raise SigningError('Running ssh-keygen failed', errors=err.decode().strip().split('\n'))
+ raise SigningError(
+ 'Running ssh-keygen failed', errors=err.decode().strip().split('\n')
+ )
# Remove the header/footer
sigdata = b''
for bline in out.split(b'\n'):
@@ -486,7 +506,9 @@ class DevsigHeader:
sshkargs = ['-l', '-f', keypath]
ecode, out, err = sshk_run_command(sshkargs, payload)
if ecode > 0:
- raise SigningError('Running ssh-keygen failed', errors=err.decode().split('\n'))
+ raise SigningError(
+ 'Running ssh-keygen failed', errors=err.decode().split('\n')
+ )
chunks = out.split()
keyfp = chunks[1]
KEYCACHE[keypath] = keyfp
@@ -503,17 +525,41 @@ class DevsigHeader:
spath = os.path.join(td, 'sigdata')
with open(fpath, 'wb') as fh:
chunks = keydata.split()
- bcont = b'patatter@local namespaces="patatt" ' + chunks[0] + b' ' + chunks[1] + b'\n'
+ bcont = (
+ b'patatter@local namespaces="patatt" '
+ + chunks[0]
+ + b' '
+ + chunks[1]
+ + b'\n'
+ )
logger.debug('allowed-signers: %s', bcont)
fh.write(bcont)
with open(spath, 'wb') as fh:
- bcont = b'-----BEGIN SSH SIGNATURE-----\n' + sigdata + b'\n-----END SSH SIGNATURE-----\n'
+ bcont = (
+ b'-----BEGIN SSH SIGNATURE-----\n'
+ + sigdata
+ + b'\n-----END SSH SIGNATURE-----\n'
+ )
logger.debug('sigdata: %s', bcont)
fh.write(bcont)
- sshkargs = ['-Y', 'verify', '-n', 'patatt', '-I', 'patatter@local', '-f', fpath, '-s', spath]
+ sshkargs = [
+ '-Y',
+ 'verify',
+ '-n',
+ 'patatt',
+ '-I',
+ 'patatter@local',
+ '-f',
+ fpath,
+ '-s',
+ spath,
+ ]
ecode, out, err = sshk_run_command(sshkargs, payload)
if ecode > 0:
- raise ValidationError('Failed to validate openssh signature', errors=err.decode().split('\n'))
+ raise ValidationError(
+ 'Failed to validate openssh signature',
+ errors=err.decode().split('\n'),
+ )
@staticmethod
def _sign_openpgp(payload: bytes, keyid: str) -> Tuple[bytes, bytes]:
@@ -528,7 +574,9 @@ class DevsigHeader:
gpgargs = ['--with-colons', '--fingerprint', keyid]
ecode, out, err = gpg_run_command(gpgargs)
if ecode > 0:
- raise SigningError('Running gpg failed', errors=err.decode().split('\n'))
+ raise SigningError(
+ 'Running gpg failed', errors=err.decode().split('\n')
+ )
pkid = None
keyfp = None
for line in out.split(b'\n'):
@@ -550,13 +598,21 @@ class DevsigHeader:
return bdata, keyfp
@staticmethod
- def _validate_openpgp(sigdata: bytes, pubkey: Optional[bytes]) -> Tuple[bytes, Tuple[bool, bool, bool, str, str]]:
+ def _validate_openpgp(
+ sigdata: bytes, pubkey: Optional[bytes]
+ ) -> Tuple[bytes, Tuple[bool, bool, bool, str, str]]:
global KEYCACHE
bsigdata = base64.b64decode(sigdata)
vrfyargs = ['--verify', '--output', '-', '--status-fd=2']
if pubkey:
with tempfile.TemporaryDirectory(suffix='.patatt.gnupg') as td:
- keyringargs = ['--homedir', td, '--no-default-keyring', '--keyring', 'pub']
+ keyringargs = [
+ '--homedir',
+ td,
+ '--no-default-keyring',
+ '--keyring',
+ 'pub',
+ ]
if pubkey in KEYCACHE:
logger.debug('Reusing cached keyring')
with open(os.path.join(td, 'pub'), 'wb') as kfh:
@@ -596,10 +652,16 @@ class DevsigHeader:
signtime = ''
signkey = ''
- logger.debug('GNUPG status:\n\t%s', status.decode().strip().replace('\n', '\n\t'))
+ logger.debug(
+ 'GNUPG status:\n\t%s', status.decode().strip().replace('\n', '\n\t')
+ )
if re.search(rb'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+(.*)$', status, flags=re.M):
good = True
- if (vs_matches := re.search(rb'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', status, flags=re.M)):
+ if vs_matches := re.search(
+ rb'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)',
+ status,
+ flags=re.M,
+ ):
valid = True
signkey = vs_matches.groups()[0].decode()
signtime = vs_matches.groups()[2].decode()
@@ -698,7 +760,9 @@ class PatattMessage:
# Generate a new payload using m and p and canonicalize with \r\n endings,
# trimming any excess blank lines ("simple" DKIM canonicalization).
- m, p, i = PatattMessage._get_git_mailinfo(b''.join(self.headers) + self.lf + self.body)
+ m, p, i = PatattMessage._get_git_mailinfo(
+ b''.join(self.headers) + self.lf + self.body
+ )
self.canon_body = b''
for line in re.sub(rb'[\r\n]*$', b'', m + p).split(b'\n'):
self.canon_body += re.sub(rb'[\r\n]*$', b'', line) + b'\r\n'
@@ -718,14 +782,26 @@ class PatattMessage:
left, right = header.split(b':', 1)
lleft = left.lower()
if lleft == b'from':
- right = b' ' + idata.get(b'author', b'') + b' <' + idata.get(b'email', b'') + b'>'
+ right = (
+ b' '
+ + idata.get(b'author', b'')
+ + b' <'
+ + idata.get(b'email', b'')
+ + b'>'
+ )
elif lleft == b'subject':
right = b' ' + idata.get(b'subject', b'')
self.canon_headers.append(left + b':' + right)
except ValueError:
self.canon_headers.append(header)
- def sign(self, algo: str, keyinfo: Union[str, bytes], identity: Optional[str], selector: Optional[str]) -> None:
+ def sign(
+ self,
+ algo: str,
+ keyinfo: Union[str, bytes],
+ identity: Optional[str],
+ selector: Optional[str],
+ ) -> None:
"""Sign the message and add signature headers.
Args:
@@ -751,7 +827,9 @@ class PatattMessage:
ds.set_field('l', str(len(self.canon_body)))
if not identity:
if not self.canon_identity:
- raise SigningError('No identity provided and no canonical identity available')
+ raise SigningError(
+ 'No identity provided and no canonical identity available'
+ )
identity = self.canon_identity
ds.set_field('i', identity)
if selector:
@@ -766,7 +844,9 @@ class PatattMessage:
ds.set_field('t', str(int(time.time())))
hv, pkinfo = ds.sign(keyinfo)
- dshdr = email.header.make_header([(DEVSIG_HDR + b': ' + hv, 'us-ascii')], maxlinelen=78)
+ dshdr = email.header.make_header(
+ [(DEVSIG_HDR + b': ' + hv, 'us-ascii')], maxlinelen=78
+ )
self.headers.append(dshdr.encode().encode() + self.lf)
# Make informational header about the key used
@@ -781,10 +861,14 @@ class PatattMessage:
else:
idata.append(b'pk=%s' % pkinfo)
- dkhdr = email.header.make_header([(DEVKEY_HDR + b': ' + b'; '.join(idata), 'us-ascii')], maxlinelen=78)
+ dkhdr = email.header.make_header(
+ [(DEVKEY_HDR + b': ' + b'; '.join(idata), 'us-ascii')], maxlinelen=78
+ )
self.headers.append(dkhdr.encode().encode() + self.lf)
- def validate(self, identity: str, pkey: Union[bytes, str, None], trim_body: bool = False) -> Tuple[str, str]:
+ def validate(
+ self, identity: str, pkey: Union[bytes, str, None], trim_body: bool = False
+ ) -> Tuple[str, str]:
"""Validate the signature for a specific identity.
Args:
@@ -863,7 +947,7 @@ class PatattMessage:
break
# is it a wrapped header?
- if line[0] in ("\x09", "\x20", 0x09, 0x20):
+ if line[0] in ('\x09', '\x20', 0x09, 0x20):
if not len(self.headers):
raise RuntimeError('Not a valid RFC2822 message')
# attach it to the previous header
@@ -950,19 +1034,23 @@ def get_data_dir() -> Path:
return datadir
-def _run_command(cmdargs: List[str],
- stdin: Optional[bytes] = None,
- env: Optional[Dict[str, str]] = None) -> Tuple[int, bytes, bytes]:
+def _run_command(
+ cmdargs: List[str],
+ stdin: Optional[bytes] = None,
+ env: Optional[Dict[str, str]] = None,
+) -> Tuple[int, bytes, bytes]:
logger.debug('Running %s', ' '.join(cmdargs))
cp = subprocess.run(cmdargs, input=stdin, env=env, capture_output=True, text=False)
logger.debug('Completed %s', repr(cp))
return cp.returncode, cp.stdout, cp.stderr
-def git_run_command(gitdir: Optional[str],
- args: List[str],
- stdin: Optional[bytes] = None,
- env: Optional[Dict[str, str]] = None) -> Tuple[int, bytes, bytes]:
+def git_run_command(
+ gitdir: Optional[str],
+ args: List[str],
+ stdin: Optional[bytes] = None,
+ env: Optional[Dict[str, str]] = None,
+) -> Tuple[int, bytes, bytes]:
if gitdir:
args = ['git', '--git-dir', gitdir, '--no-pager'] + args
else:
@@ -970,10 +1058,12 @@ def git_run_command(gitdir: Optional[str],
return _run_command(args, stdin=stdin, env=env)
-def get_config_from_git(regexp: str,
- section: Optional[str] = None,
- defaults: Optional[Dict[str, Union[str, List[str]]]] = None,
- multivals: Optional[List[str]] = None) -> GitConfigType:
+def get_config_from_git(
+ regexp: str,
+ section: Optional[str] = None,
+ defaults: Optional[Dict[str, Union[str, List[str]]]] = None,
+ multivals: Optional[List[str]] = None,
+) -> GitConfigType:
if multivals is None:
multivals = list()
@@ -1027,13 +1117,22 @@ def get_config_from_git(regexp: str,
return gitconfig
-def gpg_run_command(cmdargs: List[str], stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]:
+def gpg_run_command(
+ cmdargs: List[str], stdin: Optional[bytes] = None
+) -> Tuple[int, bytes, bytes]:
gpgbin, _ = set_bin_paths(None)
- cmdargs = [gpgbin, '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb'] + cmdargs
+ cmdargs = [
+ gpgbin,
+ '--batch',
+ '--no-auto-key-retrieve',
+ '--no-auto-check-trustdb',
+ ] + cmdargs
return _run_command(cmdargs, stdin)
-def sshk_run_command(cmdargs: List[str], stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]:
+def sshk_run_command(
+ cmdargs: List[str], stdin: Optional[bytes] = None
+) -> Tuple[int, bytes, bytes]:
_, sshkbin = set_bin_paths(None)
cmdargs = [sshkbin] + cmdargs
return _run_command(cmdargs, stdin)
@@ -1079,8 +1178,12 @@ def make_pkey_path(keytype: str, identity: str, selector: str) -> Path:
domain = chunks[1].lower()
selector = selector.lower()
# urlencode all potentially untrusted bits to make sure nobody tries path-based badness
- return Path(urllib.parse.quote_plus(keytype), urllib.parse.quote_plus(domain),
- urllib.parse.quote_plus(local), urllib.parse.quote_plus(selector))
+ return Path(
+ urllib.parse.quote_plus(keytype),
+ urllib.parse.quote_plus(domain),
+ urllib.parse.quote_plus(local),
+ urllib.parse.quote_plus(selector),
+ )
def make_byhash_path(keytype: str, identity: str, selector: str) -> Path:
@@ -1105,7 +1208,9 @@ def make_byhash_path(keytype: str, identity: str, selector: str) -> Path:
return Path('by-hash', prefix, remainder)
-def get_public_key(source: str, keytype: str, identity: str, selector: str) -> Tuple[bytes, str]:
+def get_public_key(
+ source: str, keytype: str, identity: str, selector: str
+) -> Tuple[bytes, str]:
"""Look up a public key from a keyring source.
Searches for the key at the standard path first, then falls back to
@@ -1133,7 +1238,9 @@ def get_public_key(source: str, keytype: str, identity: str, selector: str) -> T
# split by :
parts = source.split(':', 4)
if len(parts) < 4:
- raise ConfigurationError('Invalid ref, must have at least 3 colons: %s' % source)
+ raise ConfigurationError(
+ 'Invalid ref, must have at least 3 colons: %s' % source
+ )
gitrepo = parts[1]
gitref = parts[2]
gitsub = parts[3]
@@ -1170,7 +1277,9 @@ def get_public_key(source: str, keytype: str, identity: str, selector: str) -> T
# Handle one level of symlinks
if out.find(b'\n') < 0 < out.find(b'/'):
# Check this path as well
- linktgt = os.path.normpath(os.path.join(os.path.dirname(subpath), out.decode()))
+ linktgt = os.path.normpath(
+ os.path.join(os.path.dirname(subpath), out.decode())
+ )
keysrc = f'{gitref}:{linktgt}'
cmdargs = ['show', keysrc]
ecode, out, err = git_run_command(gittop, cmdargs)
@@ -1230,6 +1339,7 @@ def get_public_key(source: str, keytype: str, identity: str, selector: str) -> T
def _load_messages(cmdargs: argparse.Namespace) -> Dict[str, bytes]:
import sys
+
if len(cmdargs.msgfile):
# Load all message from the files passed to make sure they all parse correctly
messages = dict()
@@ -1239,17 +1349,21 @@ def _load_messages(cmdargs: argparse.Namespace) -> Dict[str, bytes]:
elif not sys.stdin.isatty():
messages = {'-': sys.stdin.buffer.read()}
else:
- logger.critical('E: Pipe a message to sign or pass filenames with individual messages')
+ logger.critical(
+ 'E: Pipe a message to sign or pass filenames with individual messages'
+ )
raise RuntimeError('Nothing to do')
return messages
-def sign_message(msgdata: bytes,
- algo: str,
- keyinfo: Union[str, bytes],
- identity: Optional[str],
- selector: Optional[str]) -> bytes:
+def sign_message(
+ msgdata: bytes,
+ algo: str,
+ keyinfo: Union[str, bytes],
+ identity: Optional[str],
+ selector: Optional[str],
+) -> bytes:
"""Sign an RFC2822 message and return the signed message bytes.
Args:
@@ -1284,7 +1398,9 @@ def set_bin_paths(config: Optional[GitConfigType]) -> Tuple[str, str]:
_sshkbin = config.get('ssh-keygen-bin')
assert isinstance(_sshkbin, str), 'ssh-keygen-bin must be a string'
SSHKBIN = _sshkbin
- elif (_sshkbin := get_config_from_git(r'gpg\..*', section='ssh').get('program')) is not None:
+ elif (
+ _sshkbin := get_config_from_git(r'gpg\..*', section='ssh').get('program')
+ ) is not None:
assert isinstance(_sshkbin, str), 'program must be a string'
SSHKBIN = _sshkbin
else:
@@ -1302,7 +1418,9 @@ def get_algo_keydata(config: GitConfigType) -> Tuple[str, str]:
# Do we have this already looked up?
identity = config.get('identity')
if not isinstance(identity, str):
- raise ConfigurationError('Identity must be a string, got %s' % type(identity).__name__)
+ raise ConfigurationError(
+ 'Identity must be a string, got %s' % type(identity).__name__
+ )
if identity in KEYCACHE:
algo, keydata = KEYCACHE[identity]
@@ -1313,7 +1431,11 @@ def get_algo_keydata(config: GitConfigType) -> Tuple[str, str]:
if user_signingkey:
gpg_format = get_config_from_git(r'gpg\..*').get('format', 'gpg')
key_algo = 'openssh' if gpg_format == 'ssh' else 'openpgp'
- logger.info('N: Using %s key %s defined by user.signingkey', key_algo, user_signingkey)
+ logger.info(
+ 'N: Using %s key %s defined by user.signingkey',
+ key_algo,
+ user_signingkey,
+ )
logger.info('N: Override by setting patatt.signingkey')
config['signingkey'] = '%s:%s' % (key_algo, user_signingkey)
else:
@@ -1323,7 +1445,9 @@ def get_algo_keydata(config: GitConfigType) -> Tuple[str, str]:
sk = config.get('signingkey')
if not isinstance(sk, str):
- raise ConfigurationError('Signing key must be a string, got %s' % type(sk).__name__)
+ raise ConfigurationError(
+ 'Signing key must be a string, got %s' % type(sk).__name__
+ )
if sk.startswith('ed25519:'):
algo = 'ed25519'
identifier = sk[8:]
@@ -1338,7 +1462,7 @@ def get_algo_keydata(config: GitConfigType) -> Tuple[str, str]:
keysrc = str(skey)
else:
# finally, try .git/%s.key
- if (gtdir := get_git_toplevel()):
+ if gtdir := get_git_toplevel():
skey = Path(gtdir) / '.git' / f'{identifier}.key'
if skey.exists():
keysrc = str(skey)
@@ -1397,12 +1521,18 @@ def get_main_config(section: Optional[str] = None) -> GitConfigType:
csection = 'default'
if csection in CONFIGCACHE:
return CONFIGCACHE[csection]
- config = get_config_from_git(r'patatt\..*', section=section, multivals=['keyringsrc'])
+ config = get_config_from_git(
+ r'patatt\..*', section=section, multivals=['keyringsrc']
+ )
# Append some extra keyring locations
if 'keyringsrc' not in config or not isinstance(config['keyringsrc'], list):
config['keyringsrc'] = list()
- assert isinstance(config['keyringsrc'], list) # Just to make lint checkers shut up
- config['keyringsrc'] += ['ref:::.keys', 'ref:::.local-keys', 'ref::refs/meta/keyring:']
+ assert isinstance(config['keyringsrc'], list) # Just to make lint checkers shut up
+ config['keyringsrc'] += [
+ 'ref:::.keys',
+ 'ref:::.local-keys',
+ 'ref::refs/meta/keyring:',
+ ]
set_bin_paths(config)
logger.debug('config: %s', config)
CONFIGCACHE[csection] = config
@@ -1442,10 +1572,11 @@ def cmd_sign(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
sys.exit(1)
-def validate_message(msgdata: bytes,
- sources: List[str],
- trim_body: bool = False
- ) -> List[Tuple[int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]]]:
+def validate_message(
+ msgdata: bytes, sources: List[str], trim_body: bool = False
+) -> List[
+ Tuple[int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]]
+]:
"""Validate all signatures in an RFC2822 message.
Args:
@@ -1459,11 +1590,17 @@ def validate_message(msgdata: bytes,
Result codes: RES_VALID, RES_BADSIG, RES_NOKEY, RES_NOSIG, RES_ERROR
"""
- attestations: List[Tuple[int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]]] = list()
+ attestations: List[
+ Tuple[
+ int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]
+ ]
+ ] = list()
pm = PatattMessage(msgdata)
if not pm.signed:
logger.debug('message is not signed')
- attestations.append((RES_NOSIG, None, None, None, None, ['no signatures found']))
+ attestations.append(
+ (RES_NOSIG, None, None, None, None, ['no signatures found'])
+ )
return attestations
# Find all identities for which we have public keys
@@ -1530,6 +1667,7 @@ def validate_message(msgdata: bytes,
def cmd_validate(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
import mailbox
+
if len(cmdargs.msgfile) == 1:
# Try to open as an mbox file
try:
@@ -1699,36 +1837,73 @@ def command() -> None:
parser = argparse.ArgumentParser(
prog='patatt',
description='Cryptographically attest patches before sending out',
- formatter_class=argparse.ArgumentDefaultsHelpFormatter
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+ )
+ parser.add_argument(
+ '-v',
+ '--verbose',
+ action='store_true',
+ default=False,
+ help='Be a bit more verbose',
+ )
+ parser.add_argument(
+ '-d',
+ '--debug',
+ action='store_true',
+ default=False,
+ help='Show debugging output',
+ )
+ parser.add_argument(
+ '-s',
+ '--section',
+ dest='section',
+ default=None,
+ help='Use config section [patatt "sectionname"]',
)
- parser.add_argument('-v', '--verbose', action='store_true', default=False,
- help='Be a bit more verbose')
- parser.add_argument('-d', '--debug', action='store_true', default=False,
- help='Show debugging output')
- parser.add_argument('-s', '--section', dest='section', default=None,
- help='Use config section [patatt "sectionname"]')
parser.add_argument('--version', action='version', version=__VERSION__)
subparsers = parser.add_subparsers(help='sub-command help', dest='subcmd')
- sp_sign = subparsers.add_parser('sign', help='Cryptographically attest an RFC2822 message')
- sp_sign.add_argument('--hook', dest='hookmode', action='store_true', default=False,
- help='Git hook mode')
+ sp_sign = subparsers.add_parser(
+ 'sign', help='Cryptographically attest an RFC2822 message'
+ )
+ sp_sign.add_argument(
+ '--hook',
+ dest='hookmode',
+ action='store_true',
+ default=False,
+ help='Git hook mode',
+ )
sp_sign.add_argument('msgfile', nargs='*', help='RFC2822 message files to sign')
sp_sign.set_defaults(func=cmd_sign)
sp_val = subparsers.add_parser('validate', help='Validate a devsig-signed message')
- sp_val.add_argument('msgfile', nargs='*', help='Individual signed message files to validate or an mbox')
+ sp_val.add_argument(
+ 'msgfile',
+ nargs='*',
+ help='Individual signed message files to validate or an mbox',
+ )
sp_val.set_defaults(func=cmd_validate)
sp_gen = subparsers.add_parser('genkey', help='Generate a new ed25519 keypair')
- sp_gen.add_argument('-n', '--keyname', default=None,
- help='Name to use for the key, e.g. "workstation", or "default"')
- sp_gen.add_argument('-f', '--force', action='store_true', default=False,
- help='Overwrite any existing keys, if found')
+ sp_gen.add_argument(
+ '-n',
+ '--keyname',
+ default=None,
+ help='Name to use for the key, e.g. "workstation", or "default"',
+ )
+ sp_gen.add_argument(
+ '-f',
+ '--force',
+ action='store_true',
+ default=False,
+ help='Overwrite any existing keys, if found',
+ )
sp_gen.set_defaults(func=cmd_genkey)
- sp_install = subparsers.add_parser('install-hook', help='Install sendmail-validate hook into the current repo')
+ sp_install = subparsers.add_parser(
+ 'install-hook', help='Install sendmail-validate hook into the current repo'
+ )
sp_install.set_defaults(func=cmd_install_hook)
_args = parser.parse_args()
diff --git a/tests/conftest.py b/tests/conftest.py
index 8bb6064..a2d2124 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -18,6 +18,7 @@ Message-ID: <12345@example.com>
This is a test email body.
"""
+
@pytest.fixture
def temp_data_dir() -> Generator[str, None, None]:
"""Create a temporary data directory structure for patatt."""
@@ -32,23 +33,26 @@ def temp_data_dir() -> Generator[str, None, None]:
# Return path to the temp directory
yield tmpdirname
+
@pytest.fixture
def devsig_header() -> DevsigHeader:
"""Create a basic DevsigHeader instance."""
return DevsigHeader()
+
@pytest.fixture
def patatt_message(sample_email_bytes: bytes) -> PatattMessage:
"""Create a PatattMessage from a sample email."""
return PatattMessage(sample_email_bytes)
+
@pytest.fixture
def sample_ed25519_key_pair() -> Dict[str, bytes]:
"""Generate a sample ed25519 key pair for testing."""
try:
from nacl.signing import SigningKey
except ImportError:
- pytest.skip("PyNaCl not installed, skipping ed25519 tests")
+ pytest.skip('PyNaCl not installed, skipping ed25519 tests')
# Generate a key pair
private_key = SigningKey.generate()
@@ -57,5 +61,5 @@ def sample_ed25519_key_pair() -> Dict[str, bytes]:
# Return base64 encoded keys
return {
'private': base64.b64encode(bytes(private_key)),
- 'public': base64.b64encode(public_key.encode())
- }
\ No newline at end of file
+ 'public': base64.b64encode(public_key.encode()),
+ }
diff --git a/tests/test_validation.py b/tests/test_validation.py
index d51be81..1477fbe 100644
--- a/tests/test_validation.py
+++ b/tests/test_validation.py
@@ -6,11 +6,9 @@ import pytest
from patatt import RES_VALID, validate_message
-@pytest.mark.parametrize("sample_file", [
- "ed25519-signed.txt",
- "pgp-signed.txt",
- "openssh-signed.txt"
-])
+@pytest.mark.parametrize(
+ 'sample_file', ['ed25519-signed.txt', 'pgp-signed.txt', 'openssh-signed.txt']
+)
def test_validate(sample_file: str) -> None:
"""Test validation of an ed25519 signed message from samples directory."""
# Path to the sample file
@@ -29,14 +27,14 @@ def test_validate(sample_file: str) -> None:
results = validate_message(signed_data, [sources_path])
# Check validation results
- assert results, "Validation should return results"
+ assert results, 'Validation should return results'
# At least one valid signature should be found
valid_signatures = [r for r in results if r[0] == RES_VALID]
- assert valid_signatures, "Should find at least one valid signature"
+ assert valid_signatures, 'Should find at least one valid signature'
# Print validation details for debugging
- print(f"Found {len(valid_signatures)} valid signatures:")
+ print(f'Found {len(valid_signatures)} valid signatures:')
for result in valid_signatures:
status, algo, keytype, identity, selector, errors = result
- print(f" - {keytype} signature by {identity} ({selector})")
+ print(f' - {keytype} signature by {identity} ({selector})')
diff --git a/tests/unit/test_byhash.py b/tests/unit/test_byhash.py
index c3fe4d2..e90ee82 100644
--- a/tests/unit/test_byhash.py
+++ b/tests/unit/test_byhash.py
@@ -8,7 +8,6 @@ from patatt import get_public_key, make_byhash_path, make_pkey_path
class TestMakeByhashPath:
-
def test_basic_hash_computation(self) -> None:
"""Test that make_byhash_path computes the correct hash."""
keytype = 'openssh'
@@ -42,7 +41,6 @@ class TestMakeByhashPath:
class TestGetPublicKeyByHash:
-
def test_filesystem_byhash_lookup(self) -> None:
"""Test that get_public_key finds a key via by-hash fallback."""
with tempfile.TemporaryDirectory() as tmpdir:
--git a/tests/unit/test_devsig_header.py b/tests/unit/test_devsig_header.py
index 547593d..7b6e461 100644
--- a/tests/unit/test_devsig_header.py
+++ b/tests/unit/test_devsig_header.py
@@ -8,7 +8,6 @@ from patatt import DevsigHeader
class TestDevsigHeader:
-
def test_initialization(self) -> None:
"""Test that DevsigHeader initializes correctly."""
header = DevsigHeader()
@@ -17,7 +16,9 @@ class TestDevsigHeader:
def test_from_bytes(self) -> None:
"""Test parsing a header from bytes."""
- header_bytes = b'v=1; a=ed25519-sha256; t=1623456789; i=test@example.com; bh=abcd1234'
+ header_bytes = (
+ b'v=1; a=ed25519-sha256; t=1623456789; i=test@example.com; bh=abcd1234'
+ )
header = DevsigHeader(header_bytes)
assert header.get_field_as_str('v') == '1'
@@ -41,7 +42,7 @@ class TestDevsigHeader:
def test_set_body(self) -> None:
"""Test setting the body and calculating the body hash."""
header = DevsigHeader()
- body = b"This is a test body"
+ body = b'This is a test body'
header.set_body(body)
@@ -55,7 +56,7 @@ class TestDevsigHeader:
def test_set_body_with_maxlen(self) -> None:
"""Test setting the body with a maxlen parameter."""
header = DevsigHeader()
- body = b"This is a test body"
+ body = b'This is a test body'
maxlen = 10
header.set_body(body, maxlen=maxlen)
@@ -95,17 +96,17 @@ class TestDevsigHeader:
"""Test that sanity_check fails if required fields are not set."""
header = DevsigHeader()
- with pytest.raises(RuntimeError, match="Must set \"a\" field first"):
+ with pytest.raises(RuntimeError, match='Must set "a" field first'):
header.sanity_check()
header.set_field('a', 'ed25519-sha256')
- with pytest.raises(RuntimeError, match="Must use set_body first"):
+ with pytest.raises(RuntimeError, match='Must use set_body first'):
header.sanity_check()
- header.set_body(b"Test body")
+ header.set_body(b'Test body')
- with pytest.raises(RuntimeError, match="Must use set_headers first"):
+ with pytest.raises(RuntimeError, match='Must use set_headers first'):
header.sanity_check()
# @pytest.mark.skipif(True, reason="Requires actual ed25519 keys")
diff --git a/tests/unit/test_get_algo_keydata.py b/tests/unit/test_get_algo_keydata.py
index 545f051..c4ccc8d 100644
--- a/tests/unit/test_get_algo_keydata.py
+++ b/tests/unit/test_get_algo_keydata.py
@@ -41,7 +41,10 @@ class TestGetAlgoKeydataSSHSigningKey:
def test_ssh_format_uses_openssh(self, mock_gcfg: MagicMock) -> None:
"""When gpg.format=ssh, user.signingkey should get the openssh: prefix."""
mock_gcfg.side_effect = _make_mock_get_config(
- usercfg={'email': 'test@example.com', 'signingkey': '/home/user/.ssh/id_ed25519.pub'},
+ usercfg={
+ 'email': 'test@example.com',
+ 'signingkey': '/home/user/.ssh/id_ed25519.pub',
+ },
gpgcfg={'format': 'ssh'},
)
config: GitConfigType = {'identity': 'test@example.com'}
@@ -91,7 +94,9 @@ class TestGetAlgoKeydataSSHSigningKey:
get_algo_keydata(config)
@patch('patatt.get_config_from_git')
- def test_patatt_signingkey_skips_user_signingkey(self, mock_gcfg: MagicMock) -> None:
+ def test_patatt_signingkey_skips_user_signingkey(
+ self, mock_gcfg: MagicMock
+ ) -> None:
"""When patatt.signingkey is already set, user.signingkey is not consulted."""
mock_gcfg.side_effect = _make_mock_get_config(
usercfg={'email': 'test@example.com', 'signingkey': 'SHOULD_NOT_BE_USED'},
diff --git a/tests/unit/test_patatt_message.py b/tests/unit/test_patatt_message.py
index 58338f4..17d4a30 100644
--- a/tests/unit/test_patatt_message.py
+++ b/tests/unit/test_patatt_message.py
@@ -6,7 +6,6 @@ from patatt import PatattMessage
class TestPatattMessage:
-
def test_initialization(self, sample_email_bytes: bytes) -> None:
"""Test initialization of PatattMessage with sample email."""
message = PatattMessage(sample_email_bytes)
@@ -25,7 +24,7 @@ This is a test body.
message = PatattMessage(email_bytes)
assert len(message.headers) == 2
- assert message.body == b"This is a test body.\n"
+ assert message.body == b'This is a test body.\n'
def test_as_bytes(self, sample_email_bytes: bytes) -> None:
"""Test converting message back to bytes."""
@@ -44,17 +43,22 @@ This is a test body.
assert isinstance(output, str)
assert output == sample_email_bytes.decode()
- def test_git_canonicalize(self, monkeypatch: pytest.MonkeyPatch, sample_email_bytes: bytes) -> None:
+ def test_git_canonicalize(
+ self, monkeypatch: pytest.MonkeyPatch, sample_email_bytes: bytes
+ ) -> None:
"""Test git canonicalization of message."""
+
# Mock _get_git_mailinfo to avoid actual git command execution
def mock_get_git_mailinfo(payload: bytes) -> Tuple[bytes, bytes, bytes]:
# Return mock metadata, patch, and info
- metadata = b"Author: Test User\nEmail: test@example.com\nSubject: Test email\n"
- patch = b"This is a test body.\n"
- info = b"email: test@example.com\nauthor: Test User\nsubject: Test email\n"
+ metadata = (
+ b'Author: Test User\nEmail: test@example.com\nSubject: Test email\n'
+ )
+ patch = b'This is a test body.\n'
+ info = b'email: test@example.com\nauthor: Test User\nsubject: Test email\n'
return metadata, patch, info
- monkeypatch.setattr(PatattMessage, "_get_git_mailinfo", mock_get_git_mailinfo)
+ monkeypatch.setattr(PatattMessage, '_get_git_mailinfo', mock_get_git_mailinfo)
message = PatattMessage(sample_email_bytes)
message.git_canonicalize()
@@ -62,7 +66,7 @@ This is a test body.
# Check that the message was canonicalized
assert message.canon_body is not None
assert message.canon_headers is not None
- assert message.canon_identity == "test@example.com"
+ assert message.canon_identity == 'test@example.com'
# @pytest.mark.skipif(True, reason="Requires actual signing setup")
# def test_sign(self) -> None:
--
2.53.0
next prev parent reply other threads:[~2026-04-20 1:23 UTC|newest]
Thread overview: 9+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-04-20 1:22 [PATCH patatt 0/7] Harden local checks Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 1/7] Add local CI script Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 2/7] Add Ruff import checks Tamir Duberstein
2026-04-20 1:22 ` Tamir Duberstein [this message]
2026-04-20 1:22 ` [PATCH patatt 4/7] Add pyright strict checks Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 5/7] Add ty checks Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 6/7] Reduce dictionary lookups Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 7/7] Import PyNaCl unconditionally Tamir Duberstein
2026-04-27 20:20 ` [PATCH patatt 0/7] Harden local checks Tamir Duberstein
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260419-stronger-type-checking-v1-3-5c108048d2c7@kernel.org \
--to=tamird@kernel.org \
--cc='test@example.com'} \
--cc=konstantin@linuxfoundation.org \
--cc=tools@kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox