Source code for peakrdl_pybind11.masters.openocd

"""
OpenOCD Master for JTAG/SWD debugging
"""

import socket

from . import MasterBase


[docs] class OpenOCDMaster(MasterBase): """ Master interface using OpenOCD TCL server Connects to OpenOCD's TCL interface for reading/writing memory """
[docs] def __init__(self, host: str = "localhost", port: int = 6666, timeout: float = 5.0) -> None: """ Initialize OpenOCD connection Args: host: OpenOCD server host port: OpenOCD TCL server port (default 6666) timeout: Socket timeout in seconds """ self.host = host self.port = port self.timeout = timeout self.socket: socket.socket | None = None self._connect()
def _connect(self) -> None: """Establish connection to OpenOCD""" try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.settimeout(self.timeout) self.socket.connect((self.host, self.port)) # Read the welcome message self._recv() except Exception as e: raise RuntimeError(f"Failed to connect to OpenOCD at {self.host}:{self.port}: {e}") from e def _send(self, command: str) -> str: """Send a command to OpenOCD and return the response""" if self.socket is None: raise RuntimeError("Not connected to OpenOCD") try: # Send command with newline self.socket.sendall((command + "\n").encode("utf-8")) return self._recv() except Exception as e: raise RuntimeError(f"OpenOCD command failed: {e}") from e def _recv(self) -> str: """Receive response from OpenOCD""" if self.socket is None: raise RuntimeError("Not connected to OpenOCD") data = b"" while True: chunk = self.socket.recv(4096) if not chunk: break data += chunk # OpenOCD ends responses with a specific marker if data.endswith(b"\x1a"): break return data.decode("utf-8", errors="ignore").rstrip("\x1a")
[docs] def read(self, address: int, width: int) -> int: """ Read memory via OpenOCD Args: address: Memory address to read width: Width in bytes Returns: Value read from memory """ # Determine OpenOCD memory access command based on width if width == 1: cmd = f"mdb 0x{address:x}" elif width == 2: cmd = f"mdh 0x{address:x}" elif width == 4: cmd = f"mdw 0x{address:x}" elif width == 8: cmd = f"mdd 0x{address:x}" else: raise ValueError(f"Unsupported width: {width}") response = self._send(cmd) # Parse response (format: "0xADDRESS: VALUE") try: parts = response.split(":") if len(parts) < 2: raise ValueError("missing ':' separator") value_str = parts[1].strip().split()[0] return int(value_str, 16) except (ValueError, IndexError) as e: raise RuntimeError(f"Failed to parse OpenOCD response: {response!r}: {e}") from e
[docs] def write(self, address: int, value: int, width: int) -> None: """ Write memory via OpenOCD Args: address: Memory address to write value: Value to write width: Width in bytes """ # Determine OpenOCD memory access command based on width if width == 1: cmd = f"mwb 0x{address:x} 0x{value:x}" elif width == 2: cmd = f"mwh 0x{address:x} 0x{value:x}" elif width == 4: cmd = f"mww 0x{address:x} 0x{value:x}" elif width == 8: cmd = f"mwd 0x{address:x} 0x{value:x}" else: raise ValueError(f"Unsupported width: {width}") self._send(cmd)
[docs] def halt(self) -> None: """Halt the target""" self._send("halt")
[docs] def resume(self) -> None: """Resume the target""" self._send("resume")
[docs] def reset(self, halt: bool = False) -> None: """Reset the target""" if halt: self._send("reset halt") else: self._send("reset")
[docs] def close(self) -> None: """Close the connection""" if self.socket: try: self._send("exit") except Exception: pass self.socket.close() self.socket = None
def __del__(self) -> None: """Cleanup on deletion. Avoid issuing protocol commands during GC.""" sock = getattr(self, "socket", None) if sock is not None: try: sock.close() except Exception: pass self.socket = None