From: Tamir Duberstein <tamird@kernel.org>
To: "Kernel.org Tools" <tools@kernel.org>
Cc: Konstantin Ryabitsev <konstantin@linuxfoundation.org>,
Tamir Duberstein <tamird@kernel.org>
Subject: [PATCH patatt 4/7] Add pyright strict checks
Date: Sun, 19 Apr 2026 21:22:24 -0400 [thread overview]
Message-ID: <20260419-stronger-type-checking-v1-4-5c108048d2c7@kernel.org> (raw)
In-Reply-To: <20260419-stronger-type-checking-v1-0-5c108048d2c7@kernel.org>
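Configure pyright in strict mode and run it from the local CI helper.
Tighten ambiguous containers, cached binary paths, and result tuples so
strict analysis has concrete types. Keep private-usage checks disabled
only for tests because the existing tests assert internal state
directly.
While renaming the cached binary paths, make the gpg-bin assertion in
set_bin_paths() check the configured value; it previously checked the
still-unset GPGBIN global and so could never pass.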
Signed-off-by: Tamir Duberstein <tamird@kernel.org>
---
ci.sh | 1 +
pyproject.toml | 7 ++-
src/patatt/__init__.py | 105 ++++++++++++++++++------------------
tests/test_validation.py | 2 +-
tests/unit/test_devsig_header.py | 3 +-
tests/unit/test_get_algo_keydata.py | 2 +-
6 files changed, 62 insertions(+), 58 deletions(-)
diff --git a/ci.sh b/ci.sh
index ec0baf8..4b07fa2 100755
--- a/ci.sh
+++ b/ci.sh
@@ -5,4 +5,5 @@ set -eu
uv run ruff format --check
uv run ruff check
uv run mypy .
+uv run pyright
uv run pytest --durations=0
diff --git a/pyproject.toml b/pyproject.toml
index d56d828..9615a41 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,6 +28,7 @@ dependencies = [
dev = [
"mypy",
"pip-tools",
+ "pyright",
"pytest",
"ruff",
]
@@ -62,7 +63,11 @@ flake8-quotes.inline-quotes = "single"
quote-style = "single"
[tool.pyright]
-typeCheckingMode = "off"
+typeCheckingMode = "strict"
+
+executionEnvironments = [
+ { root = "tests", reportPrivateUsage = false },
+]
# Configure mypy in strict mode
[tool.mypy]
diff --git a/src/patatt/__init__.py b/src/patatt/__init__.py
index 24ec933..7087479 100644
--- a/src/patatt/__init__.py
+++ b/src/patatt/__init__.py
@@ -22,15 +22,18 @@ import urllib.parse
import warnings
from io import BytesIO
from pathlib import Path
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Set, Tuple, Union
GitConfigType = Dict[str, Union[str, List[str]]]
+AttestationResult = Tuple[
+ int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]
+]
logger: logging.Logger = logging.getLogger(__name__)
# Overridable via [patatt] parameters
-GPGBIN: Optional[str] = None
-SSHKBIN: Optional[str] = None
+_gpgbin_path: Optional[str] = None
+_sshkbin_path: Optional[str] = None
# Hardcoded defaults
DEVSIG_HDR = b'X-Developer-Signature'
@@ -233,8 +236,8 @@ class DevsigHeader:
SigningError: If required headers are missing (sign mode).
ValidationError: If required headers are not signed (validate mode).
"""
- parsed = list()
- allhdrs = set()
+ parsed: List[Tuple[bytes, bytes]] = list()
+ allhdrs: Set[bytes] = set()
# DKIM operates on headers in reverse order
for header in reversed(headers):
try:
@@ -256,7 +259,7 @@ class DevsigHeader:
% (b', '.join(reqset.difference(allhdrs)).decode())
)
# Add optional headers that are actually present
- optpresent = list(allhdrs.intersection(optset))
+ optpresent: List[bytes] = list(allhdrs.intersection(optset))
signlist = REQ_HDRS + sorted(optpresent)
self.hdata['h'] = b':'.join(signlist)
@@ -411,15 +414,11 @@ class DevsigHeader:
if isinstance(keyinfo, bytes):
bkeyinfo = keyinfo
skeyinfo = keyinfo.decode()
- elif isinstance(keyinfo, str):
+ else:
skeyinfo = keyinfo
bkeyinfo = keyinfo.encode()
- else:
- raise RuntimeError(
- 'keyinfo must be a string or bytes, not %s' % type(keyinfo).__name__
- )
- hparts = list()
+ hparts: List[bytes] = list()
for fn in self._order:
fv = self.get_field_as_bytes(fn)
if fv is not None:
@@ -554,7 +553,7 @@ class DevsigHeader:
'-s',
spath,
]
- ecode, out, err = sshk_run_command(sshkargs, payload)
+ ecode, _out, err = sshk_run_command(sshkargs, payload)
if ecode > 0:
raise ValidationError(
'Failed to validate openssh signature',
@@ -672,7 +671,7 @@ class DevsigHeader:
@staticmethod
def splitter(longstr: bytes, limit: int = 75) -> bytes:
- splitstr = list()
+ splitstr: List[bytes] = list()
first = True
while len(longstr) > limit:
at = limit
@@ -767,7 +766,7 @@ class PatattMessage:
for line in re.sub(rb'[\r\n]*$', b'', m + p).split(b'\n'):
self.canon_body += re.sub(rb'[\r\n]*$', b'', line) + b'\r\n'
- idata = dict()
+ idata: Dict[bytes, bytes] = dict()
for line in re.sub(rb'[\r\n]*$', b'', i).split(b'\n'):
left, right = line.split(b':', 1)
idata[left.lower()] = right.strip()
@@ -1102,13 +1101,15 @@ def get_config_from_git(
if cfgkey in multivals:
if cfgkey not in gitconfig:
- gitconfig[cfgkey] = list()
- elif isinstance(gitconfig[cfgkey], str):
- gitconfig[cfgkey] = [gitconfig[cfgkey]] # type: ignore[list-item]
+ values: List[str] = list()
else:
- gitconfig[cfgkey] = list()
- # We've made sure this is a list
- gitconfig[cfgkey].append(value) # type: ignore[union-attr]
+ cfgvalue = gitconfig[cfgkey]
+ if isinstance(cfgvalue, str):
+ values = [cfgvalue]
+ else:
+ values = cfgvalue
+ values.append(value)
+ gitconfig[cfgkey] = values
else:
gitconfig[cfgkey] = value
except ValueError:
@@ -1143,7 +1144,7 @@ def get_git_toplevel(gitdir: Optional[str] = None) -> str:
if gitdir:
cmdargs += ['--git-dir', gitdir]
cmdargs += ['rev-parse', '--show-toplevel']
- ecode, out, err = _run_command(cmdargs)
+ ecode, out, _err = _run_command(cmdargs)
if ecode == 0:
return out.decode().strip()
return ''
@@ -1151,7 +1152,7 @@ def get_git_toplevel(gitdir: Optional[str] = None) -> str:
def get_git_dir() -> str:
cmdargs = ['git', 'rev-parse', '--git-dir']
- ecode, out, err = _run_command(cmdargs)
+ ecode, out, _err = _run_command(cmdargs)
if ecode == 0:
return out.decode().strip()
return ''
@@ -1264,7 +1265,7 @@ def get_public_key(
if not gitref:
# What is our current ref?
cmdargs = ['symbolic-ref', 'HEAD']
- ecode, out, err = git_run_command(gittop, cmdargs)
+ ecode, out, _err = git_run_command(gittop, cmdargs)
if ecode == 0:
gitref = out.decode().strip()
if not gitref:
@@ -1272,7 +1273,7 @@ def get_public_key(
keysrc = f'{gitref}:{subpath}'
cmdargs = ['show', keysrc]
- ecode, out, err = git_run_command(gittop, cmdargs)
+ ecode, out, _err = git_run_command(gittop, cmdargs)
if ecode == 0:
# Handle one level of symlinks
if out.find(b'\n') < 0 < out.find(b'/'):
@@ -1282,7 +1283,7 @@ def get_public_key(
)
keysrc = f'{gitref}:{linktgt}'
cmdargs = ['show', keysrc]
- ecode, out, err = git_run_command(gittop, cmdargs)
+ ecode, out, _err = git_run_command(gittop, cmdargs)
if ecode == 0:
logger.debug('KEYSRC : %s (symlinked)', keysrc)
return out, 'ref:%s:%s' % (gittop, keysrc)
@@ -1301,7 +1302,7 @@ def get_public_key(
byhash_subpath = Path(gitsub) / byhash_keypath
keysrc = f'{gitref}:{byhash_subpath}'
cmdargs = ['show', keysrc]
- ecode, out, err = git_run_command(gittop, cmdargs)
+ ecode, out, _err = git_run_command(gittop, cmdargs)
if ecode == 0:
logger.debug('KEYSRC : %s (by-hash)', keysrc)
return out, 'ref:%s:%s' % (gittop, keysrc)
@@ -1342,7 +1343,7 @@ def _load_messages(cmdargs: argparse.Namespace) -> Dict[str, bytes]:
if len(cmdargs.msgfile):
# Load all message from the files passed to make sure they all parse correctly
- messages = dict()
+ messages: Dict[str, bytes] = dict()
for msgfile in cmdargs.msgfile:
with open(msgfile, 'rb') as fh:
messages[msgfile] = fh.read()
@@ -1382,30 +1383,30 @@ def sign_message(
def set_bin_paths(config: Optional[GitConfigType]) -> Tuple[str, str]:
- global GPGBIN, SSHKBIN
- if GPGBIN is None:
+ global _gpgbin_path, _sshkbin_path
+ if _gpgbin_path is None:
if config and config.get('gpg-bin'):
_gpgbin = config.get('gpg-bin')
- assert isinstance(GPGBIN, str), 'gpg-bin must be a string'
- GPGBIN = _gpgbin
+ assert isinstance(_gpgbin, str), 'gpg-bin must be a string'
+ _gpgbin_path = _gpgbin
elif (_gpgbin := get_config_from_git(r'gpg\..*').get('program')) is not None:
assert isinstance(_gpgbin, str), 'gpg program must be a string'
- GPGBIN = _gpgbin
+ _gpgbin_path = _gpgbin
else:
- GPGBIN = 'gpg'
- if SSHKBIN is None:
+ _gpgbin_path = 'gpg'
+ if _sshkbin_path is None:
if config and config.get('ssh-keygen-bin'):
_sshkbin = config.get('ssh-keygen-bin')
assert isinstance(_sshkbin, str), 'ssh-keygen-bin must be a string'
- SSHKBIN = _sshkbin
+ _sshkbin_path = _sshkbin
elif (
_sshkbin := get_config_from_git(r'gpg\..*', section='ssh').get('program')
) is not None:
assert isinstance(_sshkbin, str), 'program must be a string'
- SSHKBIN = _sshkbin
+ _sshkbin_path = _sshkbin
else:
- SSHKBIN = 'ssh-keygen'
- return GPGBIN, SSHKBIN
+ _sshkbin_path = 'ssh-keygen'
+ return _gpgbin_path, _sshkbin_path
def get_algo_keydata(config: GitConfigType) -> Tuple[str, str]:
@@ -1574,9 +1575,7 @@ def cmd_sign(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
def validate_message(
msgdata: bytes, sources: List[str], trim_body: bool = False
-) -> List[
- Tuple[int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]]
-]:
+) -> List[AttestationResult]:
"""Validate all signatures in an RFC2822 message.
Args:
@@ -1590,11 +1589,7 @@ def validate_message(
Result codes: RES_VALID, RES_BADSIG, RES_NOKEY, RES_NOSIG, RES_ERROR
"""
- attestations: List[
- Tuple[
- int, Optional[str], Optional[str], Optional[str], Optional[str], List[str]
- ]
- ] = list()
+ attestations: List[AttestationResult] = list()
pm = PatattMessage(msgdata)
if not pm.signed:
logger.debug('message is not signed')
@@ -1605,7 +1600,7 @@ def validate_message(
# Find all identities for which we have public keys
for ds in pm.get_sigs():
- errors = list()
+ errors: List[str] = list()
a = ds.get_field_as_str('a')
i = ds.get_field_as_str('i')
s = ds.get_field_as_str('s')
@@ -1676,9 +1671,9 @@ def cmd_validate(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
logger.critical('E: %s', ex)
sys.exit(1)
- messages = dict()
+ messages: Dict[str, bytes] = dict()
for msg in mbox:
- subject = msg.get('Subject', 'No subject')
+ subject = str(msg.get('Subject', 'No subject'))
messages[subject] = msg.as_bytes()
else:
try:
@@ -1689,9 +1684,11 @@ def cmd_validate(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
ddir = get_data_dir()
pdir = ddir / 'public'
- sources = config.get('keyringsrc', list())
- if not isinstance(sources, list):
- sources = [sources]
+ raw_sources = config.get('keyringsrc', list())
+ if isinstance(raw_sources, list):
+ sources = raw_sources
+ else:
+ sources = [raw_sources]
if str(pdir) not in sources:
sources.append(str(pdir))
@@ -1705,7 +1702,7 @@ def cmd_validate(cmdargs: argparse.Namespace, config: GitConfigType) -> None:
for fn, msgdata in messages.items():
try:
attestations = validate_message(msgdata, sources, trim_body=trim_body)
- for result, identity, signtime, keysrc, algo, errors in attestations:
+ for result, identity, _signtime, keysrc, _algo, errors in attestations:
if result > highest_err:
highest_err = result
diff --git a/tests/test_validation.py b/tests/test_validation.py
index 1477fbe..c399f14 100644
--- a/tests/test_validation.py
+++ b/tests/test_validation.py
@@ -36,5 +36,5 @@ def test_validate(sample_file: str) -> None:
# Print validation details for debugging
print(f'Found {len(valid_signatures)} valid signatures:')
for result in valid_signatures:
- status, algo, keytype, identity, selector, errors = result
+ _status, _algo, keytype, identity, selector, _errors = result
print(f' - {keytype} signature by {identity} ({selector})')
diff --git a/tests/unit/test_devsig_header.py b/tests/unit/test_devsig_header.py
index 7b6e461..d94bdc9 100644
--- a/tests/unit/test_devsig_header.py
+++ b/tests/unit/test_devsig_header.py
@@ -1,6 +1,7 @@
import base64
import hashlib
from io import BytesIO
+from typing import List
import pytest
@@ -76,7 +77,7 @@ class TestDevsigHeader:
header = DevsigHeader()
# Parse the sample email to get headers
- headers = []
+ headers: List[bytes] = []
with BytesIO(sample_email_bytes) as fh:
while True:
line = fh.readline()
diff --git a/tests/unit/test_get_algo_keydata.py b/tests/unit/test_get_algo_keydata.py
index c4ccc8d..4d9b7e6 100644
--- a/tests/unit/test_get_algo_keydata.py
+++ b/tests/unit/test_get_algo_keydata.py
@@ -119,7 +119,7 @@ class TestGetAlgoKeydataSSHSigningKey:
gpgcfg={'format': 'ssh'},
)
config: GitConfigType = {}
- algo, keydata = get_algo_keydata(config)
+ algo, _keydata = get_algo_keydata(config)
assert config['identity'] == 'auto@example.com'
assert algo == 'openssh'
--
2.53.0
next prev parent reply other threads:[~2026-04-20 1:24 UTC|newest]
Thread overview: 9+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-04-20 1:22 [PATCH patatt 0/7] Harden local checks Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 1/7] Add local CI script Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 2/7] Add Ruff import checks Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 3/7] Add Ruff format check Tamir Duberstein
2026-04-20 1:22 ` Tamir Duberstein [this message]
2026-04-20 1:22 ` [PATCH patatt 5/7] Add ty checks Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 6/7] Reduce dictionary lookups Tamir Duberstein
2026-04-20 1:22 ` [PATCH patatt 7/7] Import PyNaCl unconditionally Tamir Duberstein
2026-04-27 20:20 ` [PATCH patatt 0/7] Harden local checks Tamir Duberstein
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260419-stronger-type-checking-v1-4-5c108048d2c7@kernel.org \
--to=tamird@kernel.org \
--cc=konstantin@linuxfoundation.org \
--cc=tools@kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.