Wait, poll, predicate#
This page is the test-author’s polling toolkit: how to block until hardware reaches a state, how to express that state as a value comparison or a predicate, how to sample a register many times for statistical checks, and what happens when the wait times out. Every register and field exposes the same small surface; the goal is that the most common test idiom — “wait until this bit is set” — is one method call, not a hand-rolled loop.
Why this matters#
From the design sketch:
wait_for is the single most common test idiom; it deserves dedicated ergonomics.
A naive polling loop in user code is easy to get subtly wrong: missing a timeout, never sleeping, never logging the last value seen, never back-pressuring a busy bus. The methods on this page handle all of that uniformly, so test code reads as a statement of intent (“wait for tx_ready”) and not as bus plumbing.
The contract:
Default-bounded. Every wait takes a timeout= and raises a descriptive error if it expires.
Returns the value seen. A successful wait returns the matching read so callers don’t immediately read again.
Sampling is built in. read(n=...) and histogram(n=...) cover debouncing and glitch-detection without a hand-rolled loop.
Sync first, async parallel. Every wait has an await_for / aio* dual usable from asyncio code.
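To make the contract concrete, here is a minimal sketch of the kind of bounded polling loop it describes. It is not the library's implementation: `poll_until`, `PollTimeout`, and the stand-in read function are hypothetical names used only for illustration.

```python
import time


class PollTimeout(TimeoutError):
    """Hypothetical stand-in for the library's timeout error."""


def poll_until(read, predicate, timeout, period=0.001):
    """Call read() until predicate(value) is truthy or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    polls = 0
    while True:
        value = read()
        polls += 1
        if predicate(value):
            return value  # hand back the matching read
        if time.monotonic() >= deadline:
            raise PollTimeout(
                f"condition not met within {timeout:.3f}s; "
                f"last value seen: {value!r}; polled: {polls} reads"
            )
        time.sleep(period)  # leave the bus idle between polls


# Stand-in for a hardware read: reports ready on the third poll.
readings = iter([0, 0, 1])
result = poll_until(lambda: next(readings), lambda v: v == 1, timeout=1.0)

# A read that never matches runs into the deadline instead of hanging.
try:
    poll_until(lambda: 0, lambda v: v == 1, timeout=0.01)
except PollTimeout as exc:
    timeout_message = str(exc)
```

The sketch covers the four things a hand-rolled loop tends to forget: a deadline, a sleep between polls, the last value in the error, and returning the matching read.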
wait_for on a single field#
The shortest path: wait until a field equals a value.
# Block until tx_ready becomes True, or raise after 1.0 s.
soc.uart.status.tx_ready.wait_for(True, timeout=1.0)
# Tune the polling cadence and add jitter to avoid lock-step
# interference with hardware events.
soc.uart.status.tx_ready.wait_for(
    True,
    timeout=1.0,
    period=0.001,
    jitter=True,
)
Arguments:
value (positional): the target. For a 1-bit field, True/False. For an enum field, the enum member (BaudRate.BAUD_115200) or its int value. Type-checked against the field’s encoding.
timeout (seconds): hard upper bound on the wait. Required in practice; the implementation raises if you call wait_for with no timeout, to keep test runs from hanging.
period (seconds, default master-dependent): nominal time between polls. Smaller is more responsive, larger is gentler on the bus.
jitter (bool, default False): perturb each sleep by a small random factor. Useful when the polled signal is itself periodic and a fixed period could alias.
On success, wait_for returns the matching FieldValue (the value the
field had on the read that satisfied the comparison). On timeout, it raises
WaitTimeoutError (see below).
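One plausible way to realise the jitter option is to scale each sleep by a random factor near 1. The sketch below assumes a uniform spread of ±10 %; the `spread` parameter and the `next_sleep` helper are hypothetical, and the library's actual distribution is not documented here.

```python
import random


def next_sleep(period, jitter=False, spread=0.1, rng=random):
    """Return the delay before the next poll.

    `spread` is an assumed tuning knob: with jitter on, the delay is drawn
    uniformly from period * (1 - spread) .. period * (1 + spread).
    """
    if not jitter:
        return period
    return period * rng.uniform(1.0 - spread, 1.0 + spread)


rng = random.Random(0)  # seeded so the sketch is repeatable
delays = [next_sleep(0.001, jitter=True, rng=rng) for _ in range(5)]
fixed = next_sleep(0.001)  # jitter off: the nominal period is returned
```

Because successive delays differ, the poll instants drift relative to a periodic hardware signal instead of sampling it at the same phase every time.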
Note
wait_for is robust against exotic equality comparators. If the wrapped
value’s == raises (for example, NumPy ndarray’s vector == when
shapes don’t broadcast, or a third-party value wrapper that throws on
incompatible types), the polling loop treats that comparison as a no-match
and keeps polling instead of crashing the test. This matters when callers
wrap fields with custom equality behaviour.
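The behaviour described in this note can be pictured as a guarded comparison. The sketch below is an assumption about the general technique, not the library's code; `safe_equals` and `Picky` are hypothetical names.

```python
def safe_equals(seen, expected):
    """Compare two values, treating a comparison that raises as no-match."""
    try:
        return bool(seen == expected)
    except Exception:
        return False


class Picky:
    """Hypothetical value wrapper whose == raises on foreign types."""

    def __init__(self, raw):
        self.raw = raw

    def __eq__(self, other):
        if not isinstance(other, Picky):
            raise TypeError("cannot compare Picky with " + type(other).__name__)
        return self.raw == other.raw


match = safe_equals(Picky(1), Picky(1))     # ordinary comparison
mismatch = safe_equals(Picky(1), Picky(2))  # ordinary comparison
guarded = safe_equals(Picky(1), 1)          # == raises, counted as no-match
```

Wrapping the result in `bool()` inside the `try` also covers comparisons whose result cannot be reduced to a single truth value.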
wait_until on a register predicate#
When the wait condition involves more than one field of the same register —
“tx_ready set and error clear” — use wait_until on the register, not
wait_for on a field. The predicate receives a fresh RegisterValue per
poll, so it sees a coherent snapshot for that one bus read:
# Each poll is one bus read; the predicate sees a coherent register value.
soc.uart.status.wait_until(
    lambda v: v.tx_ready and not v.error,
    timeout=1.0,
)
The predicate’s argument is a RegisterValue; field accessors
(v.tx_ready, v.error) return the same typed values you’d get from
soc.uart.status.read(). Returning truthy ends the wait; returning falsy
re-polls.
wait_until accepts the same timeout, period, and jitter
arguments as wait_for. On success it returns the RegisterValue that
satisfied the predicate.
Use wait_for when the condition is “field equals value”; use
wait_until when the condition is anything more complex, even if it’s
still over a single field. wait_until is also the right tool for
inequality checks (v.count > 16) and for waits that depend on
external state (lambda v: v.tx_ready and feature_enabled).
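The value of a coherent snapshot is easiest to see with a stand-in. In the sketch below, both fields are decoded from the same raw read, so the predicate can never combine a tx_ready from one bus transaction with an error from another. `StatusSnapshot` and the bit positions are assumptions for illustration only, not the generated RegisterValue class.

```python
from dataclasses import dataclass

# Assumed bit positions, for illustration only.
TX_READY_BIT = 0
ERROR_BIT = 2


@dataclass(frozen=True)
class StatusSnapshot:
    """Fields decoded from one raw register read."""

    raw: int

    @property
    def tx_ready(self):
        return bool((self.raw >> TX_READY_BIT) & 1)

    @property
    def error(self):
        return bool((self.raw >> ERROR_BIT) & 1)


def ready_without_error(v):
    """Predicate in the same shape as the lambda passed to wait_until."""
    return v.tx_ready and not v.error


# Three successive raw reads: idle, ready with error, ready and clean.
raw_reads = [0x0, 0x5, 0x1]
verdicts = [ready_without_error(StatusSnapshot(r)) for r in raw_reads]
```

Only the last read satisfies the predicate; a wait built on it would re-poll after the first two.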
IRQ shortcut#
For interrupt sources, the polling pattern is so common that the
Interrupts group exposes its own wait:
# Block until tx_done is pending, or raise after 1.0 s.
soc.uart.interrupts.tx_done.wait(timeout=1.0)
This is the same as
soc.uart.status.tx_done_int.wait_for(True, timeout=1.0) for chips where
the interrupt state register is the natural place to wait, but it reads
better and respects the per-RDL clear/acknowledge semantics. See
Interrupts for the full interrupt surface (enable, clear,
fire, group operations).
Sampling#
Two patterns come up so often that they live next to wait_for: capturing
N reads of the same register, and bucketing them into a histogram.
# 100 fresh bus reads of soc.adc.sample, returned as a NumPy array.
samples = soc.adc.sample.read(n=100)
samples.shape # (100,)
samples.mean()
# 1000 reads, bucketed by value.
from collections import Counter
hist = soc.adc.sample.histogram(n=1000)
hist.most_common(5)
A note on shape polymorphism: soc.adc.sample.read() (no n) returns a
single decoded value, the same shape as any other field read. Passing
n=... switches to a NumPy array of length n. The cutover is the
keyword argument, not the operation.
read(n=...) is also valid on register and memory views; the array’s dtype
matches the register width or the memory’s word width.
Sampling is intentionally separate from wait_for. Use wait_for to
synchronize (“I want a known state before I read”); use read(n=...) to
characterize (“I want N samples of whatever the hardware is doing”).
Return types#
The two sampling helpers return distinct, freshly allocated containers:
sample(field, n=N) (and the equivalent read(n=N)) returns a numpy.ndarray. The dtype is whatever NumPy infers from the readings; for register/field reads that decode to Python int this lands on a platform-appropriate integer width (typically int64/uint64 on 64-bit hosts). Callers that need a specific dtype should cast explicitly. The array is freshly allocated per call; it is not aliased to any internal buffer, so mutation of the returned array is safe.
histogram(field, n=N) returns a collections.Counter keyed by value. histogram deliberately avoids materialising an intermediate ndarray so it works with read payloads that are unhashable as a NumPy dtype but hashable as Python objects (notably enum members and other rich field values).
import numpy as np
from collections import Counter
samples = soc.adc.sample.read(n=100)
assert isinstance(samples, np.ndarray)
assert samples.shape == (100,)
hist = soc.adc.sample.histogram(n=1000)
assert isinstance(hist, Counter)
hist.most_common(5)
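To illustrate why a Counter suits rich field values, the sketch below counts enum members directly, with no intermediate array. `histogram_sketch`, `Mode`, and the cycling read function are hypothetical stand-ins, not the library's API.

```python
from collections import Counter
from enum import Enum
from itertools import cycle


class Mode(Enum):
    """Hypothetical enum-encoded field value."""

    IDLE = 0
    BUSY = 1


def histogram_sketch(read, n):
    """Take n fresh reads and count how often each value appears."""
    return Counter(read() for _ in range(n))


# Stand-in read function cycling through a fixed pattern.
pattern = cycle([Mode.IDLE, Mode.BUSY, Mode.BUSY])
hist = histogram_sketch(lambda: next(pattern), n=9)
top_value, top_count = hist.most_common(1)[0]
```

Enum members are hashable, so they work as Counter keys without any conversion to integers.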
Async equivalents#
Every blocking wait has a coroutine dual. The methods are renamed with an
await_ or aio prefix so the call site reads correctly under
await:
# asyncio test
await soc.uart.status.tx_ready.await_for(True, timeout=1.0)
await soc.uart.status.await_until(
    lambda v: v.tx_ready and not v.error,
    timeout=1.0,
)
await soc.uart.interrupts.tx_done.aiowait(timeout=1.0)
The argument shapes match their sync counterparts. Returns and exceptions are
the same. Use the async forms whenever you’re already inside an async def
test or a notebook running an event loop, so a slow hardware response doesn’t
block the loop.
The complementary surface (aread / awrite / amodify on every
node) is described in the bus & masters documentation; everything on this
page about timeouts, predicates, and sampling applies identically there.
Note
The async dual is not natively non-blocking. There is no native async
master transport at this layer. Inside await_for, await_until, and
aiowait the polling loop calls the sync node.read() directly; only
the inter-poll sleep is an await asyncio.sleep yield point. The bus
read therefore blocks the running event loop for its full duration. This
is fine for in-process mocks and short transactions, but if your master
has long read() latency you should not assume the loop stays
responsive while a wait is in flight — do not expect true non-blocking
I/O from these helpers.
Want true offload? Wrap the underlying transport in
peakrdl_pybind11.runtime.async_session.AsyncSession, which uses
asyncio.run_in_executor() against a
concurrent.futures.ThreadPoolExecutor to push every bus call to
a worker thread. await_for/await_until/aiowait themselves do
not go through that path — they call the sync node’s read() directly.
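The offload technique named above can be sketched with the standard library alone. This is not AsyncSession's implementation; `slow_read` and `heartbeat` are hypothetical stand-ins that show the event loop staying responsive while a blocking read runs on a worker thread.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_read():
    """Stand-in for a blocking bus read with noticeable latency."""
    time.sleep(0.05)
    return 1


async def heartbeat(ticks):
    """Record a timestamp each time the event loop gets to run."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.005)


async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    with ThreadPoolExecutor(max_workers=1) as pool:
        # The blocking read runs on a worker thread; the loop keeps ticking.
        value = await loop.run_in_executor(pool, slow_read)
    beat.cancel()
    return value, len(ticks)


value, tick_count = asyncio.run(main())
```

If `slow_read()` were called directly inside the coroutine, the heartbeat would not advance until the read returned; that is the blocking behaviour this note warns about.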
Note
The loop= kwarg is accepted but not forwarded. await_for,
await_until, and aiowait accept a loop= keyword for API
symmetry with older asyncio code, but the value is ignored: it is not
passed to asyncio.sleep(), because the explicit-loop parameter was
deprecated in Python 3.8 and removed in 3.10. Pass it if you like for
backwards source compatibility; do not rely on it to bind the wait to a
non-default event loop.
import asyncio

# Both forms are valid; the loop= argument is accepted but ignored.
await soc.uart.status.tx_ready.await_for(True, timeout=1.0)
await soc.uart.status.tx_ready.await_for(
    True, timeout=1.0, loop=asyncio.get_running_loop()
)
Timeout error#
On timeout, every wait raises WaitTimeoutError with a descriptive message
that names the wait target, the expected condition, the last value seen, and
how long the wait ran:
WaitTimeoutError: soc.uart.status.tx_ready did not reach True within 1.000s
last value seen : False
expected : True
polled : 1024 reads, period=0.001s
For predicate waits, the message includes a short repr of the failing
RegisterValue so post-mortem doesn’t require a re-run:
WaitTimeoutError: soc.uart.status predicate not satisfied within 1.000s
last value seen : UartStatus(0x00000004) tx_ready=0 error=1
polled : 1024 reads, period=0.001s
For deeper post-mortem, every wait accepts a capture=True flag that
attaches the full list of sampled RegisterValue (or FieldValue)
objects to the exception under .samples:
from peakrdl_pybind11.errors import WaitTimeoutError

try:
    soc.uart.status.wait_until(
        lambda v: v.tx_ready and not v.error,
        timeout=1.0,
        capture=True,
    )
except WaitTimeoutError as e:
    # Replay the trace for the bug report.
    for v in e.samples:
        print(v.hex(), v.error, v.tx_ready)
capture is opt-in because the sample list can grow long for tight polling
loops; the default error message is descriptive enough for most failures.
Which WaitTimeoutError?#
Two classes share the name:
peakrdl_pybind11.errors.WaitTimeoutError: the one this module raises. It is the user-facing class re-exported at the top-level peakrdl_pybind11.errors path. It inherits from both PeakRDLError and TimeoutError.
peakrdl_pybind11.runtime.errors.WaitTimeoutError: the entry in the lower-level error taxonomy, with a different positional argument signature. Some lower-level code paths still raise this one; it inherits only from TimeoutError.
Both classes inherit from TimeoutError, so a single
except TimeoutError catches either. If you want to catch every
PeakRDL-pybind11 error in one clause, catch
PeakRDLError; that catches
the wait-poll variant but not the runtime-only one (the runtime one does
not inherit from PeakRDLError). When you really need to distinguish
which class was raised, import the user-facing one explicitly:
from peakrdl_pybind11.errors import WaitTimeoutError

try:
    soc.uart.status.tx_ready.wait_for(True, timeout=1.0)
except WaitTimeoutError as e:
    # e.path, e.expected, e.last_seen, e.timeout, e.polls are all set.
    ...
See Error Model for the full typed-error taxonomy and the rules for catching errors broadly versus narrowly.
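The catching rules for the two classes follow directly from their base classes. The sketch below mirrors the inheritance described in this section using hypothetical stand-in classes (none of these names are the library's own), so it runs without the package installed.

```python
class PeakRDLErrorSketch(Exception):
    """Stand-in for the library-wide base error."""


class UserFacingTimeout(PeakRDLErrorSketch, TimeoutError):
    """Stand-in for the user-facing class: both bases."""


class RuntimeOnlyTimeout(TimeoutError):
    """Stand-in for the runtime-level class: TimeoutError only."""


def caught_by(exc, handler_type):
    """Report whether `except handler_type` would catch `exc`."""
    try:
        raise exc
    except handler_type:
        return True
    except Exception:
        return False


# except TimeoutError catches either class.
both_via_timeout = (
    caught_by(UserFacingTimeout("t"), TimeoutError)
    and caught_by(RuntimeOnlyTimeout("t"), TimeoutError)
)
# The library-wide base catches only the user-facing class.
user_via_base = caught_by(UserFacingTimeout("t"), PeakRDLErrorSketch)
runtime_via_base = caught_by(RuntimeOnlyTimeout("t"), PeakRDLErrorSketch)
```

In test code that only needs to fail cleanly on any timeout, `except TimeoutError` is therefore the broadest safe clause.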
See also#
Interrupts: interrupt sources, group operations, and the wait/aiowait shortcuts.
Observers and Observation Hooks: read/write hooks, including how to capture sampled values from a wait without capture=True (an observer sees every read the wait performs, with no opt-in needed).