This commit is contained in:
Reid 'arrdem' McKenzie 2021-11-09 12:31:31 -07:00
parent 0d81f6540d
commit b6b1f23188
4 changed files with 90 additions and 78 deletions

View file

@ -31,6 +31,7 @@ import click
# #
# <order> selects which Cluster CTRL devices matches that <order> number # <order> selects which Cluster CTRL devices matches that <order> number
@click.group() @click.group()
def cli(): def cli():
pass pass

View file

@ -9,6 +9,7 @@ import click
def power(): def power():
pass pass
@power.command("on") @power.command("on")
@click.argument("devices", nargs="*") @click.argument("devices", nargs="*")
def power_on(devices): def power_on(devices):

View file

@ -42,6 +42,7 @@ def once(f):
""" """
unset = val = object() unset = val = object()
def _helper(*args, **kwargs): def _helper(*args, **kwargs):
nonlocal val nonlocal val
if val is unset: if val is unset:
@ -146,6 +147,7 @@ class PiRef(NamedTuple):
These IDs are expected to be unique at the host level; not at the cluster level. These IDs are expected to be unique at the host level; not at the cluster level.
""" """
controller_id: int controller_id: int
pi_id: int pi_id: int
@ -154,11 +156,13 @@ class PiRef(NamedTuple):
class ClusterCTRLv2Driver(object): class ClusterCTRLv2Driver(object):
def __init__(self, def __init__(
self,
bus: smbus.SMBus, bus: smbus.SMBus,
address: int = I2C_ADDRESS, address: int = I2C_ADDRESS,
delay: int = 0, delay: int = 0,
clear: bool = False): clear: bool = False,
):
"""Initialize a ClusterCTRL/ClusterHAT driver instance for a given bus device.""" """Initialize a ClusterCTRL/ClusterHAT driver instance for a given bus device."""
self._bus = bus self._bus = bus
self._address = address self._address = address
@ -169,7 +173,9 @@ class ClusterCTRLv2Driver(object):
if (version := self._read(Reg.VERSION)) != 2: if (version := self._read(Reg.VERSION)) != 2:
raise IOError(f"Unsupported register format {version}; expected 2") raise IOError(f"Unsupported register format {version}; expected 2")
except: except:
raise ValueError("Cannot communicate with a ClusterCTRL/ClusterHAT on the given bus") raise ValueError(
"Cannot communicate with a ClusterCTRL/ClusterHAT on the given bus"
)
self._post_init() self._post_init()
@ -228,7 +234,19 @@ class ClusterCTRLv2Driver(object):
if self._clear or clear: if self._clear or clear:
args = chain(args, repeat(0)) args = chain(args, repeat(0))
args = zip([Reg.DATA0, Reg.DATA1, Reg.DATA2, Reg.DATA3, Reg.DATA4, Reg.DATA5, Reg.DATA6, Reg.DATA7], args) args = zip(
[
Reg.DATA0,
Reg.DATA1,
Reg.DATA2,
Reg.DATA3,
Reg.DATA4,
Reg.DATA5,
Reg.DATA6,
Reg.DATA7,
],
args,
)
for r, v in args: for r, v in args:
self._write(r, v) self._write(r, v)

View file

@ -90,7 +90,10 @@ def assert_log(bus, log):
log = [[simplify(e) for e in cmd] for cmd in log] log = [[simplify(e) for e in cmd] for cmd in log]
assert sublist(bus._log, log), "\n".join( assert sublist(bus._log, log), "\n".join(
["Failed to find expected sublog", "log:"] + [f"- {e}" for e in bus._log] + ["expected:"] + [f"- {e}" for e in log] ["Failed to find expected sublog", "log:"]
+ [f"- {e}" for e in bus._log]
+ ["expected:"]
+ [f"- {e}" for e in log]
) )
@ -98,103 +101,93 @@ def test_get_order(bus, driver):
"""Check that get_order sends the appropriate command sequence.""" """Check that get_order sends the appropriate command sequence."""
assert driver.get_order() == 13 assert driver.get_order() == 13
assert_log(bus, assert_log(bus, [["read", 0x20, Reg.ORDER]])
[["read", 0x20, Reg.ORDER]])
def test_set_order(bus, driver): def test_set_order(bus, driver):
"""Check that set_order sends the appropriate command sequence.""" """Check that set_order sends the appropriate command sequence."""
driver.set_order(14) driver.set_order(14)
assert_log(bus, assert_log(
[["write", 0x20, Reg.DATA0, 14], bus, [["write", 0x20, Reg.DATA0, 14], ["write", 0x20, Reg.CMD, Cmd.SET_ORDER]]
["write", 0x20, Reg.CMD, Cmd.SET_ORDER]]) )
def test_type(bus, driver): def test_type(bus, driver):
assert isinstance(driver.type, BoardType) assert isinstance(driver.type, BoardType)
assert_log(bus, assert_log(bus, [["read", 0x20, Reg.TYPE]])
[["read", 0x20, Reg.TYPE]])
def test_version(bus, driver): def test_version(bus, driver):
assert driver.fw_version assert driver.fw_version
# Invoke "read version" # Invoke "read version"
assert_log(bus, assert_log(
[["write", 0x20, Reg.DATA0, Data.VERSION], bus,
["write", 0x20, Reg.CMD, Cmd.GET_DATA]]) [
["write", 0x20, Reg.DATA0, Data.VERSION],
["write", 0x20, Reg.CMD, Cmd.GET_DATA],
],
)
# Read the two relevant registers # Read the two relevant registers
assert_log(bus, assert_log(bus, [["read", 0x20, Reg.DATA1], ["read", 0x20, Reg.DATA0]])
[["read", 0x20, Reg.DATA1],
["read", 0x20, Reg.DATA0]])
def test_reset(bus, driver): def test_reset(bus, driver):
driver.reset_all() driver.reset_all()
assert_log(bus, assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.RESET]])
[["write", 0x20, Reg.CMD, Cmd.RESET]])
def test_eeprom_save_all(bus, driver): def test_eeprom_save_all(bus, driver):
driver.eeprom_save_all() driver.eeprom_save_all()
assert_log(bus, assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.SAVE]])
[["write", 0x20, Reg.CMD, Cmd.SAVE]])
def test_eeprom_reset(bus, driver): def test_eeprom_reset(bus, driver):
driver.eeprom_reset() driver.eeprom_reset()
assert_log(bus, assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.SAVE_DEFAULTS]])
[["write", 0x20, Reg.CMD, Cmd.SAVE_DEFAULTS]])
def test_eeprom_save_powerstate(bus, driver): def test_eeprom_save_powerstate(bus, driver):
driver.eeprom_save_powerstate() driver.eeprom_save_powerstate()
assert_log(bus, assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.SAVE_POS]])
[["write", 0x20, Reg.CMD, Cmd.SAVE_POS]])
def test_eeprom_save_leds(bus, driver): def test_eeprom_save_leds(bus, driver):
driver.eeprom_save_leds() driver.eeprom_save_leds()
assert_log(bus, assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.SAVE_LEDS]])
[["write", 0x20, Reg.CMD, Cmd.SAVE_LEDS]])
def test_eeprom_save_order(bus, driver): def test_eeprom_save_order(bus, driver):
driver.eeprom_save_order() driver.eeprom_save_order()
assert_log(bus, assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.SAVE_ORDER]])
[["write", 0x20, Reg.CMD, Cmd.SAVE_ORDER]])
def test_eeprom_save_ussbboot(bus, driver): def test_eeprom_save_ussbboot(bus, driver):
driver.eeprom_save_usbboot() driver.eeprom_save_usbboot()
assert_log(bus, assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.SAVE_USBBOOT]])
[["write", 0x20, Reg.CMD, Cmd.SAVE_USBBOOT]])
def test_hub_on(bus, driver): def test_hub_on(bus, driver):
driver.hub_on() driver.hub_on()
assert_log(bus, assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.HUB_ON]])
[["write", 0x20, Reg.CMD, Cmd.HUB_ON]])
def test_hub_off(bus, driver): def test_hub_off(bus, driver):
driver.hub_off() driver.hub_off()
assert_log(bus, assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.HUB_OFF]])
[["write", 0x20, Reg.CMD, Cmd.HUB_OFF]])
def test_hub_reset(bus, driver): def test_hub_reset(bus, driver):
driver.hub_reset() driver.hub_reset()
assert_log(bus, assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.HUB_CYCLE]])
[["write", 0x20, Reg.CMD, Cmd.HUB_CYCLE]])
def test_power_status(bus, driver): def test_power_status(bus, driver):
bus._retvals.append(1) # Set a mocked return code. bus._retvals.append(1) # Set a mocked return code.
assert driver.power_status(1) == 1 assert driver.power_status(1) == 1
assert_log(bus, assert_log(
[["write", 0x20, Reg.DATA0, 1], bus, [["write", 0x20, Reg.DATA0, 1], ["write", 0x20, Reg.CMD, Cmd.GET_PSTATUS]]
["write", 0x20, Reg.CMD, Cmd.GET_PSTATUS]]) )
def test_pis(bus, driver): def test_pis(bus, driver):
@ -205,61 +198,60 @@ def test_power_all_on(bus, driver):
driver.power_all_on() driver.power_all_on()
for pi in driver.pis(): for pi in driver.pis():
assert_log(bus, assert_log(
[["write", 0x20, Reg.DATA0, pi.pi_id], bus,
["write", 0x20, Reg.CMD, Cmd.ON]]) [["write", 0x20, Reg.DATA0, pi.pi_id], ["write", 0x20, Reg.CMD, Cmd.ON]],
)
def test_power_all_off(bus, driver): def test_power_all_off(bus, driver):
driver.power_all_off() driver.power_all_off()
for pi in driver.pis(): for pi in driver.pis():
assert_log(bus, assert_log(
[["write", 0x20, Reg.DATA0, pi.pi_id], bus,
["write", 0x20, Reg.CMD, Cmd.OFF]]) [["write", 0x20, Reg.DATA0, pi.pi_id], ["write", 0x20, Reg.CMD, Cmd.OFF]],
)
def test_alert_on(bus, driver): def test_alert_on(bus, driver):
driver.alert_on() driver.alert_on()
assert_log(bus, assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.ALERT_ON]])
[["write", 0x20, Reg.CMD, Cmd.ALERT_ON]])
def test_alert_off(bus, driver): def test_alert_off(bus, driver):
driver.alert_off() driver.alert_off()
assert_log(bus, assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.ALERT_OFF]])
[["write", 0x20, Reg.CMD, Cmd.ALERT_OFF]])
def test_fan_on(bus, driver): def test_fan_on(bus, driver):
driver.fan_on() driver.fan_on()
assert_log(bus, assert_log(bus, [["write", 0x20, Reg.DATA0, 1], ["write", 0x20, Reg.CMD, Cmd.FAN]])
[["write", 0x20, Reg.DATA0, 1],
["write", 0x20, Reg.CMD, Cmd.FAN]])
def test_fan_off(bus, driver): def test_fan_off(bus, driver):
driver.fan_off() driver.fan_off()
assert_log(bus, assert_log(bus, [["write", 0x20, Reg.DATA0, 0], ["write", 0x20, Reg.CMD, Cmd.FAN]])
[["write", 0x20, Reg.DATA0, 0],
["write", 0x20, Reg.CMD, Cmd.FAN]])
def test_fan_status(bus, driver): def test_fan_status(bus, driver):
driver.fan_status() driver.fan_status()
assert_log(bus, assert_log(
[["write", 0x20, Reg.DATA0, Data.FANSTATUS], bus,
["write", 0x20, Reg.CMD, Cmd.GET_DATA]]) [
["write", 0x20, Reg.DATA0, Data.FANSTATUS],
["write", 0x20, Reg.CMD, Cmd.GET_DATA],
],
)
def test_get_order(bus, driver): def test_get_order(bus, driver):
driver.get_order() driver.get_order()
assert_log(bus, assert_log(bus, [["read", 0x20, Reg.ORDER]])
[["read", 0x20, Reg.ORDER]])
def test_set_order(bus, driver): def test_set_order(bus, driver):
driver.set_order(253) driver.set_order(253)
assert_log(bus, assert_log(
[["write", 0x20, Reg.DATA0, 253], bus, [["write", 0x20, Reg.DATA0, 253], ["write", 0x20, Reg.CMD, Cmd.SET_ORDER]]
["write", 0x20, Reg.CMD, Cmd.SET_ORDER]]) )