From b6b1f23188278d3c22f3a6306dab279e4963f6df Mon Sep 17 00:00:00 2001 From: Reid 'arrdem' McKenzie Date: Tue, 9 Nov 2021 12:31:31 -0700 Subject: [PATCH] Fmt. --- .../src/python/clusterctrl/__main__.py | 1 + .../src/python/clusterctrl/cmd/power.py | 1 + .../src/python/clusterctrl/driver.py | 52 +++++--- .../clusterctrl/test/python/test_driver.py | 114 ++++++++---------- 4 files changed, 90 insertions(+), 78 deletions(-) diff --git a/projects/clusterctrl/src/python/clusterctrl/__main__.py b/projects/clusterctrl/src/python/clusterctrl/__main__.py index 81a88e5..1bedb15 100644 --- a/projects/clusterctrl/src/python/clusterctrl/__main__.py +++ b/projects/clusterctrl/src/python/clusterctrl/__main__.py @@ -31,6 +31,7 @@ import click # # selects which Cluster CTRL devices matches that number + @click.group() def cli(): pass diff --git a/projects/clusterctrl/src/python/clusterctrl/cmd/power.py b/projects/clusterctrl/src/python/clusterctrl/cmd/power.py index 99c7feb..b6e2038 100644 --- a/projects/clusterctrl/src/python/clusterctrl/cmd/power.py +++ b/projects/clusterctrl/src/python/clusterctrl/cmd/power.py @@ -9,6 +9,7 @@ import click def power(): pass + @power.command("on") @click.argument("devices", nargs="*") def power_on(devices): diff --git a/projects/clusterctrl/src/python/clusterctrl/driver.py b/projects/clusterctrl/src/python/clusterctrl/driver.py index 8ecaf5c..649ee9e 100644 --- a/projects/clusterctrl/src/python/clusterctrl/driver.py +++ b/projects/clusterctrl/src/python/clusterctrl/driver.py @@ -42,6 +42,7 @@ def once(f): """ unset = val = object() + def _helper(*args, **kwargs): nonlocal val if val is unset: @@ -56,13 +57,13 @@ I2C_ADDRESS = 0x20 class BoardType(Enum): - DA = 0x00 # Unknown, presunably a prototype + DA = 0x00 # Unknown, presunably a prototype SINGLE = 0x01 # Do the 'single' and 'triple' really use the same board ID? TRIPLE = 0x01 # ??? - PHAT = 0x02 # The ClusterHAT boards - CTRL = 0x02 # The ClusterCTRL boards - A6 = 0x03 # https://clusterctrl.com/p/aplus6 - STACK = 0x04 # https://shop.pimoroni.com/products/clusterctrl-stack + PHAT = 0x02 # The ClusterHAT boards + CTRL = 0x02 # The ClusterCTRL boards + A6 = 0x03 # https://clusterctrl.com/p/aplus6 + STACK = 0x04 # https://shop.pimoroni.com/products/clusterctrl-stack class Status(Enum): @@ -105,8 +106,8 @@ class Cmd(Enum): ALERT_ON = 0x05 # Turn on Alert LED ALERT_OFF = 0x06 # Turn off Alert LED HUB_CYCLE = 0x07 # Reset USB HUB (turn off for data0*10ms, then back on) - HUB_ON = 0x08 # Turn on the USB hub - HUB_OFF = 0x09 # Turn off the USB hub + HUB_ON = 0x08 # Turn on the USB hub + HUB_OFF = 0x09 # Turn off the USB hub LED_EN = 0x0A # Enable Px LED (data0=x) (PHAT only) LED_DIS = 0x0B # Disable Px LED (data0=x) (PHAT only) PWR_ON = 0x0C # Turn off PWR LED @@ -146,6 +147,7 @@ class PiRef(NamedTuple): These IDs are expected to be unique at the host level; not at the cluster level. """ + controller_id: int pi_id: int @@ -154,11 +156,13 @@ class PiRef(NamedTuple): class ClusterCTRLv2Driver(object): - def __init__(self, - bus: smbus.SMBus, - address: int = I2C_ADDRESS, - delay: int = 0, - clear: bool = False): + def __init__( + self, + bus: smbus.SMBus, + address: int = I2C_ADDRESS, + delay: int = 0, + clear: bool = False, + ): """Initialize a ClusterCTRL/ClusterHAT driver instance for a given bus device.""" self._bus = bus self._address = address @@ -169,7 +173,9 @@ class ClusterCTRLv2Driver(object): if (version := self._read(Reg.VERSION)) != 2: raise IOError(f"Unsupported register format {version}; expected 2") except: - raise ValueError("Cannot communicate with a ClusterCTRL/ClusterHAT on the given bus") + raise ValueError( + "Cannot communicate with a ClusterCTRL/ClusterHAT on the given bus" + ) self._post_init() @@ -212,7 +218,7 @@ class ClusterCTRLv2Driver(object): return self._bus.write_byte_data(self._address, id.value, val) - def _call(self, op: Cmd, *args, clear = False): + def _call(self, op: Cmd, *args, clear=False): """A convenient abstraction over the 'calling' convention for ops. Operations are "called" when Reg.CMD is written to. @@ -228,7 +234,19 @@ class ClusterCTRLv2Driver(object): if self._clear or clear: args = chain(args, repeat(0)) - args = zip([Reg.DATA0, Reg.DATA1, Reg.DATA2, Reg.DATA3, Reg.DATA4, Reg.DATA5, Reg.DATA6, Reg.DATA7], args) + args = zip( + [ + Reg.DATA0, + Reg.DATA1, + Reg.DATA2, + Reg.DATA3, + Reg.DATA4, + Reg.DATA5, + Reg.DATA6, + Reg.DATA7, + ], + args, + ) for r, v in args: self._write(r, v) @@ -259,7 +277,7 @@ class ClusterCTRLv2Driver(object): maxpi = self._read(Reg.MAXPI) if 0 <= id <= maxpi: - return id + return id else: raise ValueError("Expected an id in [0,{maxpi:d}], got {id:d}") @@ -421,7 +439,7 @@ class ClusterCTRLv2Driver(object): # 'order' (board ID) management #################################################################################################### def get_order(self): - """Get the 'order' value of this device. Can be updated via """ + """Get the 'order' value of this device. Can be updated via""" return self._read(Reg.ORDER) diff --git a/projects/clusterctrl/test/python/test_driver.py b/projects/clusterctrl/test/python/test_driver.py index dbbf363..a230cf9 100644 --- a/projects/clusterctrl/test/python/test_driver.py +++ b/projects/clusterctrl/test/python/test_driver.py @@ -90,7 +90,10 @@ def assert_log(bus, log): log = [[simplify(e) for e in cmd] for cmd in log] assert sublist(bus._log, log), "\n".join( - ["Failed to find expected sublog", "log:"] + [f"- {e}" for e in bus._log] + ["expected:"] + [f"- {e}" for e in log] + ["Failed to find expected sublog", "log:"] + + [f"- {e}" for e in bus._log] + + ["expected:"] + + [f"- {e}" for e in log] ) @@ -98,103 +101,93 @@ def test_get_order(bus, driver): """Check that get_order sends the appropriate command sequence.""" assert driver.get_order() == 13 - assert_log(bus, - [["read", 0x20, Reg.ORDER]]) + assert_log(bus, [["read", 0x20, Reg.ORDER]]) def test_set_order(bus, driver): """Check that set_order sends the appropriate command sequence.""" driver.set_order(14) - assert_log(bus, - [["write", 0x20, Reg.DATA0, 14], - ["write", 0x20, Reg.CMD, Cmd.SET_ORDER]]) + assert_log( + bus, [["write", 0x20, Reg.DATA0, 14], ["write", 0x20, Reg.CMD, Cmd.SET_ORDER]] + ) def test_type(bus, driver): assert isinstance(driver.type, BoardType) - assert_log(bus, - [["read", 0x20, Reg.TYPE]]) + assert_log(bus, [["read", 0x20, Reg.TYPE]]) def test_version(bus, driver): assert driver.fw_version # Invoke "read version" - assert_log(bus, - [["write", 0x20, Reg.DATA0, Data.VERSION], - ["write", 0x20, Reg.CMD, Cmd.GET_DATA]]) + assert_log( + bus, + [ + ["write", 0x20, Reg.DATA0, Data.VERSION], + ["write", 0x20, Reg.CMD, Cmd.GET_DATA], + ], + ) # Read the two relevant registers - assert_log(bus, - [["read", 0x20, Reg.DATA1], - ["read", 0x20, Reg.DATA0]]) + assert_log(bus, [["read", 0x20, Reg.DATA1], ["read", 0x20, Reg.DATA0]]) def test_reset(bus, driver): driver.reset_all() - assert_log(bus, - [["write", 0x20, Reg.CMD, Cmd.RESET]]) + assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.RESET]]) def test_eeprom_save_all(bus, driver): driver.eeprom_save_all() - assert_log(bus, - [["write", 0x20, Reg.CMD, Cmd.SAVE]]) + assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.SAVE]]) def test_eeprom_reset(bus, driver): driver.eeprom_reset() - assert_log(bus, - [["write", 0x20, Reg.CMD, Cmd.SAVE_DEFAULTS]]) + assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.SAVE_DEFAULTS]]) def test_eeprom_save_powerstate(bus, driver): driver.eeprom_save_powerstate() - assert_log(bus, - [["write", 0x20, Reg.CMD, Cmd.SAVE_POS]]) + assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.SAVE_POS]]) def test_eeprom_save_leds(bus, driver): driver.eeprom_save_leds() - assert_log(bus, - [["write", 0x20, Reg.CMD, Cmd.SAVE_LEDS]]) + assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.SAVE_LEDS]]) def test_eeprom_save_order(bus, driver): driver.eeprom_save_order() - assert_log(bus, - [["write", 0x20, Reg.CMD, Cmd.SAVE_ORDER]]) + assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.SAVE_ORDER]]) def test_eeprom_save_ussbboot(bus, driver): driver.eeprom_save_usbboot() - assert_log(bus, - [["write", 0x20, Reg.CMD, Cmd.SAVE_USBBOOT]]) + assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.SAVE_USBBOOT]]) def test_hub_on(bus, driver): driver.hub_on() - assert_log(bus, - [["write", 0x20, Reg.CMD, Cmd.HUB_ON]]) + assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.HUB_ON]]) def test_hub_off(bus, driver): driver.hub_off() - assert_log(bus, - [["write", 0x20, Reg.CMD, Cmd.HUB_OFF]]) + assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.HUB_OFF]]) def test_hub_reset(bus, driver): driver.hub_reset() - assert_log(bus, - [["write", 0x20, Reg.CMD, Cmd.HUB_CYCLE]]) + assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.HUB_CYCLE]]) def test_power_status(bus, driver): bus._retvals.append(1) # Set a mocked return code. assert driver.power_status(1) == 1 - assert_log(bus, - [["write", 0x20, Reg.DATA0, 1], - ["write", 0x20, Reg.CMD, Cmd.GET_PSTATUS]]) + assert_log( + bus, [["write", 0x20, Reg.DATA0, 1], ["write", 0x20, Reg.CMD, Cmd.GET_PSTATUS]] + ) def test_pis(bus, driver): @@ -205,61 +198,60 @@ def test_power_all_on(bus, driver): driver.power_all_on() for pi in driver.pis(): - assert_log(bus, - [["write", 0x20, Reg.DATA0, pi.pi_id], - ["write", 0x20, Reg.CMD, Cmd.ON]]) + assert_log( + bus, + [["write", 0x20, Reg.DATA0, pi.pi_id], ["write", 0x20, Reg.CMD, Cmd.ON]], + ) def test_power_all_off(bus, driver): driver.power_all_off() for pi in driver.pis(): - assert_log(bus, - [["write", 0x20, Reg.DATA0, pi.pi_id], - ["write", 0x20, Reg.CMD, Cmd.OFF]]) + assert_log( + bus, + [["write", 0x20, Reg.DATA0, pi.pi_id], ["write", 0x20, Reg.CMD, Cmd.OFF]], + ) def test_alert_on(bus, driver): driver.alert_on() - assert_log(bus, - [["write", 0x20, Reg.CMD, Cmd.ALERT_ON]]) + assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.ALERT_ON]]) def test_alert_off(bus, driver): driver.alert_off() - assert_log(bus, - [["write", 0x20, Reg.CMD, Cmd.ALERT_OFF]]) + assert_log(bus, [["write", 0x20, Reg.CMD, Cmd.ALERT_OFF]]) def test_fan_on(bus, driver): driver.fan_on() - assert_log(bus, - [["write", 0x20, Reg.DATA0, 1], - ["write", 0x20, Reg.CMD, Cmd.FAN]]) + assert_log(bus, [["write", 0x20, Reg.DATA0, 1], ["write", 0x20, Reg.CMD, Cmd.FAN]]) def test_fan_off(bus, driver): driver.fan_off() - assert_log(bus, - [["write", 0x20, Reg.DATA0, 0], - ["write", 0x20, Reg.CMD, Cmd.FAN]]) + assert_log(bus, [["write", 0x20, Reg.DATA0, 0], ["write", 0x20, Reg.CMD, Cmd.FAN]]) def test_fan_status(bus, driver): driver.fan_status() - assert_log(bus, - [["write", 0x20, Reg.DATA0, Data.FANSTATUS], - ["write", 0x20, Reg.CMD, Cmd.GET_DATA]]) + assert_log( + bus, + [ + ["write", 0x20, Reg.DATA0, Data.FANSTATUS], + ["write", 0x20, Reg.CMD, Cmd.GET_DATA], + ], + ) def test_get_order(bus, driver): driver.get_order() - assert_log(bus, - [["read", 0x20, Reg.ORDER]]) + assert_log(bus, [["read", 0x20, Reg.ORDER]]) def test_set_order(bus, driver): driver.set_order(253) - assert_log(bus, - [["write", 0x20, Reg.DATA0, 253], - ["write", 0x20, Reg.CMD, Cmd.SET_ORDER]]) + assert_log( + bus, [["write", 0x20, Reg.DATA0, 253], ["write", 0x20, Reg.CMD, Cmd.SET_ORDER]] + )