Source code for peakrdl_pybind11.masters.base

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from typing import Any


[docs] @dataclass class AccessOp: """One register-access operation used by batched ``read_many`` / ``write_many``. For reads, ``value`` is ignored and conventionally zero; for writes it carries the value to write. Mirrors the C++ ``AccessOp`` struct exposed by every generated module. """ address: int value: int = 0 width: int = 4
[docs] class MasterBase(ABC): """ Base class for Master interfaces Masters provide the actual communication mechanism for reading/writing registers. .. note:: For in-memory test/mock fixtures, prefer the C++ ``MockMaster`` and ``CallbackMaster`` classes shipped *inside* every generated module (e.g. ``my_soc.MockMaster()``). They live entirely in C++, skip the pybind11 trampoline, and are noticeably faster on a tight register loop than wrapping a Python subclass of ``MasterBase`` via ``wrap_master``. Subclass ``MasterBase`` only when the master truly has to be implemented in Python (sockets, REST APIs, exotic hardware glue) — at which point per-access overhead is dominated by I/O anyway. Extension points (no-op defaults; sibling units of the API overhaul override these in ``runtime/bus_policies.py`` and friends): * :meth:`peek` — non-snooping read (defaults to :meth:`read`). * :meth:`barrier` — flush in-flight ops (default: no-op). * :meth:`on_disconnect` / :attr:`_on_disconnect_callbacks` — callbacks fired when transport drops. * :meth:`set_retry_policy` / :attr:`_retry_policy` — retry config storage. * :meth:`cache_get` / :meth:`cache_set` — read-cache hooks (default: pass-through, no caching). """ # State buckets are populated lazily so subclass ``__init__`` doesn't # need to call super().__init__(); the existing concrete masters # (MockMaster etc.) keep working unchanged. _on_disconnect_callbacks: list[Callable[[], None]] _retry_policy: dict[str, Any]
[docs] @abstractmethod def read(self, address: int, width: int) -> int: """ Read a value from the given address Args: address: Absolute address to read from width: Width of the register in bytes Returns: Value read from the address """ pass
[docs] @abstractmethod def write(self, address: int, value: int, width: int) -> None: """ Write a value to the given address Args: address: Absolute address to write to value: Value to write width: Width of the register in bytes """ pass
[docs] def read_many(self, ops: Sequence[AccessOp]) -> list[int]: """Batched read. Default impl loops single-op :meth:`read`. Subclasses can override with a fast path that performs one transport round-trip for N ops (e.g. one socket exchange instead of N). """ return [self.read(op.address, op.width) for op in ops]
[docs] def write_many(self, ops: Sequence[AccessOp]) -> None: """Batched write. Default impl loops single-op :meth:`write`.""" for op in ops: self.write(op.address, op.value, op.width)
# ----------------------------------------------------------------- # Extension points # -----------------------------------------------------------------
[docs] def peek(self, address: int, width: int) -> int: """Non-snooping read. Default implementation forwards to :meth:`read`. Sibling unit ``runtime/bus_policies.py`` overrides this on caching masters to avoid promoting cache lines. """ return self.read(address, width)
[docs] def barrier(self) -> None: """Flush any in-flight operations. Default: no-op. Sibling unit ``runtime/bus_policies.py`` provides per-master implementations (e.g. waiting for outstanding write completions on JTAG masters). The ``soc.barrier(scope="all")`` SoC-wide form fans this out across every attached master. """ return None
[docs] def on_disconnect(self, callback: Callable[[], None]) -> None: """Register ``callback`` to fire when the transport drops. Default: store on a per-instance list. Concrete masters that actually have a transport (SSH, OpenOCD, JTAG) override this to wire the callback into their disconnect detection. Sibling extension point: ``runtime/bus_policies.py``. """ if not hasattr(self, "_on_disconnect_callbacks"): self._on_disconnect_callbacks = [] self._on_disconnect_callbacks.append(callback)
[docs] def set_retry_policy(self, **kwargs: Any) -> None: """Configure transient-error retry behaviour. Default: stash kwargs on a per-instance dict. Sibling unit ``runtime/bus_policies.py`` consumes the dict in its retry/backoff wrapper. """ if not hasattr(self, "_retry_policy"): self._retry_policy = {} self._retry_policy.update(kwargs)
[docs] def cache_get(self, address: int) -> int | None: """Look up ``address`` in the master's read cache. Default: ``None`` (no cache). Sibling unit ``runtime/bus_policies.py`` overrides on caching masters. """ return None
[docs] def cache_set(self, address: int, value: int) -> None: """Insert ``(address, value)`` into the master's read cache. Default: pass-through (no cache). """ return None