From: Tamir Duberstein <tamird@kernel.org>
To: "Kernel.org Tools" <tools@kernel.org>
Cc: Konstantin Ryabitsev <konstantin@linuxfoundation.org>,
Tamir Duberstein <tamird@kernel.org>
Subject: [PATCH patatt 4/7] Add pyright strict checks
Date: Sun, 19 Apr 2026 21:22:24 -0400 [thread overview]
Message-ID: <20260419-stronger-type-checking-v1-4-5c108048d2c7@kernel.org> (raw)
In-Reply-To: <20260419-stronger-type-checking-v1-0-5c108048d2c7@kernel.org>
Configure pyright in strict mode and run it from the local CI helper.
Tighten ambiguous containers, cached binary paths, and result tuples so
strict analysis has concrete types. Keep private-usage checks disabled
only for tests because the existing tests assert internal state
directly.
Signed-off-by: Tamir Duberstein <tamird@kernel.org>
---
ci.sh | 1 +
pyproject.toml | 7 ++-
src/patatt/__init__.py | 105 ++++++++++++++++++------------------
tests/test_validation.py | 2 +-
| 3 +-
tests/unit/test_get_algo_keydata.py | 2 +-
6 files changed, 62 insertions(+), 58 deletions(-)
diff --git a/ci.sh b/ci.sh
index ec0baf8..4b07fa2 100755
--- a/ci.sh
+++ b/ci.sh
@@ -5,4 +5,5 @@ set -eu
uv run ruff format --check
uv run ruff check
uv run mypy .
+uv run pyright
uv run pytest --durations=0
diff --git a/pyproject.toml b/pyproject.toml
index d56d828..9615a41 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,6 +28,7 @@ dependencies = [
dev = [
"mypy",
"pip-tools",
+ "pyright",
"pytest",
"ruff",
]
@@ -62,7 +63,11 @@ flake8-quotes.inline-quotes = "single"
quote-style = "single"
[tool.pyright]
-typeCheckingMode = "off"
+typeCheckingMode = "strict"
+
+executionEnvironments = [
+ { root = "tests", reportPrivateUsage = false },
+]
# Configure mypy in strict mode
[tool.mypy]
diff --git a/src/patatt/__init__.py b/src/patatt/__init__.py
index 24ec933..7087479 100644
--- a/src/patatt/__init__.py
+++ b/src/patatt/__init__.py
@@ -22,15 +22,18 @@ import urllib.parse
import warnings
from io import BytesIO
from pathlib import Path
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Set, Tuple, Union
GitConfigType = Dict[str, Union[str, List[str]]]
+AttestationResult = Tuple[
+ int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]
+]
logger: logging.Logger = logging.getLogger(__name__)
# Overridable via [patatt] parameters
-GPGBIN: Optional[str] = None
-SSHKBIN: Optional[str] = None
+_gpgbin_path: Optional[str] = None
+_sshkbin_path: Optional[str] = None
# Hardcoded defaults
DEVSIG_HDR = b'X-Developer-Signature'
@@ -233,8 +236,8 @@ class DevsigHeader:
SigningError: If required headers are missing (sign mode).
ValidationError: If required headers are not signed (validate mode).
"""
- parsed = list()
- allhdrs = set()
+ parsed: List[Tuple[bytes, bytes]] = list()
+ allhdrs: Set[bytes] = set()
# DKIM operates on headers in reverse order
for header in reversed(headers):
try:
@@ -256,7 +259,7 @@ class DevsigHeader:
% (b', '.join(reqset.difference(allhdrs)).decode())
)
# Add optional headers that are actually present
- optpresent = list(allhdrs.intersection(optset))
+ optpresent: List[bytes] = list(allhdrs.intersection(optset))
signlist = REQ_HDRS + sorted(optpresent)
self.hdata['h'] = b':'.join(signlist)
@@ -411,15 +414,11 @@ class DevsigHeader:
if isinstance(keyinfo, bytes):
bkeyinfo = keyinfo
skeyinfo = keyinfo.decode()
- elif isinstance(keyinfo, str):
+ else:
skeyinfo = keyinfo
bkeyinfo = keyinfo.encode()
- else:
- raise RuntimeError(
- 'keyinfo must be a string or bytes, not %s' % type(keyinfo).__name__
- )
- hparts = list()
+ hparts: List[bytes] = list()
for fn in self._order:
fv = self.get_field_as_bytes(fn)
if fv is not None:
@@ -554,7 +553,7 @@ class DevsigHeader:
'-s',
spath,
]
- ecode, out, err = sshk_run_command(sshkargs, payload)
+ ecode, _out, err = sshk_run_command(sshkargs, payload)
if ecode > 0:
raise ValidationError(
'Failed to validate openssh signature',
@@ -672,7 +671,7 @@ class DevsigHeader:
@staticmethod
def splitter(longstr: bytes, limit: int = 75) -> bytes:
- splitstr = list()
+ splitstr: List[bytes] = list()
first = True
while len(longstr) > limit:
at = limit
@@ -767,7 +766,7 @@ class PatattMessage:
for line in re.sub(rb'[\r\n]*$', b'', m + p).split(b'\n'):
self.canon_body += re.sub(rb'[\r\n]*$', b'', line) + b'\r\n'
- idata = dict()
+ idata: Dict[bytes, bytes] = dict()
for line in re.sub(rb'[\r\n]*$', b'', i).split(b'\n'):
left, right = line.split(b':', 1)
idata[left.lower()] = right.strip()
@@ -1102,13 +1101,15 @@ def get_config_from_git(
if cfgkey in multivals:
if cfgkey not in gitconfig:
- gitconfig[cfgkey] = list()
- elif isinstance(gitconfig[cfgkey], str):
- gitconfig[cfgkey] = [gitconfig[cfgkey]] # type: ignore[list-item]
+ values: List[str] = list()
else:
- gitconfig[cfgkey] = list()
- # We've made sure this is a list
- gitconfig[cfgkey].append(value) # type: ignore[union-attr]
+ cfgvalue = gitconfig[cfgkey]
+ if isinstance(cfgvalue, str):
+ values = [cfgvalue]
+ else:
+ values = cfgvalue
+ values.append(value)
+ gitconfig[cfgkey] = values
else:
gitconfig[cfgkey] = value
except ValueError:
@@ -1143,7 +1144,7 @@ def get_git_toplevel(gitdir: Optional[str] = None) -> str:
if gitdir:
cmdargs += ['--git-dir', gitdir]
cmdargs += ['rev-parse', '--show-toplevel']
- ecode, out, err = _run_command(cmdargs)
+ ecode, out, _err = _run_command(cmdargs)
if ecode == 0:
return out.decode().strip()
return ''
@@ -1151,7 +1152,7 @@ def get_git_toplevel(gitdir: Optional[str] = None) -> str:
def get_git_dir() -> str:
cmdargs = ['git', 'rev-parse', '--git-dir']
- ecode, out, err = _run_command(cmdargs)
+ ecode, out, _err = _run_command(cmdargs)
if ecode == 0:
return out.decode().strip()
return ''
@@ -1264,7 +1265,7 @@ def get_public_key(
if not gitref:
# What is our current ref?
cmdargs = ['symbolic-ref', 'HEAD']
- ecode, out, err = git_run_command(gittop, cmdargs)
+ ecode, out, _err = git_run_command(gittop, cmdargs)
if ecode == 0:
gitref = out.decode().strip()
if not gitref:
@@ -1272,7 +1273,7 @@ def get_public_key(
keysrc = f'{gitref}:{subpath}'
cmdargs = ['show', keysrc]
- ecode, out, err = git_run_command(gittop, cmdargs)
+ ecode, out, _err = git_run_command(gittop, cmdargs)
if ecode == 0:
# Handle one level of symlinks
if out.find(b'\n') < 0 < out.find(b'/'):
@@ -1282,7 +1283,7 @@ def get_public_key(
)
keysrc = f'{gitref}:{linktgt}'
cmdargs = ['show', keysrc]
- ecode, out, err = git_run_command(gittop, cmdargs)
+ ecode, out, _err = git_run_command(gittop, cmdargs)
if ecode == 0:
logger.debug('KEYSRC : %s (symlinked)', keysrc)
return out, 'ref:%s:%s' % (gittop, keysrc)
@@ -1301,7 +1302,7 @@ def get_public_key(
byhash_subpath = Path(gitsub) / byhash_keypath
keysrc = f'{gitref}:{byhash_subpath}'
cmdargs = ['show', keysrc]
- ecode, out, err = git_run_command(gittop, cmdargs)
+ ecode, out, _err = git_run_command(gittop, cmdargs)
if ecode == 0:
logger.debug('KEYSRC : %s (by-hash)', keysrc)
return out, 'ref:%s:%s' % (gittop, keysrc)
@@ -1342,7 +1343,7 @@ def _load_messages(cmdargs: argparse.Namespace) -> Dict[str, bytes]:
if len(cmdargs.msgfile):
# Load all message from the files passed to make sure they all parse correctly
- messages = dict()
+ messages: Dict[str, bytes] = dict()
for msgfile in cmdargs.msgfile:
with open(msgfile, 'rb') as fh:
messages[msgfile] = fh.read()
@@ -1382,30 +1383,30 @@ def sign_message(
def set_bin_paths(config: Optional[GitConfigType]) -> Tuple[str, str]:
- global GPGBIN, SSHKBIN
- if GPGBIN is None:
+ global _gpgbin_path, _sshkbin_path
+ if _gpgbin_path is None:
if config and config.get('gpg-bin'):
_gpgbin = config.get('gpg-bin')
- assert isinstance(GPGBIN, str), 'gpg-bin must be a string'
- GPGBIN = _gpgbin
+ assert isinstance(_gpgbin, str), 'gpg-bin must be a string'
+ _gpgbin_path = _gpgbin
elif (_gpgbin := get_config_from_git(r'gpg\..*').get('program')) is not None:
assert isinstance(_gpgbin, str), 'gpg program must be a string'
- GPGBIN = _gpgbin
+ _gpgbin_path = _gpgbin
else:
- GPGBIN = 'gpg'
- if SSHKBIN is None:
+ _gpgbin_path = 'gpg'
+ if _sshkbin_path is None:
if config and config.get('ssh-keygen-bin'):
_sshkbin = config.get('ssh-keygen-bin')
assert isinstance(_sshkbin, str), 'ssh-keygen-bin must be a string'
- SSHKBIN = _sshkbin
+ _sshkbin_path = _sshkbin
elif (
_sshkbin := get_config_from_git(r'gpg\..*', section='ssh').get('program')
) is not None:
assert isinstance(_sshkbin, str), 'program must be a string'
- SSHKBIN = _sshkbin
+ _sshkbin_path = _sshkbin
else:
- SSHKBIN = 'ssh-keygen'
- return GPGBIN, SSHKBIN
+ _sshkbin_path = 'ssh-keygen'
+ return _gpgbin_path, _sshkbin_path
def get_algo_keydata(config: GitConfigType) -> Tuple[str, str]:
@@ -1574,9 +1575,7 @@ def cmd_sign(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
def validate_message(
msgdata: bytes, sources: List[str], trim_body: bool = False
-) -> List[
- Tuple[int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]]
-]:
+) -> List[AttestationResult]:
"""Validate all signatures in an RFC2822 message.
Args:
@@ -1590,11 +1589,7 @@ def validate_message(
Result codes: RES_VALID, RES_BADSIG, RES_NOKEY, RES_NOSIG, RES_ERROR
"""
- attestations: List[
- Tuple[
- int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]
- ]
- ] = list()
+ attestations: List[AttestationResult] = list()
pm = PatattMessage(msgdata)
if not pm.signed:
logger.debug('message is not signed')
@@ -1605,7 +1600,7 @@ def validate_message(
# Find all identities for which we have public keys
for ds in pm.get_sigs():
- errors = list()
+ errors: List[str] = list()
a = ds.get_field_as_str('a')
i = ds.get_field_as_str('i')
s = ds.get_field_as_str('s')
@@ -1676,9 +1671,9 @@ def cmd_validate(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
logger.critical('E: %s', ex)
sys.exit(1)
- messages = dict()
+ messages: Dict[str, bytes] = dict()
for msg in mbox:
- subject = msg.get('Subject', 'No subject')
+ subject = str(msg.get('Subject', 'No subject'))
messages[subject] = msg.as_bytes()
else:
try:
@@ -1689,9 +1684,11 @@ def cmd_validate(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
ddir = get_data_dir()
pdir = ddir / 'public'
- sources = config.get('keyringsrc', list())
- if not isinstance(sources, list):
- sources = [sources]
+ raw_sources = config.get('keyringsrc', list())
+ if isinstance(raw_sources, list):
+ sources = raw_sources
+ else:
+ sources = [raw_sources]
if str(pdir) not in sources:
sources.append(str(pdir))
@@ -1705,7 +1702,7 @@ def cmd_validate(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
for fn, msgdata in messages.items():
try:
attestations = validate_message(msgdata, sources, trim_body=trim_body)
- for result, identity, signtime, keysrc, algo, errors in attestations:
+ for result, identity, _signtime, keysrc, _algo, errors in attestations:
if result > highest_err:
highest_err = result
diff --git a/tests/test_validation.py b/tests/test_validation.py
index 1477fbe..c399f14 100644
--- a/tests/test_validation.py
+++ b/tests/test_validation.py
@@ -36,5 +36,5 @@ def test_validate(sample_file: str) -> None:
# Print validation details for debugging
print(f'Found {len(valid_signatures)} valid signatures:')
for result in valid_signatures:
- status, algo, keytype, identity, selector, errors = result
+ _status, _algo, keytype, identity, selector, _errors = result
print(f' - {keytype} signature by {identity} ({selector})')
--git a/tests/unit/test_devsig_header.py b/tests/unit/test_devsig_header.py
index 7b6e461..d94bdc9 100644
--- a/tests/unit/test_devsig_header.py
+++ b/tests/unit/test_devsig_header.py
@@ -1,6 +1,7 @@
import base64
import hashlib
from io import BytesIO
+from typing import List
import pytest
@@ -76,7 +77,7 @@ class TestDevsigHeader:
header = DevsigHeader()
# Parse the sample email to get headers
- headers = []
+ headers: List[bytes] = []
with BytesIO(sample_email_bytes) as fh:
while True:
line = fh.readline()
diff --git a/tests/unit/test_get_algo_keydata.py b/tests/unit/test_get_algo_keydata.py
index c4ccc8d..4d9b7e6 100644
--- a/tests/unit/test_get_algo_keydata.py
+++ b/tests/unit/test_get_algo_keydata.py
@@ -119,7 +119,7 @@ class TestGetAlgoKeydataSSHSigningKey:
gpgcfg={'format': 'ssh'},
)
config: GitConfigType = {}
- algo, keydata = get_algo_keydata(config)
+ algo, _keydata = get_algo_keydata(config)
assert config['identity'] == 'auto@example.com'
assert algo == 'openssh'
--
2.53.0
next prev parent reply other threads:[~2026-04-20 1:24 UTC|newest]
Thread overview: 9+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-04-20 1:22 [PATCH patatt 0/7] Harden local checks Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 1/7] Add local CI script Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 2/7] Add Ruff import checks Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 3/7] Add Ruff format check Tamir Duberstein
2026-04-20 1:22 ` Tamir Duberstein [this message]
2026-04-20 1:22 ` [PATCH patatt 5/7] Add ty checks Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 6/7] Reduce dictionary lookups Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 7/7] Import PyNaCl unconditionally Tamir Duberstein
2026-04-27 20:20 ` [PATCH patatt 0/7] Harden local checks Tamir Duberstein
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260419-stronger-type-checking-v1-4-5c108048d2c7@kernel.org \
--to=tamird@kernel.org \
--cc=konstantin@linuxfoundation.org \
--cc=tools@kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox