266 lines
6.7 KiB
Python
266 lines
6.7 KiB
Python
|
"""Tests covering the v2 ClusterCTRL driver."""
|
||
|
|
||
|
from clusterctrl.driver import (
|
||
|
BoardType,
|
||
|
ClusterCTRLv2Driver,
|
||
|
Cmd,
|
||
|
Data,
|
||
|
Reg,
|
||
|
)
|
||
|
import pytest
|
||
|
|
||
|
|
||
|
class MockSMBus(object):
|
||
|
"""An object that looks like smbus[2].SMBus.
|
||
|
|
||
|
This object builds an append/mock log of all i2c calls performed, allowing a
|
||
|
user to assert that a specific sequence of reads and writes was issued.
|
||
|
|
||
|
"""
|
||
|
|
||
|
def __init__(self, regs={}):
|
||
|
self._log = []
|
||
|
self._regs = regs
|
||
|
self._retvals = []
|
||
|
|
||
|
def read_byte_data(self, bus_addr, memory_addr) -> int:
|
||
|
self._log.append(["read", bus_addr, memory_addr])
|
||
|
return self._regs[memory_addr]
|
||
|
|
||
|
def write_byte_data(self, bus_addr, memory_addr, value) -> None:
|
||
|
self._regs[memory_addr] = value
|
||
|
self._log.append(["write", bus_addr, memory_addr, value])
|
||
|
|
||
|
# Minimum viable call/return mocking
|
||
|
if memory_addr == Reg.CMD.value:
|
||
|
# Clear data registers after a "call"
|
||
|
for r in range(Reg.DATA7.value, Reg.CMD.value):
|
||
|
self._regs[r] = 0
|
||
|
|
||
|
if self._retvals:
|
||
|
self._regs[Reg.DATA0.value] = self._retvals.pop()
|
||
|
|
||
|
def read_block_data(self, bus_addr, memory_addr, block_size) -> int:
|
||
|
raise NotImplemented
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def bus():
|
||
|
"""An SMBus-alike object."""
|
||
|
|
||
|
regs = {x: 0 for x in range(0, 16)}
|
||
|
regs[Reg.ORDER.value] = 13 # (un)lucky 13
|
||
|
regs[Reg.VERSION.value] = 2
|
||
|
regs[Reg.MAXPI.value] = 5
|
||
|
return MockSMBus(regs)
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def driver(bus):
|
||
|
"""The driver under test atop a mocked bus."""
|
||
|
|
||
|
driver = ClusterCTRLv2Driver(bus)
|
||
|
bus._log.clear() # The driver has startup behavior, so flush the log
|
||
|
return driver
|
||
|
|
||
|
|
||
|
def sublist(l1, l2):
|
||
|
"""Naive but adequate sublist testing."""
|
||
|
|
||
|
for i in range(len(l1)):
|
||
|
if l1[i] == l2[0]:
|
||
|
for j in range(len(l2)):
|
||
|
try:
|
||
|
if l1[i + j] != l2[j]:
|
||
|
break
|
||
|
except IndexError:
|
||
|
break
|
||
|
else:
|
||
|
return True
|
||
|
return False
|
||
|
|
||
|
|
||
|
def assert_log(bus, log):
|
||
|
def simplify(e):
|
||
|
if hasattr(e, "value"):
|
||
|
return e.value
|
||
|
else:
|
||
|
return e
|
||
|
|
||
|
log = [[simplify(e) for e in cmd] for cmd in log]
|
||
|
|
||
|
assert sublist(bus._log, log), "\n".join(
|
||
|
["Failed to find expected sublog", "log:"] + [f"- {e}" for e in bus._log] + ["expected:"] + [f"- {e}" for e in log]
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_get_order(bus, driver):
|
||
|
"""Check that get_order sends the appropriate command sequence."""
|
||
|
|
||
|
assert driver.get_order() == 13
|
||
|
assert_log(bus,
|
||
|
[["read", 0x20, Reg.ORDER]])
|
||
|
|
||
|
|
||
|
def test_set_order(bus, driver):
|
||
|
"""Check that set_order sends the appropriate command sequence."""
|
||
|
|
||
|
driver.set_order(14)
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.DATA0, 14],
|
||
|
["write", 0x20, Reg.CMD, Cmd.SET_ORDER]])
|
||
|
|
||
|
|
||
|
def test_type(bus, driver):
|
||
|
assert isinstance(driver.type, BoardType)
|
||
|
assert_log(bus,
|
||
|
[["read", 0x20, Reg.TYPE]])
|
||
|
|
||
|
|
||
|
def test_version(bus, driver):
|
||
|
assert driver.fw_version
|
||
|
# Invoke "read version"
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.DATA0, Data.VERSION],
|
||
|
["write", 0x20, Reg.CMD, Cmd.GET_DATA]])
|
||
|
# Read the two relevant registers
|
||
|
assert_log(bus,
|
||
|
[["read", 0x20, Reg.DATA1],
|
||
|
["read", 0x20, Reg.DATA0]])
|
||
|
|
||
|
|
||
|
def test_reset(bus, driver):
|
||
|
driver.reset_all()
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.CMD, Cmd.RESET]])
|
||
|
|
||
|
|
||
|
def test_eeprom_save_all(bus, driver):
|
||
|
driver.eeprom_save_all()
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.CMD, Cmd.SAVE]])
|
||
|
|
||
|
|
||
|
def test_eeprom_reset(bus, driver):
|
||
|
driver.eeprom_reset()
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.CMD, Cmd.SAVE_DEFAULTS]])
|
||
|
|
||
|
|
||
|
def test_eeprom_save_powerstate(bus, driver):
|
||
|
driver.eeprom_save_powerstate()
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.CMD, Cmd.SAVE_POS]])
|
||
|
|
||
|
|
||
|
def test_eeprom_save_leds(bus, driver):
|
||
|
driver.eeprom_save_leds()
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.CMD, Cmd.SAVE_LEDS]])
|
||
|
|
||
|
|
||
|
def test_eeprom_save_order(bus, driver):
|
||
|
driver.eeprom_save_order()
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.CMD, Cmd.SAVE_ORDER]])
|
||
|
|
||
|
|
||
|
def test_eeprom_save_ussbboot(bus, driver):
|
||
|
driver.eeprom_save_usbboot()
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.CMD, Cmd.SAVE_USBBOOT]])
|
||
|
|
||
|
|
||
|
def test_hub_on(bus, driver):
|
||
|
driver.hub_on()
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.CMD, Cmd.HUB_ON]])
|
||
|
|
||
|
|
||
|
def test_hub_off(bus, driver):
|
||
|
driver.hub_off()
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.CMD, Cmd.HUB_OFF]])
|
||
|
|
||
|
|
||
|
def test_hub_reset(bus, driver):
|
||
|
driver.hub_reset()
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.CMD, Cmd.HUB_CYCLE]])
|
||
|
|
||
|
|
||
|
def test_power_status(bus, driver):
|
||
|
bus._retvals.append(1) # Set a mocked return code.
|
||
|
assert driver.power_status(1) == 1
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.DATA0, 1],
|
||
|
["write", 0x20, Reg.CMD, Cmd.GET_PSTATUS]])
|
||
|
|
||
|
|
||
|
def test_pis(bus, driver):
|
||
|
assert len(list(driver.pis())) == 5
|
||
|
|
||
|
|
||
|
def test_power_all_on(bus, driver):
|
||
|
driver.power_all_on()
|
||
|
|
||
|
for pi in driver.pis():
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.DATA0, pi.pi_id],
|
||
|
["write", 0x20, Reg.CMD, Cmd.ON]])
|
||
|
|
||
|
|
||
|
def test_power_all_off(bus, driver):
|
||
|
driver.power_all_off()
|
||
|
|
||
|
for pi in driver.pis():
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.DATA0, pi.pi_id],
|
||
|
["write", 0x20, Reg.CMD, Cmd.OFF]])
|
||
|
|
||
|
|
||
|
def test_alert_on(bus, driver):
|
||
|
driver.alert_on()
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.CMD, Cmd.ALERT_ON]])
|
||
|
|
||
|
|
||
|
def test_alert_off(bus, driver):
|
||
|
driver.alert_off()
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.CMD, Cmd.ALERT_OFF]])
|
||
|
|
||
|
|
||
|
def test_fan_on(bus, driver):
|
||
|
driver.fan_on()
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.DATA0, 1],
|
||
|
["write", 0x20, Reg.CMD, Cmd.FAN]])
|
||
|
|
||
|
|
||
|
def test_fan_off(bus, driver):
|
||
|
driver.fan_off()
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.DATA0, 0],
|
||
|
["write", 0x20, Reg.CMD, Cmd.FAN]])
|
||
|
|
||
|
|
||
|
def test_fan_status(bus, driver):
|
||
|
driver.fan_status()
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.DATA0, Data.FANSTATUS],
|
||
|
["write", 0x20, Reg.CMD, Cmd.GET_DATA]])
|
||
|
|
||
|
|
||
|
def test_get_order(bus, driver):
|
||
|
driver.get_order()
|
||
|
assert_log(bus,
|
||
|
[["read", 0x20, Reg.ORDER]])
|
||
|
|
||
|
|
||
|
def test_set_order(bus, driver):
|
||
|
driver.set_order(253)
|
||
|
assert_log(bus,
|
||
|
[["write", 0x20, Reg.DATA0, 253],
|
||
|
["write", 0x20, Reg.CMD, Cmd.SET_ORDER]])
|