From ac5f5e1637997a6cda17868dd72b9dc18b354591 Mon Sep 17 00:00:00 2001 From: Reid 'arrdem' McKenzie Date: Fri, 5 Nov 2021 11:47:17 -0600 Subject: [PATCH] Finishing and testing the driver --- .../src/python/clusterctrl/driver.py | 116 +++++--- .../clusterctrl/test/python/test_driver.py | 265 ++++++++++++++++++ 2 files changed, 335 insertions(+), 46 deletions(-) create mode 100644 projects/clusterctrl/test/python/test_driver.py diff --git a/projects/clusterctrl/src/python/clusterctrl/driver.py b/projects/clusterctrl/src/python/clusterctrl/driver.py index 9c88904..8ecaf5c 100644 --- a/projects/clusterctrl/src/python/clusterctrl/driver.py +++ b/projects/clusterctrl/src/python/clusterctrl/driver.py @@ -1,4 +1,4 @@ -"""An I2C driver for the ClusterCTRL/ClusterHAT device(s).""" +"""An I2C driver for the ClusterCTRL/ClusterHAT family of device(s).""" from enum import Enum from itertools import chain, repeat @@ -10,8 +10,14 @@ from typing import NamedTuple, Union log = logging.getLogger(__name__) - +# Variable source import "smbus" +# - smbus can come from i2c-dev (host package, c ext.) +# - smbus2 is a PyPi hosted pure Python implementation +# +# In order to support reasonable packaging and deployment, we want to prefer +# smbus and fall back to smbus2 if it isn't available. smbus = None + if not smbus: try: import smbus @@ -51,11 +57,12 @@ I2C_ADDRESS = 0x20 class BoardType(Enum): DA = 0x00 # Unknown, presunably a prototype + SINGLE = 0x01 # Do the 'single' and 'triple' really use the same board ID? + TRIPLE = 0x01 # ??? + PHAT = 0x02 # The ClusterHAT boards + CTRL = 0x02 # The ClusterCTRL boards A6 = 0x03 # https://clusterctrl.com/p/aplus6 STACK = 0x04 # https://shop.pimoroni.com/products/clusterctrl-stack - SINGLE = 0x01 # Do the 'single' and 'triple' really use the same board ID? - TRIPLE = 0x01 - PHAT = 0x02 class Status(Enum): @@ -113,12 +120,12 @@ class Cmd(Enum): GET_USTATUS = 0x14 # Get Px USBBOOT status (data0=x) SET_ORDER = 0x15 # Set order (data0=order) SAVE = 0xF0 # Save current PWR/P1-LED/P2-LED/P1/P2/Order/Mode to EEPROM - SAVEDEFAULTS = 0xF1 # Save factory defaults + SAVE_DEFAULTS = 0xF1 # Save factory defaults GET_DATA = 0xF2 # Get DATA (Temps/ADC/etc.) SAVE_ORDER = 0xF3 # Save order to EEPROM SAVE_USBBOOT = 0xF4 # Save usbboot status to EEPROM SAVE_POS = 0xF5 # Save Power On State to EEPROM - SAVE_LED = 0xF6 # Save LED to EEPROM + SAVE_LEDS = 0xF6 # Save LED to EEPROM NOP = 0x90 # Do nothing @@ -133,21 +140,25 @@ class Data(Enum): FANSTATUS = 0x04 # Read fan status -class PiID(NamedTuple): - """Represent Pi IDs as something somewhat unique; a CTRL/HAT id and the Pi ID. +class PiRef(NamedTuple): + """An ID for a specific Pi/PiZero controlled by a Cluster{CTRL,HAT}. These IDs are expected to be unique at the host level; not at the cluster level. """ - ctrl_id: int + controller_id: int pi_id: int def __repr__(self) -> str: - return f"" + return f"" class ClusterCTRLv2Driver(object): - def __init__(self, bus: smbus2.SMBus, address: int = I2C_ADDRESS, delay: int = 0, clear = False): + def __init__(self, + bus: smbus.SMBus, + address: int = I2C_ADDRESS, + delay: int = 0, + clear: bool = False): """Initialize a ClusterCTRL/ClusterHAT driver instance for a given bus device.""" self._bus = bus self._address = address @@ -160,8 +171,11 @@ class ClusterCTRLv2Driver(object): except: raise ValueError("Cannot communicate with a ClusterCTRL/ClusterHAT on the given bus") + self._post_init() + + def _post_init(self): # This is a firmware default value indicating an uninitialized board. - # Randomize it if present. + # Randomize it if present and the user hasn't told us not to. if self.get_order() == 20: v = 20 r = SystemRandom() @@ -231,32 +245,42 @@ class ClusterCTRLv2Driver(object): # Return the (mostly) meaningful return code return self._read(Reg.DATA0) - def _id(self, id: PiID) -> int: - """Validate a logical ID and convert it to a numeric one.""" - assert self.min_pi <= id <= self.max_pi - assert self.get_order() == id.ctrl_id - return id.pi_id + def _id(self, id: Union[PiRef, int]) -> int: + """Validate a user-provided ID and convert it to a numeric one.""" + + if isinstance(id, PiRef): + # Yes this is backwards, but its' most convenient for sharing validation + assert self.get_order() == id.controller_id + id = id.pi_id + elif isinstance(id, int): + pass + else: + raise ValueError(f"Expected an int or PiRef, got {type(id)}") + + maxpi = self._read(Reg.MAXPI) + if 0 <= id <= maxpi: + return id + else: + raise ValueError("Expected an id in [0,{maxpi:d}], got {id:d}") - @property def min_pi(self): """Get the minimum supported Pi ID on this controller.""" - return PiID(self.get_order(), 1) + return PiRef(self.get_order(), 1) - @property @once def max_pi(self): """Get the maximum supported Pi ID on this controller.""" - return PiID(self.get_order(), self._read(Reg.MAXPI)) + return PiRef(self.get_order(), self._read(Reg.MAXPI)) - @property - def pi_ids(self): + def pis(self): """Iterate over the IDs of Pis which could be connected to this controller.""" order = self.get_order() - for i in range(1, 6): - yield PiID(order, i) + maxpi = self._read(Reg.MAXPI) + for i in range(1, maxpi + 1): + yield PiRef(order, i) @property def type(self) -> BoardType: @@ -287,7 +311,7 @@ class ClusterCTRLv2Driver(object): def eeprom_reset(self): """Reset EEPROM to factory/firmware default value(s).""" - return self._call(Cmd.SAVEDEFAULTS) + return self._call(Cmd.SAVE_DEFAULTS) def eeprom_save_powerstate(self): """Persist the current power state to EEPROM.""" @@ -297,7 +321,7 @@ class ClusterCTRLv2Driver(object): def eeprom_save_leds(self): """Persist the current LED state to EEPROM.""" - return self._call(Cmd.SAVE_LED) + return self._call(Cmd.SAVE_LEDS) def eeprom_save_order(self): """Persist the current order value to EEPROM.""" @@ -307,7 +331,7 @@ class ClusterCTRLv2Driver(object): def eeprom_save_usbboot(self): """Persist USB booting settings to EEPROM.""" - return self._call(Cmd.SAVEUSBBOOT) + return self._call(Cmd.SAVE_USBBOOT) #################################################################################################### # USB hub management @@ -320,11 +344,11 @@ class ClusterCTRLv2Driver(object): def hub_off(self): """Turn off the USB hub.""" - return self._call(Cmd.HUB_ON) + return self._call(Cmd.HUB_OFF) # FIXME: Is hub_status unsupported in the firmware? - def reset_hub(self, delay: int = 0): + def hub_reset(self, delay: int = 0): """[Power] cycle the Controller hub for `delay` x 10ms.""" return self._call(Cmd.HUB_CYCLE, delay) @@ -332,19 +356,19 @@ class ClusterCTRLv2Driver(object): #################################################################################################### # Power management #################################################################################################### - def power_on(self, id: PiID): + def power_on(self, id: Union[PiRef, int]): """Power on a given slot by ID.""" id = self._id(id) return self._call(Cmd.ON, id) - def power_off(self, id: PiID): + def power_off(self, id: Union[PiRef, int]): """Power off a given slot by ID.""" id = self._id(id) return self._call(Cmd.OFF, id) - def power_status(self, id: PiID): + def power_status(self, id: Union[PiRef, int]): """Read the status of a given slot by ID.""" id = self._id(id) @@ -353,14 +377,13 @@ class ClusterCTRLv2Driver(object): def power_all_on(self): """Power on all slots in this Controller.""" - for id in self.pi_ids: - if not self.power_status(id): - self.power_on(id) + for id in self.pis(): + self.power_on(id) def power_all_off(self): """Power off all slots in this Controller.""" - for id in self.pi_ids: + for id in self.pis(): self.power_off(id) #################################################################################################### @@ -412,19 +435,19 @@ class ClusterCTRLv2Driver(object): #################################################################################################### # USB booting #################################################################################################### - def usbboot_on(self, id: PiID): + def usbboot_on(self, id: Union[PiRef, int]): """Enable USB booting for the given Pi.""" id = self._id(id) return self._call(Cmd.USBBOOT_EN, id) - def usbboot_off(self, id: PiID): + def usbboot_off(self, id: Union[PiRef, int]): """Disable USB booting for the given Pi.""" id = self._id(id) return self._call(Cmd.USBBOOT_DIS, id) - def usbboot_status(self, id: PiID): + def usbboot_status(self, id: Union[PiRef, int]): """Get the current USB booting status for the given Pi.""" id = self._id(id) @@ -442,9 +465,8 @@ class ClusterCTRLv2Driver(object): def adc_ids(self): return range(1, self.max_adc + 1) - def read_adc(self, id: PiID): - id = self._id(id) - self._call(Cmd.GET_DATA, Data.ADC_READ.value, id) + def read_adc(self, adc_id: int): + self._call(Cmd.GET_DATA, Data.ADC_READ.value, adc_id) # Now this is screwy. # DATA0 gets set to 0 or 1, indicating the voldage type # DATA1 and DATA2 are a 16bi number that need to be reassembled. @@ -474,14 +496,16 @@ class ClusterCTRLv2Driver(object): #################################################################################################### # Operations with inconsistent platform support #################################################################################################### - def led_on(self, id: PiID): + def led_on(self, id: Union[PiRef, int]): """Turn on an LED by ID.""" id = self._id(id) return self._call(Cmd.LED_EN, id) - def led_off(self, id: PiID): + def led_off(self, id: Union[PiRef, int]): """Turn off an LED by ID.""" id = self._id(id) return self._call(Cmd.LED_DIS, id) + + # FIXME: LED status? diff --git a/projects/clusterctrl/test/python/test_driver.py b/projects/clusterctrl/test/python/test_driver.py new file mode 100644 index 0000000..dbbf363 --- /dev/null +++ b/projects/clusterctrl/test/python/test_driver.py @@ -0,0 +1,265 @@ +"""Tests covering the v2 ClusterCTRL driver.""" + +from clusterctrl.driver import ( + BoardType, + ClusterCTRLv2Driver, + Cmd, + Data, + Reg, +) +import pytest + + +class MockSMBus(object): + """An object that looks like smbus[2].SMBus. + + This object builds an append/mock log of all i2c calls performed, allowing a + user to assert that a specific sequence of reads and writes was issued. + + """ + + def __init__(self, regs={}): + self._log = [] + self._regs = regs + self._retvals = [] + + def read_byte_data(self, bus_addr, memory_addr) -> int: + self._log.append(["read", bus_addr, memory_addr]) + return self._regs[memory_addr] + + def write_byte_data(self, bus_addr, memory_addr, value) -> None: + self._regs[memory_addr] = value + self._log.append(["write", bus_addr, memory_addr, value]) + + # Minimum viable call/return mocking + if memory_addr == Reg.CMD.value: + # Clear data registers after a "call" + for r in range(Reg.DATA7.value, Reg.CMD.value): + self._regs[r] = 0 + + if self._retvals: + self._regs[Reg.DATA0.value] = self._retvals.pop() + + def read_block_data(self, bus_addr, memory_addr, block_size) -> int: + raise NotImplemented + + +@pytest.fixture +def bus(): + """An SMBus-alike object.""" + + regs = {x: 0 for x in range(0, 16)} + regs[Reg.ORDER.value] = 13 # (un)lucky 13 + regs[Reg.VERSION.value] = 2 + regs[Reg.MAXPI.value] = 5 + return MockSMBus(regs) + + +@pytest.fixture +def driver(bus): + """The driver under test atop a mocked bus.""" + + driver = ClusterCTRLv2Driver(bus) + bus._log.clear() # The driver has startup behavior, so flush the log + return driver + + +def sublist(l1, l2): + """Naive but adequate sublist testing.""" + + for i in range(len(l1)): + if l1[i] == l2[0]: + for j in range(len(l2)): + try: + if l1[i + j] != l2[j]: + break + except IndexError: + break + else: + return True + return False + + +def assert_log(bus, log): + def simplify(e): + if hasattr(e, "value"): + return e.value + else: + return e + + log = [[simplify(e) for e in cmd] for cmd in log] + + assert sublist(bus._log, log), "\n".join( + ["Failed to find expected sublog", "log:"] + [f"- {e}" for e in bus._log] + ["expected:"] + [f"- {e}" for e in log] + ) + + +def test_get_order(bus, driver): + """Check that get_order sends the appropriate command sequence.""" + + assert driver.get_order() == 13 + assert_log(bus, + [["read", 0x20, Reg.ORDER]]) + + +def test_set_order(bus, driver): + """Check that set_order sends the appropriate command sequence.""" + + driver.set_order(14) + assert_log(bus, + [["write", 0x20, Reg.DATA0, 14], + ["write", 0x20, Reg.CMD, Cmd.SET_ORDER]]) + + +def test_type(bus, driver): + assert isinstance(driver.type, BoardType) + assert_log(bus, + [["read", 0x20, Reg.TYPE]]) + + +def test_version(bus, driver): + assert driver.fw_version + # Invoke "read version" + assert_log(bus, + [["write", 0x20, Reg.DATA0, Data.VERSION], + ["write", 0x20, Reg.CMD, Cmd.GET_DATA]]) + # Read the two relevant registers + assert_log(bus, + [["read", 0x20, Reg.DATA1], + ["read", 0x20, Reg.DATA0]]) + + +def test_reset(bus, driver): + driver.reset_all() + assert_log(bus, + [["write", 0x20, Reg.CMD, Cmd.RESET]]) + + +def test_eeprom_save_all(bus, driver): + driver.eeprom_save_all() + assert_log(bus, + [["write", 0x20, Reg.CMD, Cmd.SAVE]]) + + +def test_eeprom_reset(bus, driver): + driver.eeprom_reset() + assert_log(bus, + [["write", 0x20, Reg.CMD, Cmd.SAVE_DEFAULTS]]) + + +def test_eeprom_save_powerstate(bus, driver): + driver.eeprom_save_powerstate() + assert_log(bus, + [["write", 0x20, Reg.CMD, Cmd.SAVE_POS]]) + + +def test_eeprom_save_leds(bus, driver): + driver.eeprom_save_leds() + assert_log(bus, + [["write", 0x20, Reg.CMD, Cmd.SAVE_LEDS]]) + + +def test_eeprom_save_order(bus, driver): + driver.eeprom_save_order() + assert_log(bus, + [["write", 0x20, Reg.CMD, Cmd.SAVE_ORDER]]) + + +def test_eeprom_save_ussbboot(bus, driver): + driver.eeprom_save_usbboot() + assert_log(bus, + [["write", 0x20, Reg.CMD, Cmd.SAVE_USBBOOT]]) + + +def test_hub_on(bus, driver): + driver.hub_on() + assert_log(bus, + [["write", 0x20, Reg.CMD, Cmd.HUB_ON]]) + + +def test_hub_off(bus, driver): + driver.hub_off() + assert_log(bus, + [["write", 0x20, Reg.CMD, Cmd.HUB_OFF]]) + + +def test_hub_reset(bus, driver): + driver.hub_reset() + assert_log(bus, + [["write", 0x20, Reg.CMD, Cmd.HUB_CYCLE]]) + + +def test_power_status(bus, driver): + bus._retvals.append(1) # Set a mocked return code. + assert driver.power_status(1) == 1 + assert_log(bus, + [["write", 0x20, Reg.DATA0, 1], + ["write", 0x20, Reg.CMD, Cmd.GET_PSTATUS]]) + + +def test_pis(bus, driver): + assert len(list(driver.pis())) == 5 + + +def test_power_all_on(bus, driver): + driver.power_all_on() + + for pi in driver.pis(): + assert_log(bus, + [["write", 0x20, Reg.DATA0, pi.pi_id], + ["write", 0x20, Reg.CMD, Cmd.ON]]) + + +def test_power_all_off(bus, driver): + driver.power_all_off() + + for pi in driver.pis(): + assert_log(bus, + [["write", 0x20, Reg.DATA0, pi.pi_id], + ["write", 0x20, Reg.CMD, Cmd.OFF]]) + + +def test_alert_on(bus, driver): + driver.alert_on() + assert_log(bus, + [["write", 0x20, Reg.CMD, Cmd.ALERT_ON]]) + + +def test_alert_off(bus, driver): + driver.alert_off() + assert_log(bus, + [["write", 0x20, Reg.CMD, Cmd.ALERT_OFF]]) + + +def test_fan_on(bus, driver): + driver.fan_on() + assert_log(bus, + [["write", 0x20, Reg.DATA0, 1], + ["write", 0x20, Reg.CMD, Cmd.FAN]]) + + +def test_fan_off(bus, driver): + driver.fan_off() + assert_log(bus, + [["write", 0x20, Reg.DATA0, 0], + ["write", 0x20, Reg.CMD, Cmd.FAN]]) + + +def test_fan_status(bus, driver): + driver.fan_status() + assert_log(bus, + [["write", 0x20, Reg.DATA0, Data.FANSTATUS], + ["write", 0x20, Reg.CMD, Cmd.GET_DATA]]) + + +def test_get_order(bus, driver): + driver.get_order() + assert_log(bus, + [["read", 0x20, Reg.ORDER]]) + + +def test_set_order(bus, driver): + driver.set_order(253) + assert_log(bus, + [["write", 0x20, Reg.DATA0, 253], + ["write", 0x20, Reg.CMD, Cmd.SET_ORDER]])