From: Vincent Fazio <vfazio@gmail.com>
To: linux-gpio@vger.kernel.org
Cc: brgl@kernel.org, Vincent Fazio <vfazio@gmail.com>
Subject: [libgpiod][PATCH 1/3] bindings: python: tests: add multi-threaded tests
Date: Fri, 22 May 2026 15:04:16 -0500 [thread overview]
Message-ID: <20260522200419.105496-2-vfazio@gmail.com> (raw)
In-Reply-To: <20260522200419.105496-1-vfazio@gmail.com>
Add new multi-threaded test cases to ensure the bindings operate as
expected and do not lead to crashes or data corruption.
Signed-off-by: Vincent Fazio <vfazio@gmail.com>
---
bindings/python/tests/__main__.py | 1 +
bindings/python/tests/helpers.py | 5 +
bindings/python/tests/meson.build | 1 +
bindings/python/tests/tests_threading.py | 739 +++++++++++++++++++++++
4 files changed, 746 insertions(+)
create mode 100644 bindings/python/tests/tests_threading.py
diff --git a/bindings/python/tests/__main__.py b/bindings/python/tests/__main__.py
index 318e0df..591e20d 100644
--- a/bindings/python/tests/__main__.py
+++ b/bindings/python/tests/__main__.py
@@ -14,6 +14,7 @@ from .tests_line_info import *
from .tests_line_request import *
from .tests_line_settings import *
from .tests_module import *
+from .tests_threading import *
set_process_name("python-gpiod")
diff --git a/bindings/python/tests/helpers.py b/bindings/python/tests/helpers.py
index 4abd8b2..a6c02de 100644
--- a/bindings/python/tests/helpers.py
+++ b/bindings/python/tests/helpers.py
@@ -4,6 +4,7 @@
from __future__ import annotations
import os
+import sys
from typing import TYPE_CHECKING
if TYPE_CHECKING:
@@ -25,3 +26,7 @@ class LinkGuard:
tb: TracebackType | None,
) -> None:
os.unlink(self.dst)
+
+
+def is_free_threaded() -> bool:
+ return hasattr(sys, "_is_gil_enabled") and not sys._is_gil_enabled()
diff --git a/bindings/python/tests/meson.build b/bindings/python/tests/meson.build
index 16d84cc..3ae36a4 100644
--- a/bindings/python/tests/meson.build
+++ b/bindings/python/tests/meson.build
@@ -19,6 +19,7 @@ foreach f : [
'tests_line_request.py',
'tests_line_settings.py',
'tests_module.py',
+ 'tests_threading.py',
]
fs.copyfile(f)
endforeach
diff --git a/bindings/python/tests/tests_threading.py b/bindings/python/tests/tests_threading.py
new file mode 100644
index 0000000..e9f551f
--- /dev/null
+++ b/bindings/python/tests/tests_threading.py
@@ -0,0 +1,739 @@
+import errno
+import fcntl
+import os
+import threading
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from contextlib import nullcontext
+from select import EPOLLIN, epoll
+from typing import TYPE_CHECKING, ClassVar
+from unittest import TestCase
+
+import gpiod
+from gpiod.line import Direction, Edge, Value
+
+from . import gpiosim
+from .helpers import is_free_threaded
+
+if TYPE_CHECKING:
+ from contextlib import AbstractContextManager
+
+
+# Threading & the CPython bindings as they relate to the C extension:
+#
+# Python is sometimes mistakenly considered thread-safe but this is not the
+# case even with GIL enabled builds as there can still be data races between
+# threads on pure Python objects.
+#
+# What is guaranteed is ref counts, memory management, etc being handled safely.
+# Mutations on objects like dicts/lists are _not_ guaranteed to be safe.
+#
+# Of the objects exposed by the bindings, the following are effectively "frozen":
+# * ChipInfo
+# * LineInfo
+# * InfoEvent
+# * EdgeEvent
+# * gpiod.line Enums
+#
+# The *Info and *Event objects are return values from the C extension, are not
+# inputs, and are immutable. There should be no thread-safety concerns for them.
+#
+# The remaining objects are:
+# * Chip
+# * LineRequest
+# * LineSettings
+#
+# LineSettings is a pure Python class, is an argument to functions, and is not
+# passed to the C extension directly. There should be no major concerns about
+# the thread-safety of this object within the C extension.
+#
+# Chip and LineRequest objects are pure Python classes _but_ wrap classes that
+# are exposed by the C extension. Example: LineRequest wraps Request which wraps
+# request_object from the C extension that has buffers allocated at creation.
+# Calling get_values on the Python class will fill the buffer for the underlying
+# C object which could race with another thread writing/reading at the same time.
+#
+# As such, these classes are at risk for conflicts between threads.
+#
+# For GIL enabled CPython builds, calling into the extension maintains the GIL
+# until a call such as Py_BEGIN_ALLOW_THREADS releases it. Until that call, the
+# GIL provides implicit safety for the aforementioned buffers.
+#
+# For no-GIL builds, the GIL is no longer in place to provide that safety.
+# Without the GIL acting as a mutex, either the C extension or the caller are
+# responsible for providing thread safety.
+#
+# The libgpiod C API itself is not advertised as being thread-safe and the
+# bindings do not add any explicit thread-safety mechanisms (there is no internal
+# synchronization). Users of the bindings must provide external synchronization
+# if sharing Chip or LineRequest objects across threads.
+
+
+def get_lock() -> "AbstractContextManager[None | bool]":
+ """
+ Helper function to return a lock or a nullcontext so that no lock is used.
+ Can be used for a quick sanity check that things are not thread-safe.
+ """
+
+ lock: AbstractContextManager[None | bool]
+ if os.getenv("TESTS_NO_LOCKING"):
+ lock = nullcontext()
+ else:
+ lock = threading.Lock()
+ return lock
+
+
+# It should be noted that the values used for the tests below are not "smart"...
+# They do not auto-balance so any tweaks may require significant rework. For
+# example, there are generally 4 lines used for testing which matches the number
+# of threads spun up, with the thread identifier acting as an index to the line
+# it controls/queries.
+
+
+class ThreadedTestCase(TestCase):
+ NUM_THREADS: ClassVar[int]
+ ITERATIONS: ClassVar[int]
+ TIMEOUT: ClassVar[int]
+
+ def shortDescription(self) -> None:
+ return None
+
+ @classmethod
+ def setUpClass(cls) -> None:
+ cls.NUM_THREADS = 4
+ # we want to stress test free threaded builds a bit more
+ cls.ITERATIONS = 200 if is_free_threaded() else 20
+ cls.TIMEOUT = 2
+
+
+class Chip(ThreadedTestCase):
+ def setUp(self) -> None:
+ self.sim = gpiosim.Chip(
+ num_lines=4, label="foobar", line_names={0: "l0", 1: "l1", 2: "l2", 3: "l3"}
+ )
+ self.chip = gpiod.Chip(self.sim.dev_path)
+
+ def tearDown(self) -> None:
+ self.chip.close()
+ self.chip = None # type: ignore[assignment]
+ self.sim = None # type: ignore[assignment]
+
+ def test_per_thread_creation_and_query(self) -> None:
+ """
+ Test that multiple threads can create and query a chip pointing to the
+ same backing device without a mutex
+
+ Synchronization: Not required
+ """
+
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+
+ def worker(tid: int) -> None:
+ barrier.wait()
+ for _ in range(self.ITERATIONS):
+ offset = tid % self.NUM_THREADS
+ with gpiod.Chip(self.sim.dev_path) as chip:
+ info = chip.get_info()
+ self.assertEqual(
+ (info.name, info.label, info.num_lines),
+ (
+ self.sim.name,
+ "foobar",
+ 4,
+ ),
+ )
+ line_info = chip.get_line_info(f"l{offset}")
+ self.assertEqual(
+ (line_info.offset, line_info.name), (offset, f"l{offset}")
+ )
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(worker, i) for i in range(self.NUM_THREADS)]
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ future.result(timeout=self.TIMEOUT)
+
+ def test_shared_creation_and_query(self) -> None:
+ """
+ Test querying a single chip shared across multiple threads
+
+ Synchronization: Not required
+ """
+
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ lock = get_lock()
+
+ def worker(tid: int) -> None:
+ barrier.wait()
+ for _ in range(self.ITERATIONS):
+ offset = tid % self.NUM_THREADS
+ with lock:
+ info = self.chip.get_info()
+ self.assertEqual(
+ (info.name, info.label, info.num_lines),
+ (self.sim.name, "foobar", 4),
+ )
+ with lock:
+ line_info = self.chip.get_line_info(f"l{offset}")
+ self.assertEqual(
+ (line_info.offset, line_info.name), (offset, f"l{offset}")
+ )
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(worker, i) for i in range(self.NUM_THREADS)]
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ future.result(timeout=self.TIMEOUT)
+
+ def test_shared_closed(self) -> None:
+ """
+ Tests that querying a single `Chip` shared across multiple threads after
+ closing raises an error
+
+ Synchronization: Required
+
+ Note:
+ The underlying `gpiod_chip` struct gets freed on close, leaving a mine
+ for other threads to step on
+ """
+
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ lock = get_lock()
+
+ def worker() -> None:
+ barrier.wait()
+ with lock:
+ info = self.chip.get_info()
+ self.chip.close()
+ self.assertEqual(
+ (info.name, info.label, info.num_lines),
+ (self.sim.name, "foobar", 4),
+ )
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(worker) for _ in range(self.NUM_THREADS)]
+ error_count = 0
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ try:
+ future.result(timeout=self.TIMEOUT)
+ except gpiod.ChipClosedError:
+ error_count += 1
+ self.assertEqual(error_count, self.NUM_THREADS - 1)
+
+
+class InfoEvent(ThreadedTestCase):
+ def setUp(self) -> None:
+ self.sim = gpiosim.Chip(num_lines=4, label="foobar")
+ self.chip = gpiod.Chip(self.sim.dev_path)
+
+ def tearDown(self) -> None:
+ self.chip.close()
+ self.chip = None # type: ignore[assignment]
+ self.sim = None # type: ignore[assignment]
+
+ def test_watch_unwatch_line_info(self) -> None:
+ """
+ Tests that threads that share a `Chip` can watch/unwatch line info events
+
+ Synchronization: Not required
+
+ Note:
+ Threads may encounter EBUSY if the underlying file descriptor is busy or
+ if the offset is already being watched
+ """
+
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ num_lines = self.chip.get_info().num_lines
+
+ def worker(tid: int) -> None:
+ offset = tid % num_lines
+ barrier.wait()
+ for _ in range(self.ITERATIONS):
+ try:
+ info = self.chip.watch_line_info(offset)
+ self.assertEqual(info.offset, offset)
+ except OSError as e:
+ if e.errno == errno.EBUSY:
+ retry_count = 0
+ while retry_count < 2:
+ try:
+ retry_count += 1
+ self.chip.unwatch_line_info(offset)
+ break
+ except OSError as e:
+ pass
+
+ info = self.chip.get_line_info(offset)
+ self.assertEqual(info.offset, offset)
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(worker, _) for _ in range(self.NUM_THREADS)]
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ future.result(timeout=self.TIMEOUT)
+
+ def test_watch_unwatch_line_info_locks(self) -> None:
+ """
+ Tests that threads that share a `Chip` can watch/unwatch line info events
+ with locking
+
+ Same as test_watch_unwatch_line_info but with locks and no EBUSY handling
+
+ Synchronization: Not required
+ """
+
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ lock = get_lock()
+ watching: set[int] = set()
+
+ def worker(tid: int) -> None:
+ barrier.wait()
+ for _ in range(self.ITERATIONS):
+ offset = tid % self.NUM_THREADS
+ with lock:
+ if offset in watching:
+ self.chip.unwatch_line_info(offset)
+ watching.remove(offset)
+ info = self.chip.get_line_info(offset)
+ else:
+ info = self.chip.watch_line_info(offset)
+ watching.add(offset)
+ self.assertEqual(info.offset, offset)
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(worker, i) for i in range(self.NUM_THREADS)]
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ future.result(timeout=self.TIMEOUT)
+
+ def test_read_info_event(self) -> None:
+ """
+ Test that multiple threads that share a Chip can read info events
+
+ Synchronization: Not required
+ """
+
+ num_lines = self.chip.get_info().num_lines
+ for offset in range(num_lines):
+ self.chip.watch_line_info(offset)
+ # If read_edge_events() is blocking, threads will hang forever waiting
+ # for events that don't exist when we're looking to shutdown.
+ flags = fcntl.fcntl(self.chip.fd, fcntl.F_GETFL)
+ fcntl.fcntl(self.chip.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+ worker_barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ feeder_barrier = threading.Barrier(2, timeout=self.TIMEOUT)
+ done_fd = os.eventfd(0)
+
+ total = 0
+ counter_lock = threading.Lock()
+
+ poll = epoll()
+ poll.register(self.chip.fd, EPOLLIN)
+ poll.register(done_fd, EPOLLIN)
+
+ def reader_worker(tid: int) -> None:
+ should_exit = False
+ local_count = 0
+ nonlocal total
+
+ worker_barrier.wait()
+ while not should_exit:
+ events = poll.poll(timeout=self.TIMEOUT)
+
+ for fd, _ in events:
+ if fd == done_fd:
+ should_exit = True
+ continue
+ if fd == self.chip.fd:
+ # read_info_event() only reads ONE event at a time (unlike edge events).
+ # We must loop until EAGAIN to fully drain the kernel buffer.
+ try:
+ while True:
+ _event = self.chip.read_info_event()
+ self.assertIsNotNone(_event)
+ local_count += 1
+ except OSError as e:
+ if e.errno == errno.EAGAIN:
+ continue
+ raise
+
+ with counter_lock:
+ total += local_count
+
+ def feeder(tid: int) -> None:
+ offsets = list(range(tid, num_lines, 2))
+ worker_barrier.wait()
+
+ for i in range(int(self.ITERATIONS)):
+ offset = offsets[i % len(offsets)]
+ with self.chip.request_lines(
+ config={offset: gpiod.LineSettings(direction=Direction.INPUT)}
+ ) as req:
+ req.reconfigure_lines(
+ config={offset: gpiod.LineSettings(direction=Direction.OUTPUT)}
+ )
+
+ feeder_barrier.wait()
+ # Thread 0 signals done when all events have fired
+ if tid == 0:
+ os.eventfd_write(done_fd, 1)
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as ex:
+ futures = [ex.submit(feeder, i) for i in range(2)]
+ futures += [ex.submit(reader_worker, i) for i in range(2, self.NUM_THREADS)]
+
+ try:
+ for f in as_completed(futures, timeout=self.TIMEOUT):
+ f.result(timeout=self.TIMEOUT)
+ self.assertGreater(total, 0)
+ finally:
+ for fd in [self.chip.fd, done_fd]:
+ poll.unregister(fd)
+ poll.close()
+ os.close(done_fd)
+ for offset in range(num_lines):
+ self.chip.unwatch_line_info(offset)
+
+
+class LineRequest(ThreadedTestCase):
+ def setUp(self) -> None:
+ self.sim = gpiosim.Chip(
+ num_lines=4, label="foobar", line_names={0: "l0", 1: "l1", 2: "l2", 3: "l3"}
+ )
+ self.chip = gpiod.Chip(self.sim.dev_path)
+
+ def tearDown(self) -> None:
+ self.chip.close()
+ self.chip = None # type: ignore[assignment]
+ self.sim = None # type: ignore[assignment]
+
+ def test_per_thread_creation_and_query(self) -> None:
+ """
+ Test that multiple threads can create and query their own LineRequest
+ without a mutex
+
+ Synchronization: Not required
+
+ Note: without a lock, EPERM may get raised due to the direction of the
+ offset having been changed from output to input
+ """
+
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ lock = get_lock()
+
+ def worker(tid: int) -> None:
+ # distribute threads across number of lines
+ offset = 2 + (tid % 2)
+ with lock:
+ request = self.chip.request_lines(
+ config={offset: gpiod.LineSettings(direction=Direction.OUTPUT)}
+ )
+ counter = 0
+ barrier.wait()
+ for _ in range(self.ITERATIONS):
+ try:
+ with lock:
+ direction = self.chip.get_line_info(offset).direction
+ if direction == Direction.INPUT:
+ continue
+ if request.get_value(offset) == Value.ACTIVE:
+ request.set_value(offset, Value.INACTIVE)
+ self.assertEqual(request.get_value(offset), Value.INACTIVE)
+ counter += 1
+ else:
+ request.set_value(offset, Value.ACTIVE)
+ self.assertEqual(request.get_value(offset), Value.ACTIVE)
+ counter += 1
+ # set_value may raise a permission error when the pin is INPUT
+ except OSError:
+ pass
+ self.assertGreater(counter, 0)
+
+ def feeder(tid: int) -> None:
+ offset = tid % 2
+ with lock:
+ request = self.chip.request_lines(
+ config={offset: gpiod.LineSettings(direction=Direction.OUTPUT)}
+ )
+ barrier.wait()
+ for iteration in range(self.ITERATIONS):
+ new_dir = Direction.INPUT if iteration % 2 == 0 else Direction.OUTPUT
+ with lock:
+ request.reconfigure_lines(
+ config={offset: gpiod.LineSettings(direction=new_dir)}
+ )
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(feeder, i) for i in range(2)]
+ futures += [executor.submit(worker, i) for i in range(2, self.NUM_THREADS)]
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ future.result(timeout=self.TIMEOUT)
+
+ def test_shared_creation_and_query(self) -> None:
+ """
+ Test multiple threads can reconfigure, set values and get values on a
+ shared line request
+
+ Synchronization: Required
+
+ Note:
+ This won't actually blow up, but based on the extension implementation
+ the request has a shared buffer for offets/values that are reused for
+ getting/setting line values
+
+ Without synchronization, a thread may think it's setting one set of values
+ but the buffer values may have been overwritten by another thread
+
+ Implementation Note:
+ We use a dual set of events to make sure the feeder/worker pair alternate
+ otherwise a thread may monopolize the lock and finish before triggering
+ a set_value call. We pair this with a lock to prevent issues with the
+ aforementioned buffer contention.
+ """
+
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ lock = get_lock()
+ num_lines = self.chip.get_info().num_lines
+ request = self.chip.request_lines(
+ config={range(num_lines): gpiod.LineSettings(direction=Direction.OUTPUT)}
+ )
+
+ ready_events = {0: threading.Event(), 1: threading.Event()}
+ set_events = {0: threading.Event(), 1: threading.Event()}
+
+ def worker(tid: int) -> None:
+ # we're using 2 feeder threads, each with a dedicated offset
+ offset = tid % 2
+ counter = 0
+ ready_event = ready_events[offset]
+ set_event = set_events[offset]
+ set_event.set()
+ barrier.wait()
+ for _ in range(self.ITERATIONS):
+ ready_event.wait(self.TIMEOUT)
+ ready_event.clear()
+ with lock:
+ if self.chip.get_line_info(offset).direction == Direction.OUTPUT:
+ if request.get_value(offset) == Value.ACTIVE:
+ request.set_value(offset, Value.INACTIVE)
+ self.assertEqual(request.get_value(offset), Value.INACTIVE)
+ counter += 1
+ else:
+ request.set_value(offset, Value.ACTIVE)
+ self.assertEqual(request.get_value(offset), Value.ACTIVE)
+ counter += 1
+ set_event.set()
+ self.assertGreater(counter, 0)
+
+ def feeder(tid: int) -> None:
+ offset = tid % 2
+ ready_event = ready_events[offset]
+ set_event = set_events[offset]
+ barrier.wait()
+ for iteration in range(self.ITERATIONS):
+ new_dir = Direction.INPUT if iteration % 2 == 0 else Direction.OUTPUT
+ set_event.wait(self.TIMEOUT)
+ set_event.clear()
+ with lock:
+ request.reconfigure_lines(
+ config={offset: gpiod.LineSettings(direction=new_dir)}
+ )
+ ready_event.set()
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(feeder, i) for i in range(2)]
+ futures += [executor.submit(worker, i) for i in range(2, self.NUM_THREADS)]
+ try:
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ future.result(timeout=self.TIMEOUT)
+ finally:
+ request.release()
+
+ def test_shared_set_get_values(self) -> None:
+ """
+ Test setting and getting values from a single line request shared across
+ multiple threads
+
+ Synchronization: Required
+
+ Note:
+ This won't actually blow up, but based on the extension implementation
+ the request has a shared buffer for offets/values that are reused for
+ getting/setting line values
+
+ Without synchronization, a thread may think it's setting one set of values
+ but the buffer values may have been overwritten by another thread
+ """
+
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ lock = get_lock()
+ offset = 0
+ request = self.chip.request_lines(
+ config={0: gpiod.LineSettings(direction=Direction.OUTPUT)}
+ )
+
+ def worker() -> None:
+ counter = 0
+ barrier.wait()
+ for _ in range(self.ITERATIONS):
+ with lock:
+ if request.get_value(offset) == Value.ACTIVE:
+ request.set_value(offset, Value.INACTIVE)
+ self.assertEqual(request.get_value(offset), Value.INACTIVE)
+ counter += 1
+ else:
+ request.set_value(offset, Value.ACTIVE)
+ self.assertEqual(request.get_value(offset), Value.ACTIVE)
+ counter += 1
+ self.assertGreater(counter, 0)
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(worker) for _ in range(self.NUM_THREADS)]
+ try:
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ future.result(timeout=self.TIMEOUT)
+ finally:
+ request.release()
+
+ def test_shared_close(self) -> None:
+ """
+ Test that querying a single line request shared across multiple threads
+ after releasing raises an error
+
+ Synchronization: Required
+
+ Note:
+ The underlying `gpiod_line_request` struct gets freed on release, leaving
+ a mine for other threads to step on
+ """
+ barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ lock = get_lock()
+
+ num_lines = self.chip.get_info().num_lines
+ request = self.chip.request_lines(
+ config={
+ range(num_lines): gpiod.LineSettings(
+ direction=Direction.OUTPUT, output_value=Value.INACTIVE
+ )
+ }
+ )
+
+ def worker() -> None:
+ barrier.wait()
+ with lock:
+ info = request.get_values(range(num_lines))
+ request.release()
+ for line in info:
+ self.assertEqual(line, Value.INACTIVE)
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
+ futures = [executor.submit(worker) for _ in range(self.NUM_THREADS)]
+ error_count = 0
+ for future in as_completed(futures, timeout=self.TIMEOUT):
+ try:
+ future.result(timeout=self.TIMEOUT)
+ except gpiod.RequestReleasedError:
+ error_count += 1
+ self.assertEqual(error_count, self.NUM_THREADS - 1)
+
+
+class EdgeEvent(ThreadedTestCase):
+ def setUp(self) -> None:
+ self.sim = gpiosim.Chip(num_lines=4, label="foobar")
+ self.chip = gpiod.Chip(self.sim.dev_path)
+
+ def tearDown(self) -> None:
+ self.chip.close()
+ self.sim = None # type: ignore[assignment]
+ self.chip = None # type: ignore[assignment]
+
+ def test_read_edge_events(self) -> None:
+ """
+ Test that multiple threads can read edge events on a shared LineRequest
+
+ Synchronization: Required
+
+ Note:
+ The request object has a gpiod_edge_event_buffer for events to be read into.
+ Without synchronization, that buffer will be overwritten by another thread
+ when attempting to create event objects
+ """
+ num_lines = self.chip.get_info().num_lines
+ req = self.chip.request_lines(
+ config={
+ range(num_lines): gpiod.LineSettings(
+ direction=Direction.INPUT, edge_detection=Edge.BOTH
+ )
+ }
+ )
+
+ # If read_edge_events() is blocking, threads will hang forever waiting
+ # for events that don't exist during shutdown.
+ flags = fcntl.fcntl(req.fd, fcntl.F_GETFL)
+ fcntl.fcntl(req.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+ worker_barrier = threading.Barrier(self.NUM_THREADS, timeout=self.TIMEOUT)
+ feeder_barrier = threading.Barrier(2, timeout=self.TIMEOUT)
+ done_fd = os.eventfd(0)
+
+ total = 0
+ counter_lock = threading.Lock()
+ req_lock = get_lock()
+
+ poll = epoll()
+ poll.register(req.fd, EPOLLIN)
+ poll.register(done_fd, EPOLLIN)
+
+ def reader_worker(tid: int) -> None:
+ nonlocal total
+ should_exit = False
+ local_count = 0
+ worker_barrier.wait()
+
+ while not should_exit:
+ events = poll.poll(self.TIMEOUT)
+
+ for fd, _ in events:
+ if fd == done_fd:
+ should_exit = True
+ continue
+
+ if fd == req.fd:
+ try:
+ with req_lock:
+ # O_NONBLOCK prevents hanging
+ evs = req.read_edge_events()
+ if evs:
+ local_count += len(evs)
+ except OSError as e:
+ if e.errno == errno.EAGAIN:
+ continue
+ raise
+
+ with counter_lock:
+ total += local_count
+
+ def feeder(tid: int) -> None:
+ offsets = list(range(tid, num_lines, 2))
+ worker_barrier.wait()
+
+ for i in range(int(self.ITERATIONS)):
+ offset = offsets[i % len(offsets)]
+ for pull in [gpiosim.Chip.Pull.UP, gpiosim.Chip.Pull.DOWN]:
+ self.sim.set_pull(offset, pull)
+
+ feeder_barrier.wait()
+ # Thread 0 signals done when all pulses have fired
+ if tid == 0:
+ os.eventfd_write(done_fd, 1)
+
+ with ThreadPoolExecutor(max_workers=self.NUM_THREADS) as ex:
+ futures = [ex.submit(feeder, i) for i in range(2)]
+ futures += [ex.submit(reader_worker, i) for i in range(2, self.NUM_THREADS)]
+
+ try:
+ for f in as_completed(futures, timeout=self.TIMEOUT):
+ f.result(timeout=self.TIMEOUT)
+ self.assertGreater(total, 0)
+ finally:
+ for fd in [req.fd, done_fd]:
+ poll.unregister(fd)
+ poll.close()
+ os.close(done_fd)
+ req.release()
--
2.43.0
next prev parent reply other threads:[~2026-05-22 20:04 UTC|newest]
Thread overview: 11+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-05-22 20:04 [libgpiod][PATCH 0/3] bindings: python: add support for free-threaded Python Vincent Fazio
2026-05-22 20:04 ` Vincent Fazio [this message]
2026-05-26 11:05 ` [libgpiod][PATCH 1/3] bindings: python: tests: add multi-threaded tests Bartosz Golaszewski
2026-05-26 17:45 ` Vincent Fazio
2026-05-27 7:51 ` Bartosz Golaszewski
2026-05-22 20:04 ` [libgpiod][PATCH 2/3] bindings: python: support free-threaded CPython Vincent Fazio
2026-05-26 11:07 ` Bartosz Golaszewski
2026-05-26 17:50 ` Vincent Fazio
2026-05-27 7:50 ` Bartosz Golaszewski
2026-05-22 20:04 ` [libgpiod][PATCH 3/3] bindings: python: add a changelog Vincent Fazio
2026-05-22 20:24 ` [libgpiod][PATCH 0/3] bindings: python: add support for free-threaded Python Vincent Fazio
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260522200419.105496-2-vfazio@gmail.com \
--to=vfazio@gmail.com \
--cc=brgl@kernel.org \
--cc=linux-gpio@vger.kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox