* [libgpiod][PATCH v2 1/3] bindings: python: tests: add multi-threaded tests
2026-05-27 21:59 [libgpiod][PATCH v2 0/3] bindings: python: add support for free-threaded Python Vincent Fazio
@ 2026-05-27 21:59 ` Vincent Fazio
2026-05-27 21:59 ` [libgpiod][PATCH v2 2/3] bindings: python: support free-threaded CPython Vincent Fazio
` (2 subsequent siblings)
3 siblings, 0 replies; 5+ messages in thread
From: Vincent Fazio @ 2026-05-27 21:59 UTC (permalink / raw)
To: linux-gpio; +Cc: brgl, Vincent Fazio
Add new multi-threaded test cases to ensure the bindings operate as
expected and do not lead to crashes or data corruption.
Signed-off-by: Vincent Fazio <vfazio@gmail.com>
---
bindings/python/tests/__main__.py | 1 +
bindings/python/tests/helpers.py | 5 +
bindings/python/tests/meson.build | 1 +
bindings/python/tests/tests_threading.py | 742 +++++++++++++++++++++++
4 files changed, 749 insertions(+)
create mode 100644 bindings/python/tests/tests_threading.py
diff --git a/bindings/python/tests/__main__.py b/bindings/python/tests/__main__.py
index 318e0df..591e20d 100644
--- a/bindings/python/tests/__main__.py
+++ b/bindings/python/tests/__main__.py
@@ -14,6 +14,7 @@ from .tests_line_info import *
from .tests_line_request import *
from .tests_line_settings import *
from .tests_module import *
+from .tests_threading import *
set_process_name("python-gpiod")
diff --git a/bindings/python/tests/helpers.py b/bindings/python/tests/helpers.py
index 4abd8b2..a6c02de 100644
--- a/bindings/python/tests/helpers.py
+++ b/bindings/python/tests/helpers.py
@@ -4,6 +4,7 @@
from __future__ import annotations
import os
+import sys
from typing import TYPE_CHECKING
if TYPE_CHECKING:
@@ -25,3 +26,7 @@ class LinkGuard:
tb: TracebackType | None,
) -> None:
os.unlink(self.dst)
+
+
+def is_free_threaded() -> bool:
+ return hasattr(sys, "_is_gil_enabled") and not sys._is_gil_enabled()
diff --git a/bindings/python/tests/meson.build b/bindings/python/tests/meson.build
index 16d84cc..3ae36a4 100644
--- a/bindings/python/tests/meson.build
+++ b/bindings/python/tests/meson.build
@@ -19,6 +19,7 @@ foreach f : [
'tests_line_request.py',
'tests_line_settings.py',
'tests_module.py',
+ 'tests_threading.py',
]
fs.copyfile(f)
endforeach
diff --git a/bindings/python/tests/tests_threading.py b/bindings/python/tests/tests_threading.py
new file mode 100644
index 0000000..a61af5b
--- /dev/null
+++ b/bindings/python/tests/tests_threading.py
@@ -0,0 +1,742 @@
+import errno
+import fcntl
+import os
+import threading
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from contextlib import nullcontext
+from select import EPOLLIN, epoll
+from typing import TYPE_CHECKING, ClassVar
+from unittest import TestCase
+
+import gpiod
+from gpiod.line import Direction, Edge, Value
+
+from . import gpiosim
+from .helpers import is_free_threaded
+
+if TYPE_CHECKING:
+ from contextlib import AbstractContextManager
+
+
+# Threading & the CPython bindings as they relate to the C extension:
+#
+# Python is sometimes mistakenly considered thread-safe but this is not the
+# case even with GIL enabled builds as there can still be data races between
+# threads on pure Python objects.
+#
+# What is guaranteed is ref counts, memory management, etc being handled safely.
+# Mutations on objects like dicts/lists are _not_ guaranteed to be safe.
+#
+# Of the objects exposed by the bindings, the following are effectively "frozen":
+# * ChipInfo
+# * LineInfo
+# * InfoEvent
+# * EdgeEvent
+# * gpiod.line Enums
+#
+# The *Info and *Event objects are return values from the C extension, are not
+# inputs, and are immutable. There should be no thread-safety concerns for them.
+#
+# The remaining objects are:
+# * Chip
+# * LineRequest
+# * LineSettings
+#
+# LineSettings is a pure Python class, is an argument to functions, and is not
+# passed to the C extension directly. There should be no major concerns about
+# the thread-safety of this object within the C extension.
+#
+# Chip and LineRequest objects are pure Python classes _but_ wrap classes that
+# are exposed by the C extension. Example: LineRequest wraps Request which wraps
+# request_object from the C extension that has buffers allocated at creation.
+# Calling get_values on the Python class will fill the buffer for the underlying
+# C object which could race with another thread writing/reading at the same time.
+#
+# As such, these classes are at risk for conflicts between threads.
+#
+# For GIL enabled CPython builds, calling into the extension maintains the GIL
+# until a call such as Py_BEGIN_ALLOW_THREADS releases it. Until that call, the
+# GIL provides implicit safety for the aforementioned buffers.
+#
+# For no-GIL builds, the GIL is no longer in place to provide that safety.
+# Without the GIL acting as a mutex, either the C extension or the caller is
+# responsible for providing thread safety.
+#
+# The libgpiod C API itself is not advertised as being thread-safe and the
+# bindings do not add any explicit thread-safety mechanisms (there is no internal
+# synchronization). Users of the bindings must provide external synchronization
+# if sharing Chip or LineRequest objects across threads.
+
+
+def get_lock() -> "AbstractContextManager[None | bool]":
+ """
+ Helper function to return a lock or a nullcontext so that no lock is used.
+ Can be used for a quick sanity check that things are not thread-safe.
+ """
+
+ lock: AbstractContextManager[None | bool]
+ if os.getenv("TESTS_NO_LOCKING"):
+ lock = nullcontext()
+ else:
+ lock = threading.Lock()
+ return lock
+
+
+# It should be noted that the values used for the tests below are not "smart"...
+# They do not auto-balance so any tweaks may require significant rework. For
+# example, there are generally 4 lines used for testing which matches the number
+# of threads spun up, with the thread identifier acting as an index to the line
+# it controls/queries.
+
+
+class ThreadedTestCase(TestCase):
+ NUM_THREADS: ClassVar[int]
+ ITERATIONS: ClassVar[int]
+ TIMEOUT: ClassVar[int]
+
+ # This is stubbed out to prevent docstrings from being used as descriptions
+ # when unit tests are run in verbose mode.
+ # See: https://docs.python.org/3/library/unittest.html#unittest.TestCase.shortDescription
+ def shortDescription(self) -> None:
+ return None
+
+ @classmethod
+ def setUpClass(cls) -> None:
+ cls.NUM_THREADS = 4
+ # we want to stress test free threaded builds a bit more
+ cls.ITERATIONS = 200 if is_free_threaded() else 20
+ cls.TIMEOUT = 2
+
+
+class Chip(ThreadedTestCase):
+ def setUp(self) -> None:
+ self.sim = gpiosim.Chip(
+ num_lines=4, label="foobar", line_names={0: "l0", 1: "l1", 2: "l2", 3: "l3"}
+ )
+ self.chip = gpiod.Chip(self.sim.dev_path)
+
+ def tearDown(self) -> None:
+ self.chip.close()
+ self.chip = None # type: ignore[assignment]
+ self.sim = None # type: ignore[assignment]
+
+ def test_per_thread_creation_and_query(self) -> None:
+ """
+ Test that multiple threads can create and query a chip pointing to the
+ same backing device without a mutex
+
+ Synchronization: Not required
+ """
+
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+
+ def worker(tid: int) -> None:
+ barrier.wait()
+ for _ in range(self.ITERATIONS):
+ offset = tid % self.NUM_THREADS
+ with gpiod.Chip(self.sim.dev_path) as chip:
+ info = chip.get_info()
+ self.assertEqual(
+ (info.name, info.label, info.num_lines),
+ (
+ self.sim.name,
+ "foobar",
+ 4,
+ ),
+ )
+ line_info = chip.get_line_info(f"l{offset}")
+ self.assertEqual(
+ (line_info.offset, line_info.name), (offset, f"l{offset}")
+ )
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(worker, i) for i in range(self.NUM_THREADS)]
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ future.result(timeout=self.TIMEOUT)
+
+ def test_shared_creation_and_query(self) -> None:
+ """
+ Test querying a single chip shared across multiple threads
+
+ Synchronization: Not required
+ """
+
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ lock = get_lock()
+
+ def worker(tid: int) -> None:
+ barrier.wait()
+ for _ in range(self.ITERATIONS):
+ offset = tid % self.NUM_THREADS
+ with lock:
+ info = self.chip.get_info()
+ self.assertEqual(
+ (info.name, info.label, info.num_lines),
+ (self.sim.name, "foobar", 4),
+ )
+ with lock:
+ line_info = self.chip.get_line_info(f"l{offset}")
+ self.assertEqual(
+ (line_info.offset, line_info.name), (offset, f"l{offset}")
+ )
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(worker, i) for i in range(self.NUM_THREADS)]
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ future.result(timeout=self.TIMEOUT)
+
+ def test_shared_closed(self) -> None:
+ """
+ Tests that querying a single `Chip` shared across multiple threads after
+ closing raises an error
+
+ Synchronization: Required
+
+ Note:
+ The underlying `gpiod_chip` struct gets freed on close, leaving a mine
+ for other threads to step on
+ """
+
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ lock = get_lock()
+
+ def worker() -> None:
+ barrier.wait()
+ with lock:
+ info = self.chip.get_info()
+ self.chip.close()
+ self.assertEqual(
+ (info.name, info.label, info.num_lines),
+ (self.sim.name, "foobar", 4),
+ )
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(worker) for _ in range(self.NUM_THREADS)]
+ error_count = 0
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ try:
+ future.result(timeout=self.TIMEOUT)
+ except gpiod.ChipClosedError:
+ error_count += 1
+ self.assertEqual(error_count, self.NUM_THREADS - 1)
+
+
+class InfoEvent(ThreadedTestCase):
+ def setUp(self) -> None:
+ self.sim = gpiosim.Chip(num_lines=4, label="foobar")
+ self.chip = gpiod.Chip(self.sim.dev_path)
+
+ def tearDown(self) -> None:
+ self.chip.close()
+ self.chip = None # type: ignore[assignment]
+ self.sim = None # type: ignore[assignment]
+
+ def test_watch_unwatch_line_info(self) -> None:
+ """
+ Tests that threads that share a `Chip` can watch/unwatch line info events
+
+ Synchronization: Not required
+
+ Note:
+ Threads may encounter EBUSY if the underlying file descriptor is busy or
+ if the offset is already being watched
+ """
+
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ num_lines = self.chip.get_info().num_lines
+
+ def worker(tid: int) -> None:
+ offset = tid % num_lines
+ barrier.wait()
+ for _ in range(self.ITERATIONS):
+ try:
+ info = self.chip.watch_line_info(offset)
+ self.assertEqual(info.offset, offset)
+ except OSError as e:
+ if e.errno == errno.EBUSY:
+ retry_count = 0
+ while retry_count < 2:
+ try:
+ retry_count += 1
+ self.chip.unwatch_line_info(offset)
+ break
+ except OSError as e:
+ pass
+
+ info = self.chip.get_line_info(offset)
+ self.assertEqual(info.offset, offset)
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(worker, _) for _ in range(self.NUM_THREADS)]
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ future.result(timeout=self.TIMEOUT)
+
+ def test_watch_unwatch_line_info_locks(self) -> None:
+ """
+ Tests that threads that share a `Chip` can watch/unwatch line info events
+ with locking
+
+ Same as test_watch_unwatch_line_info but with locks and no EBUSY handling
+
+ Synchronization: Not required
+ """
+
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ lock = get_lock()
+ watching: set[int] = set()
+
+ def worker(tid: int) -> None:
+ barrier.wait()
+ for _ in range(self.ITERATIONS):
+ offset = tid % self.NUM_THREADS
+ with lock:
+ if offset in watching:
+ self.chip.unwatch_line_info(offset)
+ watching.remove(offset)
+ info = self.chip.get_line_info(offset)
+ else:
+ info = self.chip.watch_line_info(offset)
+ watching.add(offset)
+ self.assertEqual(info.offset, offset)
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(worker, i) for i in range(self.NUM_THREADS)]
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ future.result(timeout=self.TIMEOUT)
+
+ def test_read_info_event(self) -> None:
+ """
+ Test that multiple threads that share a Chip can read info events
+
+ Synchronization: Not required
+ """
+
+ num_lines = self.chip.get_info().num_lines
+ for offset in range(num_lines):
+ self.chip.watch_line_info(offset)
+ # If read_info_event() is blocking, threads will hang forever waiting
+ # for events that don't exist when we're looking to shutdown.
+ flags = fcntl.fcntl(self.chip.fd, fcntl.F_GETFL)
+ fcntl.fcntl(self.chip.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+ worker_barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ feeder_barrier = threading.Barrier(2, timeout=self.TIMEOUT)
+ done_fd = os.eventfd(0)
+
+ total = 0
+ counter_lock = threading.Lock()
+
+ poll = epoll()
+ poll.register(self.chip.fd, EPOLLIN)
+ poll.register(done_fd, EPOLLIN)
+
+ def reader_worker(tid: int) -> None:
+ should_exit = False
+ local_count = 0
+ nonlocal total
+
+ worker_barrier.wait()
+ while not should_exit:
+ events = poll.poll(timeout=self.TIMEOUT)
+
+ for fd, _ in events:
+ if fd == done_fd:
+ should_exit = True
+ continue
+ if fd == self.chip.fd:
+ # read_info_event() only reads ONE event at a time (unlike edge events).
+ # We must loop until EAGAIN to fully drain the kernel buffer.
+ try:
+ while True:
+ _event = self.chip.read_info_event()
+ self.assertIsNotNone(_event)
+ local_count += 1
+ except OSError as e:
+ if e.errno == errno.EAGAIN:
+ continue
+ raise
+
+ with counter_lock:
+ total += local_count
+
+ def feeder(tid: int) -> None:
+ offsets = list(range(tid, num_lines, 2))
+ worker_barrier.wait()
+
+ for i in range(int(self.ITERATIONS)):
+ offset = offsets[i % len(offsets)]
+ with self.chip.request_lines(
+ config={offset: gpiod.LineSettings(direction=Direction.INPUT)}
+ ) as req:
+ req.reconfigure_lines(
+ config={offset: gpiod.LineSettings(direction=Direction.OUTPUT)}
+ )
+
+ feeder_barrier.wait()
+ # Thread 0 signals done when all events have fired
+ if tid == 0:
+ os.eventfd_write(done_fd, 1)
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as ex:
+ futures = [ex.submit(feeder, i) for i in range(2)]
+ futures += [ex.submit(reader_worker, i) for i in range(2, self.NUM_THREADS)]
+
+ try:
+ for f in as_completed(futures, timeout=self.TIMEOUT):
+ f.result(timeout=self.TIMEOUT)
+ self.assertGreater(total, 0)
+ finally:
+ for fd in [self.chip.fd, done_fd]:
+ poll.unregister(fd)
+ poll.close()
+ os.close(done_fd)
+ for offset in range(num_lines):
+ self.chip.unwatch_line_info(offset)
+
+
+class LineRequest(ThreadedTestCase):
+ def setUp(self) -> None:
+ self.sim = gpiosim.Chip(
+ num_lines=4, label="foobar", line_names={0: "l0", 1: "l1", 2: "l2", 3: "l3"}
+ )
+ self.chip = gpiod.Chip(self.sim.dev_path)
+
+ def tearDown(self) -> None:
+ self.chip.close()
+ self.chip = None # type: ignore[assignment]
+ self.sim = None # type: ignore[assignment]
+
+ def test_per_thread_creation_and_query(self) -> None:
+ """
+ Test that multiple threads can create and query their own LineRequest
+ without a mutex
+
+ Synchronization: Not required
+
+ Note: without a lock, EPERM may get raised due to the direction of the
+ offset having been changed from output to input
+ """
+
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ lock = get_lock()
+
+ def worker(tid: int) -> None:
+ # distribute threads across number of lines
+ offset = 2 + (tid % 2)
+ with lock:
+ request = self.chip.request_lines(
+ config={offset: gpiod.LineSettings(direction=Direction.OUTPUT)}
+ )
+ counter = 0
+ barrier.wait()
+ for _ in range(self.ITERATIONS):
+ try:
+ with lock:
+ direction = self.chip.get_line_info(offset).direction
+ if direction == Direction.INPUT:
+ continue
+ if request.get_value(offset) == Value.ACTIVE:
+ request.set_value(offset, Value.INACTIVE)
+ self.assertEqual(request.get_value(offset), Value.INACTIVE)
+ counter += 1
+ else:
+ request.set_value(offset, Value.ACTIVE)
+ self.assertEqual(request.get_value(offset), Value.ACTIVE)
+ counter += 1
+ # set_value may raise a permission error when the pin is INPUT
+ except OSError:
+ pass
+ self.assertGreater(counter, 0)
+
+ def feeder(tid: int) -> None:
+ offset = tid % 2
+ with lock:
+ request = self.chip.request_lines(
+ config={offset: gpiod.LineSettings(direction=Direction.OUTPUT)}
+ )
+ barrier.wait()
+ for iteration in range(self.ITERATIONS):
+ new_dir = Direction.INPUT if iteration % 2 == 0 else Direction.OUTPUT
+ with lock:
+ request.reconfigure_lines(
+ config={offset: gpiod.LineSettings(direction=new_dir)}
+ )
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(feeder, i) for i in range(2)]
+ futures += [executor.submit(worker, i) for i in range(2, self.NUM_THREADS)]
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ future.result(timeout=self.TIMEOUT)
+
+ def test_shared_creation_and_query(self) -> None:
+ """
+ Test multiple threads can reconfigure, set values and get values on a
+ shared line request
+
+ Synchronization: Required
+
+ Note:
+ This won't actually blow up, but based on the extension implementation
+ the request has a shared buffer for offsets/values that are reused for
+ getting/setting line values
+
+ Without synchronization, a thread may think it's setting one set of values
+ but the buffer values may have been overwritten by another thread
+
+ Implementation Note:
+ We use a dual set of events to make sure the feeder/worker pair alternate
+ otherwise a thread may monopolize the lock and finish before triggering
+ a set_value call. We pair this with a lock to prevent issues with the
+ aforementioned buffer contention.
+ """
+
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ lock = get_lock()
+ num_lines = self.chip.get_info().num_lines
+ request = self.chip.request_lines(
+ config={range(num_lines): gpiod.LineSettings(direction=Direction.OUTPUT)}
+ )
+
+ ready_events = {0: threading.Event(), 1: threading.Event()}
+ set_events = {0: threading.Event(), 1: threading.Event()}
+
+ def worker(tid: int) -> None:
+ # we're using 2 feeder threads, each with a dedicated offset
+ offset = tid % 2
+ counter = 0
+ ready_event = ready_events[offset]
+ set_event = set_events[offset]
+ set_event.set()
+ barrier.wait()
+ for _ in range(self.ITERATIONS):
+ ready_event.wait(self.TIMEOUT)
+ ready_event.clear()
+ with lock:
+ if self.chip.get_line_info(offset).direction == Direction.OUTPUT:
+ if request.get_value(offset) == Value.ACTIVE:
+ request.set_value(offset, Value.INACTIVE)
+ self.assertEqual(request.get_value(offset), Value.INACTIVE)
+ counter += 1
+ else:
+ request.set_value(offset, Value.ACTIVE)
+ self.assertEqual(request.get_value(offset), Value.ACTIVE)
+ counter += 1
+ set_event.set()
+ self.assertGreater(counter, 0)
+
+ def feeder(tid: int) -> None:
+ offset = tid % 2
+ ready_event = ready_events[offset]
+ set_event = set_events[offset]
+ barrier.wait()
+ for iteration in range(self.ITERATIONS):
+ new_dir = Direction.INPUT if iteration % 2 == 0 else Direction.OUTPUT
+ set_event.wait(self.TIMEOUT)
+ set_event.clear()
+ with lock:
+ request.reconfigure_lines(
+ config={offset: gpiod.LineSettings(direction=new_dir)}
+ )
+ ready_event.set()
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(feeder, i) for i in range(2)]
+ futures += [executor.submit(worker, i) for i in range(2, self.NUM_THREADS)]
+ try:
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ future.result(timeout=self.TIMEOUT)
+ finally:
+ request.release()
+
+ def test_shared_set_get_values(self) -> None:
+ """
+ Test setting and getting values from a single line request shared across
+ multiple threads
+
+ Synchronization: Required
+
+ Note:
+ This won't actually blow up, but based on the extension implementation
+ the request has a shared buffer for offsets/values that are reused for
+ getting/setting line values
+
+ Without synchronization, a thread may think it's setting one set of values
+ but the buffer values may have been overwritten by another thread
+ """
+
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ lock = get_lock()
+ offset = 0
+ request = self.chip.request_lines(
+ config={0: gpiod.LineSettings(direction=Direction.OUTPUT)}
+ )
+
+ def worker() -> None:
+ counter = 0
+ barrier.wait()
+ for _ in range(self.ITERATIONS):
+ with lock:
+ if request.get_value(offset) == Value.ACTIVE:
+ request.set_value(offset, Value.INACTIVE)
+ self.assertEqual(request.get_value(offset), Value.INACTIVE)
+ counter += 1
+ else:
+ request.set_value(offset, Value.ACTIVE)
+ self.assertEqual(request.get_value(offset), Value.ACTIVE)
+ counter += 1
+ self.assertGreater(counter, 0)
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(worker) for _ in range(self.NUM_THREADS)]
+ try:
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ future.result(timeout=self.TIMEOUT)
+ finally:
+ request.release()
+
+ def test_shared_close(self) -> None:
+ """
+ Test that querying a single line request shared across multiple threads
+ after releasing raises an error
+
+ Synchronization: Required
+
+ Note:
+ The underlying `gpiod_line_request` struct gets freed on release, leaving
+ a mine for other threads to step on
+ """
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ lock = get_lock()
+
+ num_lines = self.chip.get_info().num_lines
+ request = self.chip.request_lines(
+ config={
+ range(num_lines): gpiod.LineSettings(
+ direction=Direction.OUTPUT, output_value=Value.INACTIVE
+ )
+ }
+ )
+
+ def worker() -> None:
+ barrier.wait()
+ with lock:
+ info = request.get_values(range(num_lines))
+ request.release()
+ for line in info:
+ self.assertEqual(line, Value.INACTIVE)
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(worker) for _ in range(self.NUM_THREADS)]
+ error_count = 0
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ try:
+ future.result(timeout=self.TIMEOUT)
+ except gpiod.RequestReleasedError:
+ error_count += 1
+ self.assertEqual(error_count, self.NUM_THREADS - 1)
+
+
+class EdgeEvent(ThreadedTestCase):
+ def setUp(self) -> None:
+ self.sim = gpiosim.Chip(num_lines=4, label="foobar")
+ self.chip = gpiod.Chip(self.sim.dev_path)
+
+ def tearDown(self) -> None:
+ self.chip.close()
+ self.sim = None # type: ignore[assignment]
+ self.chip = None # type: ignore[assignment]
+
+ def test_read_edge_events(self) -> None:
+ """
+ Test that multiple threads can read edge events on a shared LineRequest
+
+ Synchronization: Required
+
+ Note:
+ The request object has a gpiod_edge_event_buffer for events to be read into.
+ Without synchronization, that buffer will be overwritten by another thread
+ when attempting to create event objects
+ """
+ num_lines = self.chip.get_info().num_lines
+ req = self.chip.request_lines(
+ config={
+ range(num_lines): gpiod.LineSettings(
+ direction=Direction.INPUT, edge_detection=Edge.BOTH
+ )
+ }
+ )
+
+ # If read_edge_events() is blocking, threads will hang forever waiting
+ # for events that don't exist during shutdown.
+ flags = fcntl.fcntl(req.fd, fcntl.F_GETFL)
+ fcntl.fcntl(req.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+ worker_barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ feeder_barrier = threading.Barrier(2, timeout=self.TIMEOUT)
+ done_fd = os.eventfd(0)
+
+ total = 0
+ counter_lock = threading.Lock()
+ req_lock = get_lock()
+
+ poll = epoll()
+ poll.register(req.fd, EPOLLIN)
+ poll.register(done_fd, EPOLLIN)
+
+ def reader_worker(tid: int) -> None:
+ nonlocal total
+ should_exit = False
+ local_count = 0
+ worker_barrier.wait()
+
+ while not should_exit:
+ events = poll.poll(self.TIMEOUT)
+
+ for fd, _ in events:
+ if fd == done_fd:
+ should_exit = True
+ continue
+
+ if fd == req.fd:
+ try:
+ with req_lock:
+ # O_NONBLOCK prevents hanging
+ evs = req.read_edge_events()
+ if evs:
+ local_count += len(evs)
+ except OSError as e:
+ if e.errno == errno.EAGAIN:
+ continue
+ raise
+
+ with counter_lock:
+ total += local_count
+
+ def feeder(tid: int) -> None:
+ offsets = list(range(tid, num_lines, 2))
+ worker_barrier.wait()
+
+ for i in range(int(self.ITERATIONS)):
+ offset = offsets[i % len(offsets)]
+ for pull in [gpiosim.Chip.Pull.UP, gpiosim.Chip.Pull.DOWN]:
+ self.sim.set_pull(offset, pull)
+
+ feeder_barrier.wait()
+ # Thread 0 signals done when all pulses have fired
+ if tid == 0:
+ os.eventfd_write(done_fd, 1)
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as ex:
+ futures = [ex.submit(feeder, i) for i in range(2)]
+ futures += [ex.submit(reader_worker, i) for i in range(2, self.NUM_THREADS)]
+
+ try:
+ for f in as_completed(futures, timeout=self.TIMEOUT):
+ f.result(timeout=self.TIMEOUT)
+ self.assertGreater(total, 0)
+ finally:
+ for fd in [req.fd, done_fd]:
+ poll.unregister(fd)
+ poll.close()
+ os.close(done_fd)
+ req.release()
--
2.43.0
^ permalink raw reply related [flat|nested] 5+ messages in thread* [libgpiod][PATCH v2 2/3] bindings: python: support free-threaded CPython
2026-05-27 21:59 [libgpiod][PATCH v2 0/3] bindings: python: add support for free-threaded Python Vincent Fazio
2026-05-27 21:59 ` [libgpiod][PATCH v2 1/3] bindings: python: tests: add multi-threaded tests Vincent Fazio
@ 2026-05-27 21:59 ` Vincent Fazio
2026-05-27 21:59 ` [libgpiod][PATCH v2 3/3] bindings: python: add a changelog Vincent Fazio
2026-05-28 9:03 ` [libgpiod][PATCH v2 0/3] bindings: python: add support for free-threaded Python Bartosz Golaszewski
3 siblings, 0 replies; 5+ messages in thread
From: Vincent Fazio @ 2026-05-27 21:59 UTC (permalink / raw)
To: linux-gpio; +Cc: brgl, Vincent Fazio
PEP 703 [0] discusses making the GIL optional in certain builds of the
CPython interpreter.
This build option was available experimentally in Python 3.13 but has
since been stabilized in Python 3.14 per PEP 779 [1].
According to the porting guide [2], there is no strict requirement that
C extensions must be thread-safe.
Experiments have shown that no logic changes are required if callers
use synchronization mechanisms provided by the Python standard library.
The documentation has been updated to call this out specifically using
terminology from the porting guide [3].
[0]: https://peps.python.org/pep-0703/
[1]: https://peps.python.org/pep-0779/
[2]: https://py-free-threading.github.io/porting/#define-and-document-thread-safety-guarantees
[3]: https://py-free-threading.github.io/documentation-principles/#free-threading-terminology
Signed-off-by: Vincent Fazio <vfazio@gmail.com>
---
bindings/python/README.md | 8 +++++++-
bindings/python/gpiod/chip.py | 3 +++
bindings/python/gpiod/chip_info.py | 4 +++-
bindings/python/gpiod/edge_event.py | 2 ++
bindings/python/gpiod/ext/module.c | 5 ++++-
bindings/python/gpiod/info_event.py | 2 ++
bindings/python/gpiod/line_info.py | 4 +++-
bindings/python/gpiod/line_request.py | 3 +++
bindings/python/gpiod/line_settings.py | 2 ++
bindings/python/pyproject.toml | 2 +-
bindings/python/tests/gpiosim/ext.c | 3 +++
bindings/python/tests/system/ext.c | 3 +++
bindings/python/tests/tests_threading.py | 3 +++
docs/python_api.rst | 6 ++++++
14 files changed, 45 insertions(+), 5 deletions(-)
diff --git a/bindings/python/README.md b/bindings/python/README.md
index 2faa6f4..f3cd77a 100644
--- a/bindings/python/README.md
+++ b/bindings/python/README.md
@@ -5,10 +5,16 @@
These are the official Python bindings for [libgpiod](https://git.kernel.org/pub/scm/libs/libgpiod/libgpiod.git/about/).
+Both GIL-enabled and free-threaded CPython are supported.
+
+The Python bindings, much like the C API they wrap, are not thread-safe and do
+require external synchronization by the caller to serialize access to objects
+shared across threads.
+
The gpiod library has been vendored into this package for your convenience and
this version of gpiod is independent from your system package.
-Binary wheels are not provided. The source package requires python3-dev.
+Binary wheels are provided for some platforms. The source package requires python3-dev.
## Rationale
diff --git a/bindings/python/gpiod/chip.py b/bindings/python/gpiod/chip.py
index 6b7b32b..2acb872 100644
--- a/bindings/python/gpiod/chip.py
+++ b/bindings/python/gpiod/chip.py
@@ -50,6 +50,9 @@ class Chip:
with gpiod.Chip(path="/dev/gpiochip0") as chip:
do_something(chip)
+
+ Synchronization: objects of this class require external synchronization.
+ Protect calls with a lock when sharing an instance across threads.
"""
def __init__(self, path: str):
diff --git a/bindings/python/gpiod/chip_info.py b/bindings/python/gpiod/chip_info.py
index 737a45e..1a81a9d 100644
--- a/bindings/python/gpiod/chip_info.py
+++ b/bindings/python/gpiod/chip_info.py
@@ -10,7 +10,9 @@ __all__ = ["ChipInfo"]
@dataclass(frozen=True, repr=False)
class ChipInfo:
"""
- Snapshot of a chip's status.
+ Immutable snapshot of a chip's status.
+
+ Synchronization: none required
"""
name: str
diff --git a/bindings/python/gpiod/edge_event.py b/bindings/python/gpiod/edge_event.py
index c888cb2..99bb83d 100644
--- a/bindings/python/gpiod/edge_event.py
+++ b/bindings/python/gpiod/edge_event.py
@@ -13,6 +13,8 @@ __all__ = ["EdgeEvent"]
class EdgeEvent:
"""
Immutable object containing data about a single edge event.
+
+ Synchronization: none required
"""
class Type(Enum):
diff --git a/bindings/python/gpiod/ext/module.c b/bindings/python/gpiod/ext/module.c
index e567f07..21ea519 100644
--- a/bindings/python/gpiod/ext/module.c
+++ b/bindings/python/gpiod/ext/module.c
@@ -188,7 +188,10 @@ static int module_exec(PyObject* module)
static struct PyModuleDef_Slot module_slots[] = {
{ Py_mod_exec, module_exec },
- { },
+#if PY_VERSION_HEX >= 0x030E0000 && defined(Py_GIL_DISABLED)
+ {Py_mod_gil, Py_MOD_GIL_NOT_USED},
+#endif
+ {0, NULL},
};
static PyModuleDef module_def = {
diff --git a/bindings/python/gpiod/info_event.py b/bindings/python/gpiod/info_event.py
index cd2785e..e3dd582 100644
--- a/bindings/python/gpiod/info_event.py
+++ b/bindings/python/gpiod/info_event.py
@@ -14,6 +14,8 @@ __all__ = ["InfoEvent"]
class InfoEvent:
"""
Immutable object containing data about a single line info event.
+
+ Synchronization: none required
"""
class Type(Enum):
diff --git a/bindings/python/gpiod/line_info.py b/bindings/python/gpiod/line_info.py
index d31565e..b11ed42 100644
--- a/bindings/python/gpiod/line_info.py
+++ b/bindings/python/gpiod/line_info.py
@@ -12,7 +12,9 @@ __all__ = ["LineInfo"]
@dataclass(frozen=True, init=False, repr=False)
class LineInfo:
"""
- Snapshot of a line's status.
+ Immutable snapshot of a line's status.
+
+ Synchronization: none required
"""
offset: int
diff --git a/bindings/python/gpiod/line_request.py b/bindings/python/gpiod/line_request.py
index a271080..dc7cd22 100644
--- a/bindings/python/gpiod/line_request.py
+++ b/bindings/python/gpiod/line_request.py
@@ -26,6 +26,9 @@ __all__ = ["LineRequest"]
class LineRequest:
"""
Stores the context of a set of requested GPIO lines.
+
+ Synchronization: objects of this class require external synchronization.
+ Protect calls with a lock when sharing an instance across threads.
"""
def __init__(self, req: _ext.Request):
diff --git a/bindings/python/gpiod/line_settings.py b/bindings/python/gpiod/line_settings.py
index 3752acd..e54bc80 100644
--- a/bindings/python/gpiod/line_settings.py
+++ b/bindings/python/gpiod/line_settings.py
@@ -14,6 +14,8 @@ __all__ = ["LineSettings"]
class LineSettings:
"""
Stores a set of line properties.
+
+ Synchronization: none required
"""
direction: Direction = Direction.AS_IS
diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml
index 98bb44c..7c4474a 100644
--- a/bindings/python/pyproject.toml
+++ b/bindings/python/pyproject.toml
@@ -27,6 +27,7 @@ classifiers = [
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
+ "Programming Language :: Python :: Free Threading :: 3 - Stable",
]
[project.urls]
@@ -70,5 +71,4 @@ ignore=[
[tool.cibuildwheel]
build = "cp*"
-skip = "cp31?t-*" # Do not build free-threaded wheels
archs = ["x86_64", "aarch64"]
diff --git a/bindings/python/tests/gpiosim/ext.c b/bindings/python/tests/gpiosim/ext.c
index c15ebf5..cc7bd37 100644
--- a/bindings/python/tests/gpiosim/ext.c
+++ b/bindings/python/tests/gpiosim/ext.c
@@ -367,6 +367,9 @@ static void free_module_state(void *mod)
static struct PyModuleDef_Slot module_slots[] = {
{ Py_mod_exec, module_exec },
+#if PY_VERSION_HEX >= 0x030E0000 && defined(Py_GIL_DISABLED)
+ {Py_mod_gil, Py_MOD_GIL_NOT_USED},
+#endif
{ 0, NULL },
};
diff --git a/bindings/python/tests/system/ext.c b/bindings/python/tests/system/ext.c
index 8f451fc..8b307f6 100644
--- a/bindings/python/tests/system/ext.c
+++ b/bindings/python/tests/system/ext.c
@@ -68,6 +68,9 @@ static PyMethodDef module_methods[] = {
};
static struct PyModuleDef_Slot module_slots[] = {
+#if PY_VERSION_HEX >= 0x030E0000 && defined(Py_GIL_DISABLED)
+ {Py_mod_gil, Py_MOD_GIL_NOT_USED},
+#endif
{ 0, NULL },
};
diff --git a/bindings/python/tests/tests_threading.py b/bindings/python/tests/tests_threading.py
index a61af5b..6fbf8be 100644
--- a/bindings/python/tests/tests_threading.py
+++ b/bindings/python/tests/tests_threading.py
@@ -1,3 +1,6 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+# SPDX-FileCopyrightText: 2026 Vincent Fazio <vfazio@gmail.com>
+
import errno
import fcntl
import os
diff --git a/docs/python_api.rst b/docs/python_api.rst
index 2c4f59d..57d45ff 100644
--- a/docs/python_api.rst
+++ b/docs/python_api.rst
@@ -17,6 +17,12 @@ easily through Python scripts, enabling tasks such as reading input values,
setting outputs, monitoring events, and configuring more fine-grained pin
options.
+The Python bindings, much like the C API they wrap, are not thread-safe and do
+require external synchronization by the caller to serialize access to objects
+shared across threads.
+
+The bindings support both GIL-enabled and free-threaded CPython interpreters.
+
.. note::
Python bindings require python3 support and libpython development files for
building from sources.
--
2.43.0
^ permalink raw reply related [flat|nested] 5+ messages in thread