Source code for peakrdl_pybind11.masters.ssh
"""
SSH Master for remote register access
"""
import subprocess
from . import MasterBase
[docs]
class SSHMaster(MasterBase):
"""
Master interface using SSH for remote memory access
Uses devmem or similar tools on the remote system
"""
[docs]
def __init__(
self,
host: str,
username: str | None = None,
key_file: str | None = None,
tool: str = "devmem",
) -> None:
"""
Initialize SSH connection
Args:
host: SSH host to connect to
username: SSH username (optional, uses current user if not specified)
key_file: Path to SSH private key file (optional)
tool: Memory access tool on remote system (default: "devmem")
Note:
Password authentication is not supported. Use SSH keys via
``key_file`` (or your ssh-agent / ``~/.ssh/config``).
"""
self.host = host
self.username = username
self.key_file = key_file
self.tool = tool
# Build base SSH command
self.ssh_cmd = ["ssh"]
if key_file:
self.ssh_cmd.extend(["-i", key_file])
if username:
self.ssh_cmd.append(f"{username}@{host}")
else:
self.ssh_cmd.append(host)
# Test connection
self._test_connection()
def _test_connection(self) -> None:
"""Test SSH connection"""
try:
result = subprocess.run(
[*self.ssh_cmd, "echo", "test"], capture_output=True, text=True, timeout=10
)
except (OSError, subprocess.TimeoutExpired) as e:
raise RuntimeError(f"Failed to connect via SSH to {self.host}: {e}") from e
if result.returncode != 0:
raise RuntimeError(f"SSH connection test to {self.host} failed: {result.stderr.strip()}")
def _run_remote_command(self, command: str) -> str:
"""Execute a command on the remote system"""
try:
result = subprocess.run([*self.ssh_cmd, command], capture_output=True, text=True, timeout=30)
except subprocess.TimeoutExpired:
raise RuntimeError(f"Remote command timed out: {command}") from None
except OSError as e:
raise RuntimeError(f"Failed to execute remote command: {e}") from e
if result.returncode != 0:
raise RuntimeError(f"Remote command failed: {result.stderr.strip()}")
return result.stdout.strip()
[docs]
def read(self, address: int, width: int) -> int:
"""
Read memory via SSH using devmem
Args:
address: Memory address to read
width: Width in bytes
Returns:
Value read from memory
"""
# devmem uses bit width, not byte width
bit_width = width * 8
if self.tool == "devmem":
# devmem <address> [width]
cmd = f"{self.tool} 0x{address:x}"
if width != 4: # devmem defaults to 32-bit
cmd += f" {bit_width}"
else:
# Custom tool - assume similar syntax
cmd = f"{self.tool} read 0x{address:x} {width}"
output = self._run_remote_command(cmd)
# Parse output (devmem format: "0xVALUE")
try:
# Look for hex value in output
for line in output.split("\n"):
line = line.strip()
if line.startswith("0x") or line.startswith("0X"):
return int(line, 16)
# Try parsing as decimal
return int(output)
except ValueError as e:
raise RuntimeError(f"Failed to parse remote read output: {output}: {e}") from e
[docs]
def write(self, address: int, value: int, width: int) -> None:
"""
Write memory via SSH using devmem
Args:
address: Memory address to write
value: Value to write
width: Width in bytes
"""
# devmem uses bit width, not byte width
bit_width = width * 8
if self.tool == "devmem":
# devmem <address> [width] <value>
cmd = f"{self.tool} 0x{address:x}"
if width != 4: # devmem defaults to 32-bit
cmd += f" {bit_width}"
cmd += f" 0x{value:x}"
else:
# Custom tool - assume similar syntax
cmd = f"{self.tool} write 0x{address:x} 0x{value:x} {width}"
self._run_remote_command(cmd)