Tapping on the revamped clusterctrl

This commit is contained in:
Reid 'arrdem' McKenzie 2021-11-02 01:12:06 -06:00
parent db84e8b26c
commit a74c715041
11 changed files with 274 additions and 1165 deletions

View file

@ -1,5 +1,8 @@
py_project(
name = "lib"
name = "lib",
lib_deps = [
py_requirement("smbus2"),
],
)
zapp_binary(

View file

@ -2,11 +2,12 @@
This project is a rewrite of the `clusterctrl` tool provided by the 8086 consultancy for interacting with their ClusterCTRL and ClusterHAT line of Raspberry Pi backplane products.
It serves to factor out the underlying i²c driver common to the ClusterCTRL family of products, and aims to implement a far more idiomatic and better documented CLI tool over top of that driver.
It serves to factor out the underlying i²c driver common to the ClusterCTRL family of products, and aims to implement a far more idiomatic and better documented and far more consistent CLI tool.
## license
This software is published under the MIT License
Copyright (c) 2018 Chris Burton (8086 consultancy)
Copyright (c) 2021 Reid McKenzie
This software is published under the MIT License

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,21 @@
#!/usr/bin/env python3
# alert on [<devices>] # Turns on all ALERT LED or for pX devices
# alert off [<devices>] # Turns off all ALERT LED or for pX devices
import click
@click.group()
def alert():
pass
@alert.command("on")
def alert_on():
pass
@alert.command("off")
def alert_off():
pass

View file

@ -0,0 +1,18 @@
"""Turns FAN on/off for CTRL with <order>."""
import click
@click.group()
def fan():
pass
@fan.command("on")
def fan_on():
pass
@fan.command("off")
def fan_off():
pass

View file

@ -0,0 +1,23 @@
"""USB hub can be turned on/off on Cluster HAT and reset on CTRL"""
import click
@click.group()
def hub():
pass
@hub.command("on")
def hub_on():
pass
@hub.command("off")
def hub_off():
pass
@hub.command("reset")
def hub_reset():
pass

View file

@ -0,0 +1,18 @@
"""Enable or disable all status LEDs."""
import click
@click.group()
def led():
pass
@led.command("on")
def led_on():
pass
@led.command("off")
def led_off():
pass

View file

@ -0,0 +1,21 @@
import click
# power on [<devices>] # Turn on All Pi Zero or devices
# power off [<devices>] # Turn off All Pi Zero or devices
@click.group()
def power():
pass
@power.command("on")
@click.argument("devices", nargs="*")
def power_on(devices):
"""Power on selected devices."""
@power.command()
@click.argument("devices", nargs="*")
def power_off(devices):
"""Power off selected devices."""

View file

@ -0,0 +1,39 @@
#!/usr/bin/env python3
# save all <order> # Save current settings to EEPROM
# save order <order> # Save current "order" setting to EEPROM
# save usbboot <order> # Save current USBBOOT settings to EEPROM
# save pos <order> # Save current Power On State to EEPROM
# save defaults <order> # Save default settings to EEPROM
import click
@click.group()
def save():
pass
@save.command("all")
def save_all():
pass
@save.command("order")
def save_order():
pass
@save.command("usbboot")
def save_usbboot():
pass
@save.command("power")
def save_power():
pass
@save.command("defaults")
def save_defaults():
pass

View file

@ -2,10 +2,30 @@
from enum import Enum
from itertools import chain, repeat
import logging
from random import SystemRandom
from time import sleep
from typing import Union
from typing import NamedTuple, Union
import smbus
log = logging.getLogger(__name__)
smbus = None
if not smbus:
try:
import smbus
except ImportError as e:
log.warning(e)
if not smbus:
try:
import smbus2 as smbus
except ImportError as e:
log.warning(e)
if not smbus:
raise ImportError("Unable to load either SMBus or SMBus2")
def once(f):
@ -113,14 +133,43 @@ class Data(Enum):
FANSTATUS = 0x04 # Read fan status
class ClusterCTRLDriver(object):
def __init__(self, bus: smbus.SMBus, address: int = I2C_ADDRESS, delay: int = 0, clear = False):
class PiID(NamedTuple):
"""Represent Pi IDs as something somewhat unique; a CTRL/HAT id and the Pi ID.
These IDs are expected to be unique at the host level; not at the cluster level.
"""
ctrl_id: int
pi_id: int
def __repr__(self) -> str:
return f"<PiID {self.ctrl_id:03d}-{self.pi_id:02d}>"
class ClusterCTRLv2Driver(object):
def __init__(self, bus: smbus2.SMBus, address: int = I2C_ADDRESS, delay: int = 0, clear = False):
"""Initialize a ClusterCTRL/ClusterHAT driver instance for a given bus device."""
self._bus = bus
self._address = address
self._delay = delay
self._clear = clear
try:
if (version := self._read(Reg.VERSION)) != 2:
raise IOError(f"Unsupported register format {version}; expected 2")
except:
raise ValueError("Cannot communicate with a ClusterCTRL/ClusterHAT on the given bus")
# This is a firmware default value indicating an uninitialized board.
# Randomize it if present.
if self.get_order() == 20:
v = 20
r = SystemRandom()
while v == 20:
v = r.randint(0, 256)
self.set_order(v)
self.eeprom_save_order()
def _read(self, id: Union[Reg, Data], len: int = 1):
"""A convenient abstraction for reading data back."""
@ -182,24 +231,32 @@ class ClusterCTRLDriver(object):
# Return the (mostly) meaningful return code
return self._read(Reg.DATA0)
def _id(self, id: PiID) -> int:
"""Validate a logical ID and convert it to a numeric one."""
assert self.min_pi <= id <= self.max_pi
assert self.get_order() == id.ctrl_id
return id.pi_id
@property
def min_pi(self):
"""Get the minimum supported Pi ID on this controller."""
return 1
return PiID(self.get_order(), 1)
@property
@once
def max_pi(self):
"""Get the maximum supported Pi ID on this controller."""
return self._read(Reg.MAXPI)
return PiID(self.get_order(), self._read(Reg.MAXPI))
@property
def pi_ids(self):
"""Iterate over the IDs of Pis which could be connected to this controller."""
return range(self.min_pi, self.max_pi + 1)
order = self.get_order()
for i in range(1, 6):
yield PiID(order, i)
@property
def type(self) -> BoardType:
@ -275,22 +332,22 @@ class ClusterCTRLDriver(object):
####################################################################################################
# Power management
####################################################################################################
def power_on(self, id: int):
def power_on(self, id: PiID):
"""Power on a given slot by ID."""
assert 0 < id <= self.max_pi
id = self._id(id)
return self._call(Cmd.ON, id)
def power_off(self, id: int):
def power_off(self, id: PiID):
"""Power off a given slot by ID."""
assert 0 < id <= self.max_pi
id = self._id(id)
return self._call(Cmd.OFF, id)
def power_status(self, id: int):
def power_status(self, id: PiID):
"""Read the status of a given slot by ID."""
assert 0 < id <= self.max_pi
id = self._id(id)
return self._call(Cmd.GET_PSTATUS, id)
def power_all_on(self):
@ -348,28 +405,29 @@ class ClusterCTRLDriver(object):
def set_order(self, order: int):
"""Set an 'order' (Controller ID) value."""
assert 0 < order <= 255
assert 0 < order <= 255, "Order must be in the single byte range"
assert order != 20, "20 is the uninitialized order value, use something else"
return self._call(Cmd.SET_ORDER, order)
####################################################################################################
# USB booting
####################################################################################################
def usbboot_on(self, id: int):
def usbboot_on(self, id: PiID):
"""Enable USB booting for the given Pi."""
assert 0 < id <= self.max_pi
id = self._id(id)
return self._call(Cmd.USBBOOT_EN, id)
def usbboot_off(self, id: int):
def usbboot_off(self, id: PiID):
"""Disable USB booting for the given Pi."""
assert 0 < id <= self.max_pi
id = self._id(id)
return self._call(Cmd.USBBOOT_DIS, id)
def usbboot_status(self, id: int):
def usbboot_status(self, id: PiID):
"""Get the current USB booting status for the given Pi."""
assert 0 < id <= self.max_pi
id = self._id(id)
return self._call(Cmd.GET_USTATUS, id)
####################################################################################################
@ -384,7 +442,8 @@ class ClusterCTRLDriver(object):
def adc_ids(self):
return range(1, self.max_adc + 1)
def read_adc(self, id: int):
def read_adc(self, id: PiID):
id = self._id(id)
self._call(Cmd.GET_DATA, Data.ADC_READ.value, id)
# Now this is screwy.
# DATA0 gets set to 0 or 1, indicating the voldage type
@ -412,19 +471,17 @@ class ClusterCTRLDriver(object):
if self._call(Cmd.GET_DATA, Data.ADC_TEMP.value) == 2:
return self._repack(self._read(Reg.DATA2, 2))
class ClusterHATDriver(ClusterCTRLDriver):
"""The ClusterHAT controller supports some verbs not supported by the basic ClusterCTRL board."""
# FIXME: The ClusterHAT also has some CONSIDERABLE differences in how it does I/O, due to leveraging RPi GPIO not
# just i2c. Whether this is essential or incidental is unclear.
def led_on(self, id: int):
####################################################################################################
# Operations with inconsistent platform support
####################################################################################################
def led_on(self, id: PiID):
"""Turn on an LED by ID."""
id = self._id(id)
return self._call(Cmd.LED_EN, id)
def led_off(self, id: int):
def led_off(self, id: PiID):
"""Turn off an LED by ID."""
id = self._id(id)
return self._call(Cmd.LED_DIS, id)

View file

@ -75,6 +75,7 @@ requests-toolbelt==0.9.1
requirements-parser==0.2.0
setuptools==51.0.0
six==1.15.0
smbus2==0.4.1
snowballstemmer==2.1.0
sortedcontainers==2.3.0
soupsieve==2.2.1