From a74c7150417d7b2bfb4afaa670b7fdf125a55276 Mon Sep 17 00:00:00 2001 From: Reid 'arrdem' McKenzie Date: Tue, 2 Nov 2021 01:12:06 -0600 Subject: [PATCH] Tapping on the revamped clusterctrl --- projects/clusterctrl/BUILD | 5 +- projects/clusterctrl/README.md | 7 +- .../src/python/clusterctrl/__main__.py | 1169 +---------------- .../src/python/clusterctrl/cmd/alert.py | 21 + .../src/python/clusterctrl/cmd/fan.py | 18 + .../src/python/clusterctrl/cmd/hub.py | 23 + .../src/python/clusterctrl/cmd/led.py | 18 + .../src/python/clusterctrl/cmd/power.py | 21 + .../src/python/clusterctrl/cmd/save.py | 39 + .../src/python/clusterctrl/driver.py | 117 +- tools/python/requirements.txt | 1 + 11 files changed, 274 insertions(+), 1165 deletions(-) create mode 100644 projects/clusterctrl/src/python/clusterctrl/cmd/alert.py create mode 100644 projects/clusterctrl/src/python/clusterctrl/cmd/fan.py create mode 100644 projects/clusterctrl/src/python/clusterctrl/cmd/hub.py create mode 100644 projects/clusterctrl/src/python/clusterctrl/cmd/led.py create mode 100644 projects/clusterctrl/src/python/clusterctrl/cmd/power.py create mode 100644 projects/clusterctrl/src/python/clusterctrl/cmd/save.py diff --git a/projects/clusterctrl/BUILD b/projects/clusterctrl/BUILD index 57e75d2..0f44e22 100644 --- a/projects/clusterctrl/BUILD +++ b/projects/clusterctrl/BUILD @@ -1,5 +1,8 @@ py_project( - name = "lib" + name = "lib", + lib_deps = [ + py_requirement("smbus2"), + ], ) zapp_binary( diff --git a/projects/clusterctrl/README.md b/projects/clusterctrl/README.md index 2c5b8e0..7710fa7 100644 --- a/projects/clusterctrl/README.md +++ b/projects/clusterctrl/README.md @@ -2,11 +2,12 @@ This project is a rewrite of the `clusterctrl` tool provided by the 8086 consultancy for interacting with their ClusterCTRL and ClusterHAT line of Raspberry Pi backplane products. -It serves to factor out the underlying i²c driver common to the ClusterCTRL family of products, and aims to implement a far more idiomatic and better documented CLI tool over top of that driver. +It serves to factor out the underlying i²c driver common to the ClusterCTRL family of products, and aims to implement a far more idiomatic and better documented and far more consistent CLI tool. ## license -This software is published under the MIT License - Copyright (c) 2018 Chris Burton (8086 consultancy) Copyright (c) 2021 Reid McKenzie + +This software is published under the MIT License + diff --git a/projects/clusterctrl/src/python/clusterctrl/__main__.py b/projects/clusterctrl/src/python/clusterctrl/__main__.py index 3590330..81a88e5 100644 --- a/projects/clusterctrl/src/python/clusterctrl/__main__.py +++ b/projects/clusterctrl/src/python/clusterctrl/__main__.py @@ -1,38 +1,29 @@ -#!/usr/bin/env python -# -# Cluster Control -# -# (c) 8086 Consultancy 2018-2020 -# -import glob, sys, time, os +""" +Cluster Control -from .xra1200 import Xra1200 +(c) 8086 Consultancy 2018-2020 +(c) Reid D. 'arrdem' McKenzie 2021 +""" + + +from .cmd.fan import fan +from .cmd.hub import hub +from .cmd.led import led +from .cmd.power import power +from .cmd.save import save + +import click -import smbus # Usage # clusterctl [] # Commands (cmd) -# on [] # Turn on All Pi Zero or devices -# off [] # Turn off All Pi Zero or devices -# status # shows status -# maxpi # returns max number of Pi Zeros we control -# init # Init ClusterHAT -# alert on [] # Turns on all ALERT LED or for pX devices -# alert off [] # Turns off all ALERT LED or for pX devices -# led on # Enable all LED -# led off # Disable all LED -# hub off|on|reset # USB hub can be turned on/off on Cluster HAT and reset on CTRL -# -# save # Save current settings to EEPROM -# saveorder # Save current "order" setting to EEPROM -# saveusbboot # Save current USBBOOT settings to EEPROM -# savepos # Save current Power On State to EEPROM -# savedefaults # Save default settings to EEPROM -# fan on|off # Turns FAN on/off for CTRL with -# setorder # Set order on device to -# getpath # Get USB path to Px -# +# status # shows status +# maxpi # returns max number of Pi Zeros we control +# init # Init ClusterHAT +# setorder # Set order on device to +# getpath # Get USB path to Px + # # Where is either a single Pi Zero "p1" or a list like "p1 p4 p7" # from p1 to p (without the quotes), so to turn on P1, P5 and P9 you would use @@ -40,1116 +31,32 @@ import smbus # # selects which Cluster CTRL devices matches that number -# Config / constants - -# I2C address of ClusterCTRL device -I2C_ADDRESS = 0x20 - -# Number of Pi Zero in ClusterHAT (set below) -clusterhat_size = 0 - -# ClusterCTRL Registers -REG_VERSION = 0x00 # Register layout version -REG_MAXPI = 0x01 # Maximum number of Pi -REG_ORDER = 0x02 # Order - used to sort multiple ClusterCTRL devices -REG_MODE = 0x03 # N/A -REG_TYPE = 0x04 # 0=DA, 1=pHAT -REG_DATA7 = 0x05 # -REG_DATA6 = 0x06 # -REG_DATA5 = 0x07 # -REG_DATA4 = 0x08 # -REG_DATA3 = 0x09 # -REG_DATA2 = 0x0A # -REG_DATA1 = 0x0B # -REG_DATA0 = 0x0C # -REG_CMD = 0x0D # Command -REG_STATUS = 0x0E # Status - -# ClusterCTRL Commands -CMD_ON = 0x03 # Turn on Px (data0=x) -CMD_OFF = 0x04 # Turn off Px (data0=x) -CMD_ALERT_ON = 0x05 # Turn on Alert LED -CMD_ALERT_OFF = 0x06 # Turn off Alert LED -CMD_HUB_CYCLE = 0x07 # Reset USB HUB (turn off for data0*10ms, then back on) -CMD_LED_EN = 0x0A # Enable Px LED (data0=x) -CMD_LED_DIS = 0x0B # Disable Px LED (data0=x) -CMD_PWR_ON = 0x0C # Turn off PWR LED -CMD_PWR_OFF = 0x0D # Turn off PWR LED -CMD_RESET = 0x0E # Resets ClusterCTRL (does not keep power state) -CMD_GET_PSTATUS = 0x0F # Get Px power status (data0=x) -CMD_FAN = 0x10 # Turn fan on (data0=1) or off (data0=0) -CMD_GETPATH = 0x11 # Get USB path to Px (data0=x 0=controller) returned in data7-data0 -CMD_USBBOOT_EN = 0x12 # Turn on USBBOOT -CMD_USBBOOT_DIS = 0x13 # Turn off USBBOOT -CMD_GET_USTATUS = 0x14 # Get Px USBBOOT status (data0=x) -CMD_SET_ORDER = 0x15 # Set order (data0=order) -CMD_SAVE = 0xF0 # Save current PWR/P1-LED/P2-LED/P1/P2/Order/Mode to EEPROM -CMD_SAVEDEFAULTS = 0xF1 # Save factory defaults -CMD_GET_DATA = 0xF2 # Get DATA (Temps/ADC/etc.) -CMD_SAVE_ORDER = 0xF3 # Save order to EEPROM -CMD_SAVE_USBBOOT = 0xF4 # Save usbboot status to EEPROM -CMD_SAVE_POS = 0xF5 # Save Power On State to EEPROM -CMD_SAVE_LED = 0xF6 # Save LED to EEPROM -CMD_NOP = 0x90 # Do nothing - -# Get arbitrary data from ClusterCTRL -GET_DATA_VERSION = 0x00 # Get firmware version -GET_DATA_ADC_CNT = 0x01 # Returns number of ADC ClusterCTRL supports -GET_DATA_ADC_READ = 0x02 # Read ADC data for ADC number 'data0' -GET_DATA_ADC_TEMP = 0x03 # Read Temperature ADC -GET_DATA_FANSTATUS = 0x04 # Read fan status - -# Files/paths -clusterctrl_prefix = "/dev/ClusterCTRL-" -vcgencmdpath = "/usr/bin/vcgencmd" -hat_product = "/proc/device-tree/hat/product" -hat_version = "/proc/device-tree/hat/product_ver" -hat_uuid = "/proc/device-tree/hat/uuid" -hat_vendor = "/proc/device-tree/hat/vendor" -hat_pid = "/proc/device-tree/hat/product_id" -nfsboot = "/var/lib/clusterctrl/boot/" -nfsroot = "/var/lib/clusterctrl/nfs/" +@click.group() +def cli(): + pass -def send_cmd( - c, - cmd, - data0=None, - data1=None, - data2=None, - data3=None, - data4=None, - data5=None, - data6=None, - data7=None, -): - """Send command to ClusterCTRL via I2C.""" - # print("CMD: {} - {} {} {} {} {} {} {} {}"format(cmd, data0, data1, data2, data3, data4, data5, data6, data7)) - if data7 is not None: - c[1].write_byte_data(I2C_ADDRESS, REG_DATA7, data7) - if data6 is not None: - c[1].write_byte_data(I2C_ADDRESS, REG_DATA6, data6) - if data5 is not None: - c[1].write_byte_data(I2C_ADDRESS, REG_DATA5, data5) - if data4 is not None: - c[1].write_byte_data(I2C_ADDRESS, REG_DATA4, data4) - if data3 is not None: - c[1].write_byte_data(I2C_ADDRESS, REG_DATA3, data3) - if data2 is not None: - c[1].write_byte_data(I2C_ADDRESS, REG_DATA2, data2) - if data1 is not None: - c[1].write_byte_data(I2C_ADDRESS, REG_DATA1, data1) - if data0 is not None: - c[1].write_byte_data(I2C_ADDRESS, REG_DATA0, data0) - try: - c[1].write_byte_data(I2C_ADDRESS, REG_CMD, cmd) - except IOError: - return False +@cli.command() +def status(): + """Show status information for all available devices.""" -def read_reg(c, offset, len=1): - """Read register from ClusterCTRL via I2C.""" - - if len > 1: - tmp = c[1].read_i2c_block_data(I2C_ADDRESS, offset, len) - else: - tmp = c[1].read_byte_data(I2C_ADDRESS, offset) - return tmp +@cli.command() +def maxpi(): + """Show the number of available/attached Pis.""" -def get_throttled(): - """Get throttled status.""" - if not os.path.isfile(vcgencmdpath) or not os.access(vcgencmdpath, os.X_OK): - return "NA" - return ( - (os.popen(vcgencmdpath + " get_throttled").readline()).split("=", 1)[-1].strip() - ) +@cli.command() +def init(): + """Init ClusterHAT""" -def usbpathfrombus(bus): - """Get USB path (eg 1-1.4.1) for I2C bus.""" - - for device in glob.glob("/sys/bus/usb/drivers/i2c-tiny-usb/*/i2c*"): - parts = device.split("/") - path = parts[6].split(":")[0] - id = parts[7][4:] - if int(id) == bus: - return path - - return False - - -def getusbpaths(): - """Build list of pi zero numbers to get USB path of.""" - - paths = {} - zeros = [] - - if args > 2: - for zero in sys.argv[2:]: - if zero[0] != "p" or (int(zero[1:]) < 1 or int(zero[1:]) > maxpi): - print("ERROR: Valid options are p1-p" + str(maxpi)) - sys.exit(1) - zeros.append(int(zero[1:])) - - else: - zeros = range(1, maxpi + 1) - - cache_clusterhat = None # USB path to HUB on Cluster HAT - cache_clusterctrl = {} # Cache of ClusterCTRL USB path prefixes - - for zero in zeros: - lastpi = 0 # max pX for the current device - # Get USB path to pi device - if clusterhat: - lastpi += clusterhat_size - if zero <= lastpi: - if version == 1: - if "clusterhatv1" in config: - paths[str(zero)] = config["clusterhatv1"] + "." + str(5 - zero) - if version == 2: - if cache_clusterhat == None: - # Detect Cluster HAT by turning the HUB on / off / on - # First ensure the hub is turned on - if version_minor == 0: - hub.on() - else: - hub.off() - time.sleep(1) - # Get list of USB hubs with the correct pid/vid - import usb.core as prescan - - devices = {} - hubs = prescan.find( - idVendor=0x05E3, idProduct=0x0608, find_all=1 - ) - for clusterhathub in hubs: - devices[ - str(clusterhathub.bus) - + "-" - + ".".join(map(str, clusterhathub.port_numbers)) - ] = "pre" - pre_count = len(devices) - # Turn hub off - if version_minor == 0: - hub.off() - else: - hub.on() - time.sleep(1) - import usb.core as postscan - - hubs = postscan.find( - idVendor=0x05E3, idProduct=0x0608, find_all=1 - ) - for clusterhathub in hubs: - devices[ - str(clusterhathub.bus) - + "-" - + ".".join(map(str, clusterhathub.port_numbers)) - ] = "post" - post_count = len(devices) - # Check we haven't gained an extra USB hubs - if pre_count == post_count: - found = 0 - for path, state in devices.iteritems(): - if state == "pre": - found = found + 1 - cache_clusterhat = path - # Turn hub back on - if version_minor == 0: - hub.on() - else: - hub.off() - # If more than one hub went awol then we don't know which one it should be - if found != 1: - cache_clusterhat = None - if cache_clusterhat != None: - paths[str(zero)] = cache_clusterhat + "." + str(5 - zero) - if clusterctrl: - for c in ctrl: - lastpi += c[3] - if zero <= lastpi and zero > lastpi - c[3]: - if c[0] not in cache_clusterctrl: - # Get USB controllers path - usbpathname = usbpathfrombus(c[2]) - # Get path to controller - send_cmd(c, CMD_GETPATH, 0) - # Remove controllers path from usbpathname - pathdata = "" - for tmp in read_reg(c, REG_DATA7, len=8): - if tmp != 255: - if len(pathdata) > 0: - pathdata = pathdata + "." - pathdata = pathdata + str(tmp) - usbpathname = usbpathname[: -len(pathdata)] - cache_clusterctrl[c[0]] = usbpathname - - # Append path to Px - send_cmd(c, CMD_GETPATH, zero - lastpi + c[3]) - pathdata = "" - for tmp in read_reg(c, REG_DATA7, len=8): - if tmp != 255: - if len(pathdata) > 0: - pathdata = pathdata + "." - pathdata = pathdata + str(tmp) - paths[str(zero)] = cache_clusterctrl[c[0]] + pathdata - return paths - - -def is_float(n): - try: - float(n) - return True - except ValueError: - return False +cli.add_command(led) +cli.add_command(power) +cli.add_command(hub) +cli.add_command(fan) +cli.add_command(save) if __name__ == "__main__": - args = len(sys.argv) - - if ( - args == 1 - or sys.argv[1] == "help" - or sys.argv[1] == "--help" - or sys.argv[1] == "-h" - or sys.argv[1] == "/?" - ): - print( - """Usage :{0} - - can be a single device 'p1' or a list 'p2 p3 p5' - is the order listed by '{0} status' (default 20) - - # Power on/off all or listed device(s) - {0} on|off [] - - # Show status of ClusterHAT/CTRL - {0} status - - # Get number of controllable Pi - {0} maxpi - - # Create/update symlinks for rpiboot [root]" - sudo {0} init - - # Turn ALERT LED on/off for all or listed device(s)" - {0} alert on|off [] - - # Enable LED (Power/pX/etc.) - {0} led on - - # Disable LED (Power/pX/etc.) - {0} led off - - # Turns on/off or resets the USB HUB - {0} hub off|on|reset - - ## The following are only available on ClusterCTRL devices - # Set order on device to - {0} setorder - - # Get USB path to Px - {0} getpath - - # Turns FAN on/off for CTRL with - {0} fan on|off - - # Save current settings to EEPROM - {0} save - - # Save current order to EEPROM - {0} saveorder - - # Save current Power On State to EEPROM - {0} savepos - - # Save factory default settings to EEPROM - {0} savedefaults - """.format( - sys.argv[0] - ) - ) - sys.exit() - - # Read configruation file - config = {} - if os.path.isfile("/etc/default/clusterctrl"): - with open("/etc/default/clusterctrl") as configfile: - for line in configfile: - if line[:1] != "#": - k, v = line.partition("=")[::2] - config[k.strip().lower()] = v.split("#")[0].strip(" \"'\n\t") - - # If we're not a controller of some sort exit cleanly - if "type" not in config or not (config["type"] == "c" or config["type"] == "cnat"): - print("Unable to load config, or invalid config loaded", file=sys.stderr) - sys.exit(1) - - ########## - # Init # - ########## - - # Get Pi power on delay from config - delay = ( - 1 - if "clusterctrl_delay" not in config - or not is_float(config["clusterctrl_delay"]) - or float(config["clusterctrl_delay"]) < 0 - else config["clusterctrl_delay"] - ) - - maxpi = 0 - clusterctrl = False - - # Do we have a ClusterHAT ? - - # Check for override - clusterhat = 1 if "clusterhat_force" not in config else config["clusterhat_force"] - - if clusterhat != 1: - parts = clusterhat.split(".") - version = int(parts[0]) - version_minor = int(parts[1]) - - elif ( - not os.path.isfile(hat_product) - or not os.access(hat_product, os.R_OK) - or not os.path.isfile(hat_uuid) - or not os.access(hat_uuid, os.R_OK) - or not os.path.isfile(hat_vendor) - or not os.access(hat_vendor, os.R_OK) - or not os.path.isfile(hat_pid) - or not os.access(hat_pid, os.R_OK) - or not os.path.isfile(hat_version) - or not os.access(hat_version, os.R_OK) - ): - clusterhat = False # No HAT found - - else: - # HAT has been found validate it - f = open(hat_product, "r") - if f.read().strip("\x00") != "ZC4:ClusterHAT": - clusterhat = False # No ClusterHAT found - - if clusterhat: - version = 0 - f = open(hat_version, "r") - tmp = int(f.read().strip("\x00"), 16) - f.close() - if tmp >= 16 and tmp <= 31: - version = 1 - version_minor = tmp - 16 - elif tmp >= 32 and tmp <= 47: - version = 2 - version_minor = tmp - 32 - else: - clusterhat = False # No ClusterHAT found - - if clusterhat: - clusterhat_size = ( - 4 if "clusterhat_size" not in config else int(config["clusterhat_size"]) - ) - if clusterhat_size > 4: - clusterhat_size = 4 - fangpio = False if "fangpio" not in config else int(config["fangpio"]) - - # Init ClusterHAT if we have one - if clusterhat: - maxpi += clusterhat_size - - if version == 1: - import RPi.GPIO as GPIO - - GPIO.setwarnings(False) - ports = [5, 6, 13, 19, 26] - GPIO.setmode(GPIO.BCM) - GPIO.setup(ports, GPIO.OUT) - - else: # v2.x - wp_link = 0 - bus = smbus.SMBus(1) - hat = Xra1200(bus=1, address=I2C_ADDRESS) - p1 = Xra1200(bus=1, address=I2C_ADDRESS, port=0) - p2 = Xra1200(bus=1, address=I2C_ADDRESS, port=1) - p3 = Xra1200(bus=1, address=I2C_ADDRESS, port=2) - p4 = Xra1200(bus=1, address=I2C_ADDRESS, port=3) - led = Xra1200(bus=1, address=I2C_ADDRESS, port=4) - hub = Xra1200(bus=1, address=I2C_ADDRESS, port=5) - alert = Xra1200(bus=1, address=I2C_ADDRESS, port=6) - wp = Xra1200(bus=1, address=I2C_ADDRESS, port=7) - - # Get status of I/O Extender - dir = hat.get_dir() # I/O pin directions - status = hat.read_byte() # Pin Status - - # Detect I/O Expander - xra1200p = True - pur = hat.get_pur() - if pur == -1: - xra1200p = False - - # If all pins are inputs this is the first run since HAT power up - if dir == 255: - # Detect if WP is being pulled high - if xra1200p: - hat.set_pur(0x7F) # Disable pullup for EEPROM WP on I/O expander - wp_link = hat.read_byte() >> 7 # 1 = soldered / 0 = open - if wp_link == 1: - hat.set_pur(0xFF) - else: - wp.on() - else: - wp.on() - wp_link = -1 - - if (status & 0xF) == 0xF: # Check POS [Power On State] - # POS [NO LINK] set power ON (CUT) - p1.on() - p2.on() - p3.on() - p4.on() - else: - # POS [LINK] set power off (Default) - p1.off() - p2.off() - p3.off() - p4.off() - - # Set default state for other pins - alert.off() - led.on() - if version_minor == 0: - hub.on() - else: - hub.off() - - hat.set_dir(0x00) # Set all pins as outputs - - else: - if version == 2 and xra1200p == True: - if hat.get_pur() >> 7: - wp_link = 1 - else: - wp_link = -1 - - # Get list of ClusterCTRL I2C devices - busses = [] # Get list of devices - for fn in glob.glob(clusterctrl_prefix + "*"): - clusterctrl += 1 - length = len(clusterctrl_prefix) - busses.append((smbus.SMBus(int(fn[length:])), int(fn[length:]))) - - # Ensure we have at least one ClusterCTRL or a ClusterHAT - if len(busses) < 1 and not clusterhat: - print("ERROR: No ClusterHAT/CTRL devices found\n") - sys.exit(1) - - if clusterctrl: - # Make sure we haven't got a conflict on the ClusterCTRL "order" - # When using multiple ClusterCTRL devices they each have an "order" which must be unique - orders = [] - ctrl = [] - - # Loop bus and get order and maxpi - for bus in busses: - bus_order = bus[0].read_byte_data(I2C_ADDRESS, REG_ORDER) - bus_maxpi = bus[0].read_byte_data(I2C_ADDRESS, REG_MAXPI) - maxpi += bus_maxpi - ctrl.append((bus_order, bus[0], bus[1], bus_maxpi)) - orders.append(bus_order) - - if len(orders) > len(set(orders)): # Ensure all enties are unique - print("ERROR: Duplicate ClusterCTRL 'order' found") - for c in ctrl: - print("I2C Bus: " + str(c[2]) + " Order: " + str(c[0])) - sys.exit(1) - - # Sort devices based on order - ctrl.sort(key=lambda tup: tup[0]) - - ############## - ## End Init ## - ############## - - # Parse arguments and do actions - - if args == 2 and sys.argv[1] == "init": - if "link" in config and config["link"] == "1": - # Only root should fiddle with the links - if os.geteuid() == 0 and os.path.isdir(nfsboot) and os.path.isdir(nfsroot): - paths = getusbpaths() - # Delete links for Px - for link in glob.glob(nfsboot + "*-*"): - if os.path.islink(link): - path = os.path.realpath(link) - if path[0 : len(nfsroot)] == nfsroot and path[-5:] == "/boot": - p = path[len(nfsroot) :][:-5] - if p[1:] in paths: - os.unlink(link) - # Create new link for Px - for p, path in sorted(paths.iteritems()): - if path: - # If the link already exists remove it - if os.path.islink(nfsboot + path): - os.unlink(nfsboot + path) - os.symlink(nfsroot + "p" + p + "/boot/", nfsboot + path) - - elif args == 2 and (sys.argv[1] == "on" or sys.argv[1] == "off"): - # Turn on/off ALL devices - if clusterhat: - # Turn all ClusterHAT ports on - actioned = 0 - if version == 1: - alertstatus = GPIO.input(ports[0]) - if not alertstatus: - GPIO.output(ports[0], 1) - for port in ports[1:]: - if actioned >= clusterhat_size: - break - if sys.argv[1] == "on": - if not GPIO.input(port): - GPIO.output(port, 1) - if actioned < maxpi: - time.sleep(delay) - actioned += 1 - else: - GPIO.output(port, 0) - if not alertstatus: - GPIO.output(ports[0], 0) - else: - alertstatus = alert.get() - if not alertstatus: - alert.on() - if sys.argv[1] == "on": - status = hat.read_byte() - if (actioned < clusterhat_size) and ((status & (1 << (0))) == 0): - p1.on() - time.sleep(delay) - actioned = actioned + 1 - if (actioned < clusterhat_size) and ((status & (1 << (1))) == 0): - p2.on() - time.sleep(delay) - actioned = actioned + 1 - if (actioned < clusterhat_size) and ((status & (1 << (2))) == 0): - p3.on() - time.sleep(delay) - actioned = actioned + 1 - if (actioned < clusterhat_size) and ((status & (1 << (3))) == 0): - p4.on() - if clusterctrl: - time.sleep( - delay - ) # delay again if we have ClusterCTRL devices - actioned = actioned + 1 - else: - p1.off() - p2.off() - p3.off() - p4.off() - if not alertstatus: - alert.off() - if clusterctrl: - # Turn all ClusterCTRL ports on - # Loop through devices - i = clusterhat_size - for c in ctrl: - send_cmd(c, CMD_ALERT_ON) - for pi in range(1, c[3] + 1): - i += 1 - if sys.argv[1] == "on": - send_cmd(c, CMD_GET_PSTATUS, pi) - if read_reg(c, REG_DATA0) == 0: - send_cmd(c, CMD_ON, pi) - if i < maxpi: - time.sleep(delay) # Delay on all but last - else: - send_cmd(c, CMD_OFF, pi) - send_cmd(c, CMD_ALERT_OFF) - - elif args > 2 and (sys.argv[1] == "on" or sys.argv[1] == "off"): - # Turn on/off pX - actioned = 0 - # Build list of pi zero numbers to turn alert LED on for - zeros = [] - for zero in sys.argv[2:]: - if zero[0] != "p" or (int(zero[1:]) < 1 or int(zero[1:]) > maxpi): - print("ERROR: Valid options are p1-p" + str(maxpi)) - sys.exit(1) - zeros.append(int(zero[1:])) - for zero in zeros: - lastpi = 0 # max pX for the current device - if clusterhat: - lastpi += clusterhat_size - if zero <= lastpi: - if version == 1: - actioned += 1 - if sys.argv[1] == "on": - if not GPIO.input(ports[zero]): - GPIO.output(ports[zero], 1) - if actioned < len(zeros): - time.sleep(delay) - else: - GPIO.output(ports[zero], 0) - else: - if sys.argv[1] == "on": - status = hat.read_byte() - actioned += 1 - if zero == 1: - if (status & (1 << (0))) == 0: - p1.on() - if actioned < len(zeros): - time.sleep(delay) - elif zero == 2: - if (status & (1 << (1))) == 0: - p2.on() - if actioned < len(zeros): - time.sleep(delay) - elif zero == 3: - if (status & (1 << (2))) == 0: - p3.on() - if actioned < len(zeros): - time.sleep(delay) - elif zero == 4: - if (status & (1 << (3))) == 0: - p4.on() - if actioned < len(zeros): - time.sleep(delay) - else: - if zero == 1: - p1.off() - elif zero == 2: - p2.off() - elif zero == 3: - p3.off() - elif zero == 4: - p4.off() - continue - if clusterctrl: - for c in ctrl: - lastpi += c[3] - if zero <= lastpi: - if sys.argv[1] == "on": - # Get power status for Pi Zero - send_cmd(c, CMD_GET_PSTATUS, zero - lastpi + c[3]) - # Only turn on/delay if it's currently off - if read_reg(c, REG_DATA0) == 0: - send_cmd(c, CMD_ON, zero - lastpi + c[3]) - if actioned < len(zeros): - time.sleep(delay) - actioned += 1 - else: - send_cmd(c, CMD_OFF, zero - lastpi + c[3]) - break - - elif ( - args > 2 - and sys.argv[1] == "usbboot" - and (sys.argv[2] == "on" or sys.argv[2] == "off") - ): - # Enable of Disable USBBOOT (supported on Compute Modules) for Px - actioned = 0 - # Build list of pi zero numbers to turn USBBOOT on for - zeros = [] - for zero in sys.argv[3:]: - if zero[0] != "p" or (int(zero[1:]) < 1 or int(zero[1:]) > maxpi): - print("ERROR: Valid options are p1-p" + str(maxpi)) - sys.exit(1) - zeros.append(int(zero[1:])) - for zero in zeros: - lastpi = 0 # max pX for the current device - if clusterhat: - lastpi += clusterhat_size - if zero <= lastpi: # Ignore any Px on Cluster HAT - continue - if clusterctrl: - for c in ctrl: - lastpi += c[3] - if zero <= lastpi: - if sys.argv[2] == "on": - # Turn USBBOOT on for Px - send_cmd(c, CMD_USBBOOT_EN, zero - lastpi + c[3]) - actioned += 1 - else: - send_cmd(c, CMD_USBBOOT_DIS, zero - lastpi + c[3]) - - elif args == 2 and sys.argv[1] == "status": - # Show status of all Cluster HAT / ClusterCTRL devices - print("clusterhat:{}".format(clusterhat)) - print("clusterctrl:{}".format(clusterctrl)) - print("maxpi:{}".format(maxpi)) - cnt = 0 - print("throttled:{}".format(get_throttled())) - if clusterctrl: - s = "" - i = 0 - for c in ctrl: - s += str(c[0]) + ":" + str(c[2]) + ":" + str(c[3]) - if i < len(ctrl): - s += " " - print("ctrl_bus:{}".format(s)) - if clusterhat: - print("hat_version:{}.{}".format(version, version_minor)) - print("hat_version_major:{}".format(version)) - print("hat_version_minor:{}".format(version_minor)) - print("hat_size:{}".format(clusterhat_size)) - if "clusterhat_force" in config: - print("hat_uuid:NA") - print("hat_vendor:NA") - print("hat_pid:NA") - print("hat_force:{}".format(config["clusterhat_force"])) - else: - f = open(hat_uuid, "r") - print("hat_uuid:{}".format(f.read().strip("\x00"))) - f.close() - f = open(hat_vendor, "r") - print("hat_vendor:{}".format(f.read().strip("\x00"))) - f.close() - f = open(hat_pid, "r") - print("hat_product_id:{}".format(f.read().strip("\x00"))) - f.close() - if version == 1: - print("hat_alert:{}".format(GPIO.input(ports[0]))) - for p in range(1, clusterhat_size + 1): - print("p{}:{}".format(p, GPIO.input(ports[p]))) - else: - print("hat_alert:{}".format(alert.get())) - if version_minor == 0: - print("hat_hub:{:d}".format(hub.get())) - else: - print("hat_hub:{:d}".format(not hub.get())) - print("hat_wp:{}".format(wp.get())) - print("hat_led:{}".format(led.get())) - print("hat_wplink:{}".format(wp_link)) - print("hat_xra1200p:{}".format(xra1200p)) - status = hat.read_byte() - for p in range(1, clusterhat_size + 1): - print("p{}:{:d}".format(p, ((status & (1 << (p - 1))) > 0))) - cnt += clusterhat_size - if clusterctrl: - # Power/USBBOOT status for Px - for c in ctrl: - info = "" - # Get firmware version - send_cmd(c, CMD_GET_DATA, GET_DATA_VERSION) - data = read_reg(c, REG_DATA1, 2) - ctrl_version = float(str(data[0]) + "." + str(data[1])) - fw_major = data[0] - fw_minor = data[1] - # Get number of ADC supported - send_cmd(c, CMD_GET_DATA, GET_DATA_ADC_CNT) - for adc in range(read_reg(c, REG_DATA0)): - send_cmd(c, CMD_GET_DATA, GET_DATA_ADC_READ, adc + 1) - data = read_reg(c, REG_DATA2, 3) - if data[2] == 1: # Voltage type '1' 3v3 REF, Voltage /2 - voltage = int(((data[0] << 8) + data[1]) * 6.4453125) - info += " ADC" + str(adc + 1) + ":" + str(voltage) + "mV" - if ( - data[2] == 2 - ): # Voltage type '2' 3v3 REF, Voltage = ((VIN*1.07)/10+1.07) - voltage = int(((data[0] << 8) + data[1]) * 33.34093896028037) - info += " ADC" + str(adc + 1) + ":" + str(voltage) + "mV" - send_cmd(c, CMD_GET_DATA, GET_DATA_ADC_TEMP) - data = read_reg(c, REG_DATA2, 3) - if data[2] == 2: - temp = (((data[0] << 8) + data[1]) - 247) / 1.22 - info += " T1:" + format(temp, ".2f") + "C" - if fw_major == 1 and fw_minor == 6: - send_cmd(c, CMD_GET_DATA, GET_DATA_FANSTATUS) - data = read_reg(c, REG_DATA0) - info += " FAN:{:08b}".format(data) - print("ctrl{}:FW:{} {}".format(c[0], ctrl_version, info.strip())) - for pi in range(1, c[3] + 1): - send_cmd(c, CMD_GET_PSTATUS, pi) - cnt += 1 - print("p{}:{}".format(cnt, read_reg(c, REG_DATA0))) - send_cmd(c, CMD_GET_USTATUS, pi) - # Only show USBBOOT if supported - if read_reg(c, REG_DATA0) != 0xFF: - print("u{}:{}".format(cnt, read_reg(c, REG_DATA0))) - - elif ( - args == 3 - and sys.argv[1] == "hub" - and (sys.argv[2] == "on" or sys.argv[2] == "off") - ): - if clusterhat: - if version == 1: - print("ERROR: hub control not supported on Cluster HAT v1.x\n") - else: - if sys.argv[2] == "on": - if version_minor == 0: - hub.on() - else: - hub.off() - else: - if version_minor == 0: - hub.off() - else: - hub.on() - - # if (clusterctrl): # TODO - elif args == 3 and sys.argv[1] == "hub" and (sys.argv[2] == "reset"): - if clusterhat and version != 1: - if version_minor == 0: - hub.off() - time.sleep(delay) - hub.on() - else: - hub.on() - time.sleep(delay) - hub.off() - if clusterctrl: - for c in ctrl: - send_cmd(c, CMD_HUB_CYCLE) - - elif ( - args == 3 - and sys.argv[1] == "alert" - and (sys.argv[2] == "on" or sys.argv[2] == "off") - ): - # Turn ALL ALERT LED on/off - if clusterhat: - if version == 1: - if sys.argv[2] == "on": - GPIO.output(ports[0], 1) - else: - GPIO.output(ports[0], 0) - else: - if sys.argv[2] == "on": - alert.on() - else: - alert.off() - - if clusterctrl: - for c in ctrl: - if sys.argv[2] == "on": - send_cmd(c, CMD_ALERT_ON) - else: - send_cmd(c, CMD_ALERT_OFF) - - elif ( - args > 3 - and sys.argv[1] == "alert" - and (sys.argv[2] == "on" or sys.argv[2] == "off") - ): - # Turn on/off ALERT LED for pX - - # Build list of pi zero numbers to turn alert LED on for - zeros = [] - for zero in sys.argv[3:]: - if zero[0] != "p" or (int(zero[1:]) < 1 or int(zero[1:]) > maxpi): - print("ERROR: Valid options are p1-p" + str(maxpi)) - sys.exit(1) - zeros.append(int(zero[1:])) - - for zero in zeros: - lastpi = 0 # max pX for the current device - if clusterhat: - lastpi += clusterhat_size - if zero <= lastpi: - if version == 1: - if sys.argv[2] == "on": - GPIO.output(ports[0], 1) - else: - GPIO.output(ports[0], 0) - else: - if sys.argv[2] == "on": - alert.on() - else: - alert.off() - continue - if clusterctrl: - for c in ctrl: - lastpi += c[3] - if zero <= lastpi: - if sys.argv[2] == "on": - send_cmd(c, CMD_ALERT_ON) - else: - send_cmd(c, CMD_ALERT_OFF) - break - - elif ( - args == 3 - and sys.argv[1] == "led" - and (sys.argv[2] == "on" or sys.argv[2] == "off") - ): - # Enable or Disable LED (not supported on ClusterHAT v1.x) - if clusterhat and version == 2: - if sys.argv[2] == "on": - led.on() - else: - led.off() - if clusterctrl: - for c in ctrl: - if sys.argv[2] == "on": - send_cmd(c, CMD_LED_EN, 0) - else: - send_cmd(c, CMD_LED_DIS, 0) - - elif ( - args == 3 - and sys.argv[1] == "wp" - and (sys.argv[2] == "on" or sys.argv[2] == "off") - ): - # Not supported on ClusterCTRL or ClusterHAT v1.x - if clusterhat and version == 2: - if sys.argv[2] == "on": - wp.on() - else: - if xra1200p and wp_link: - print("Unable to disable EEPROM WP (Solder link set)") - else: - wp.off() - - elif args > 1 and sys.argv[1] == "getpath": - paths = getusbpaths() - for p, path in sorted(paths.iteritems()): - print("p{}:{}".format(p, path)) - - elif args == 3 and sys.argv[1] == "savedefaults": - # Set default EEPROM for device with "order" - if int(sys.argv[2]) < 1 or int(sys.argv[2]) > 255: - print("Invalid order") - sys.exit(1) - if clusterctrl: - for c in ctrl: - if int(sys.argv[2]) == int(c[0]): - send_cmd(c, CMD_SAVEDEFAULTS) - print("saved") - sys.exit() - print("Error: Unable to find Cluster CTRL device with that order") - - elif args == 4 and sys.argv[1] == "setorder": - if int(sys.argv[2]) < 1 or int(sys.argv[2]) > 255: - print("Invalid order old") - sys.exit(1) - if int(sys.argv[3]) < 1 or int(sys.argv[3]) > 255: - print("Invalid order new") - sys.exit(1) - if clusterctrl: - for c in ctrl: - if int(sys.argv[2]) == int(c[0]): - send_cmd(c, CMD_SET_ORDER, int(sys.argv[3])) - - elif args == 3 and sys.argv[1] == "save": - # Set Power on state/USBBOOT/order to EEPROM for device with "order" - if int(sys.argv[2]) < 1 or int(sys.argv[2]) > 255: - print("Invalid order") - sys.exit(1) - if clusterctrl: - for c in ctrl: - if int(sys.argv[2]) == int(c[0]): - send_cmd(c, CMD_SAVE) - print("saved") - sys.exit() - print("Error: Unable to find Cluster CTRL device with that order") - - elif args == 3 and sys.argv[1] == "saveorder": - # Set order to EEPROM for device with "order" - if int(sys.argv[2]) < 1 or int(sys.argv[2]) > 255: - print("Invalid order") - if clusterctrl: - for c in ctrl: - if int(sys.argv[2]) == int(c[0]): - send_cmd(c, CMD_SAVE_ORDER) - print("saved") - sys.exit() - print("Error: Unable to find Cluster CTRL device with that order") - - elif args == 3 and sys.argv[1] == "saveusbboot": - # Set usbboot to EEPROM for device with "order" - if int(sys.argv[2]) < 1 or int(sys.argv[2]) > 255: - print("Invalid order") - if clusterctrl: - for c in ctrl: - if int(sys.argv[2]) == int(c[0]): - send_cmd(c, CMD_SAVE_USBBOOT) - print("saved") - sys.exit() - print("Error: Unable to find Cluster CTRL device with that order") - - elif args == 3 and sys.argv[1] == "savepos": - # Set Power On State to EEPROM for device with "order" - if int(sys.argv[2]) < 1 or int(sys.argv[2]) > 255: - print("Invalid order") - if clusterctrl: - for c in ctrl: - if int(sys.argv[2]) == int(c[0]): - send_cmd(c, CMD_SAVE_POS) - print("saved") - sys.exit() - print("Error: Unable to find Cluster CTRL device with that order") - - elif args == 3 and sys.argv[1] == "reset": - # Reset Cluster CTRL device with "order" - if int(sys.argv[2]) < 1 or int(sys.argv[2]) > 255: - print("Invalid order") - sys.exit(1) - if clusterctrl: - for c in ctrl: - if int(sys.argv[2]) == int(c[0]): - send_cmd(c, CMD_RESET) - print("reset") - sys.exit() - print("Error: Unable to find Cluster CTRL device with that order") - - elif ( - args == 3 - and sys.argv[1] == "fan" - and (sys.argv[2] == "on" or sys.argv[2] == "off") - ): - # Turn all fan on/off - - # "ClusterHAT" using GPIO - if clusterhat and fangpio: - import RPi.GPIO as GPIO - - GPIO.setwarnings(False) - GPIO.setmode(GPIO.BCM) - GPIO.setup(fangpio, GPIO.OUT) - GPIO.output(fangpio, 1) - if sys.argv[2] == "on": - GPIO.output(fangpio, 1) - else: - GPIO.output(fangpio, 0) - - if clusterctrl: - for c in ctrl: - if sys.argv[2] == "on": - send_cmd(c, CMD_FAN, 1) - else: - send_cmd(c, CMD_FAN, 0) - - elif ( - args == 4 - and sys.argv[1] == "fan" - and (sys.argv[2] == "on" or sys.argv[2] == "off") - ): - # Turn fan on/off for CTRL device with "order" or Controller if arg is "c" - if sys.argv[3] != "c" and (int(sys.argv[3]) < 1 or int(sys.argv[3]) > 255): - print("Invalid order") - if clusterhat and fangpio and sys.argv[3] == "c": - import RPi.GPIO as GPIO - - GPIO.setwarnings(False) - GPIO.setmode(GPIO.BCM) - GPIO.setup(fangpio, GPIO.OUT) - if sys.argv[2] == "on": - GPIO.output(fangpio, 1) - else: - GPIO.output(fangpio, 0) - sys.exit() - if clusterctrl: - for c in ctrl: - if int(sys.argv[3]) == int(c[0]): - if sys.argv[2] == "on": - send_cmd(c, CMD_FAN, 1) - else: - send_cmd(c, CMD_FAN, 0) - sys.exit() - - elif args == 2 and sys.argv[1] == "maxpi": - print(maxpi) - - else: - print("Error: Missing arguments") + cli() diff --git a/projects/clusterctrl/src/python/clusterctrl/cmd/alert.py b/projects/clusterctrl/src/python/clusterctrl/cmd/alert.py new file mode 100644 index 0000000..60efce9 --- /dev/null +++ b/projects/clusterctrl/src/python/clusterctrl/cmd/alert.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python3 + +# alert on [] # Turns on all ALERT LED or for pX devices +# alert off [] # Turns off all ALERT LED or for pX devices + +import click + + +@click.group() +def alert(): + pass + + +@alert.command("on") +def alert_on(): + pass + + +@alert.command("off") +def alert_off(): + pass diff --git a/projects/clusterctrl/src/python/clusterctrl/cmd/fan.py b/projects/clusterctrl/src/python/clusterctrl/cmd/fan.py new file mode 100644 index 0000000..7f8acbb --- /dev/null +++ b/projects/clusterctrl/src/python/clusterctrl/cmd/fan.py @@ -0,0 +1,18 @@ +"""Turns FAN on/off for CTRL with .""" + +import click + + +@click.group() +def fan(): + pass + + +@fan.command("on") +def fan_on(): + pass + + +@fan.command("off") +def fan_off(): + pass diff --git a/projects/clusterctrl/src/python/clusterctrl/cmd/hub.py b/projects/clusterctrl/src/python/clusterctrl/cmd/hub.py new file mode 100644 index 0000000..f4a58fd --- /dev/null +++ b/projects/clusterctrl/src/python/clusterctrl/cmd/hub.py @@ -0,0 +1,23 @@ +"""USB hub can be turned on/off on Cluster HAT and reset on CTRL""" + +import click + + +@click.group() +def hub(): + pass + + +@hub.command("on") +def hub_on(): + pass + + +@hub.command("off") +def hub_off(): + pass + + +@hub.command("reset") +def hub_reset(): + pass diff --git a/projects/clusterctrl/src/python/clusterctrl/cmd/led.py b/projects/clusterctrl/src/python/clusterctrl/cmd/led.py new file mode 100644 index 0000000..0e513a9 --- /dev/null +++ b/projects/clusterctrl/src/python/clusterctrl/cmd/led.py @@ -0,0 +1,18 @@ +"""Enable or disable all status LEDs.""" + +import click + + +@click.group() +def led(): + pass + + +@led.command("on") +def led_on(): + pass + + +@led.command("off") +def led_off(): + pass diff --git a/projects/clusterctrl/src/python/clusterctrl/cmd/power.py b/projects/clusterctrl/src/python/clusterctrl/cmd/power.py new file mode 100644 index 0000000..99c7feb --- /dev/null +++ b/projects/clusterctrl/src/python/clusterctrl/cmd/power.py @@ -0,0 +1,21 @@ +import click + + +# power on [] # Turn on All Pi Zero or devices +# power off [] # Turn off All Pi Zero or devices + + +@click.group() +def power(): + pass + +@power.command("on") +@click.argument("devices", nargs="*") +def power_on(devices): + """Power on selected devices.""" + + +@power.command() +@click.argument("devices", nargs="*") +def power_off(devices): + """Power off selected devices.""" diff --git a/projects/clusterctrl/src/python/clusterctrl/cmd/save.py b/projects/clusterctrl/src/python/clusterctrl/cmd/save.py new file mode 100644 index 0000000..2d88405 --- /dev/null +++ b/projects/clusterctrl/src/python/clusterctrl/cmd/save.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 + +# save all # Save current settings to EEPROM +# save order # Save current "order" setting to EEPROM +# save usbboot # Save current USBBOOT settings to EEPROM +# save pos # Save current Power On State to EEPROM +# save defaults # Save default settings to EEPROM + +import click + + +@click.group() +def save(): + pass + + +@save.command("all") +def save_all(): + pass + + +@save.command("order") +def save_order(): + pass + + +@save.command("usbboot") +def save_usbboot(): + pass + + +@save.command("power") +def save_power(): + pass + + +@save.command("defaults") +def save_defaults(): + pass diff --git a/projects/clusterctrl/src/python/clusterctrl/driver.py b/projects/clusterctrl/src/python/clusterctrl/driver.py index 2e6b65c..9c88904 100644 --- a/projects/clusterctrl/src/python/clusterctrl/driver.py +++ b/projects/clusterctrl/src/python/clusterctrl/driver.py @@ -2,10 +2,30 @@ from enum import Enum from itertools import chain, repeat +import logging +from random import SystemRandom from time import sleep -from typing import Union +from typing import NamedTuple, Union -import smbus + +log = logging.getLogger(__name__) + + +smbus = None +if not smbus: + try: + import smbus + except ImportError as e: + log.warning(e) + +if not smbus: + try: + import smbus2 as smbus + except ImportError as e: + log.warning(e) + +if not smbus: + raise ImportError("Unable to load either SMBus or SMBus2") def once(f): @@ -113,14 +133,43 @@ class Data(Enum): FANSTATUS = 0x04 # Read fan status -class ClusterCTRLDriver(object): - def __init__(self, bus: smbus.SMBus, address: int = I2C_ADDRESS, delay: int = 0, clear = False): +class PiID(NamedTuple): + """Represent Pi IDs as something somewhat unique; a CTRL/HAT id and the Pi ID. + + These IDs are expected to be unique at the host level; not at the cluster level. + + """ + ctrl_id: int + pi_id: int + + def __repr__(self) -> str: + return f"" + + +class ClusterCTRLv2Driver(object): + def __init__(self, bus: smbus2.SMBus, address: int = I2C_ADDRESS, delay: int = 0, clear = False): """Initialize a ClusterCTRL/ClusterHAT driver instance for a given bus device.""" self._bus = bus self._address = address self._delay = delay self._clear = clear + try: + if (version := self._read(Reg.VERSION)) != 2: + raise IOError(f"Unsupported register format {version}; expected 2") + except: + raise ValueError("Cannot communicate with a ClusterCTRL/ClusterHAT on the given bus") + + # This is a firmware default value indicating an uninitialized board. + # Randomize it if present. + if self.get_order() == 20: + v = 20 + r = SystemRandom() + while v == 20: + v = r.randint(0, 256) + self.set_order(v) + self.eeprom_save_order() + def _read(self, id: Union[Reg, Data], len: int = 1): """A convenient abstraction for reading data back.""" @@ -182,24 +231,32 @@ class ClusterCTRLDriver(object): # Return the (mostly) meaningful return code return self._read(Reg.DATA0) + def _id(self, id: PiID) -> int: + """Validate a logical ID and convert it to a numeric one.""" + assert self.min_pi <= id <= self.max_pi + assert self.get_order() == id.ctrl_id + return id.pi_id + @property def min_pi(self): """Get the minimum supported Pi ID on this controller.""" - return 1 + return PiID(self.get_order(), 1) @property @once def max_pi(self): """Get the maximum supported Pi ID on this controller.""" - return self._read(Reg.MAXPI) + return PiID(self.get_order(), self._read(Reg.MAXPI)) @property def pi_ids(self): """Iterate over the IDs of Pis which could be connected to this controller.""" - return range(self.min_pi, self.max_pi + 1) + order = self.get_order() + for i in range(1, 6): + yield PiID(order, i) @property def type(self) -> BoardType: @@ -275,22 +332,22 @@ class ClusterCTRLDriver(object): #################################################################################################### # Power management #################################################################################################### - def power_on(self, id: int): + def power_on(self, id: PiID): """Power on a given slot by ID.""" - assert 0 < id <= self.max_pi + id = self._id(id) return self._call(Cmd.ON, id) - def power_off(self, id: int): + def power_off(self, id: PiID): """Power off a given slot by ID.""" - assert 0 < id <= self.max_pi + id = self._id(id) return self._call(Cmd.OFF, id) - def power_status(self, id: int): + def power_status(self, id: PiID): """Read the status of a given slot by ID.""" - assert 0 < id <= self.max_pi + id = self._id(id) return self._call(Cmd.GET_PSTATUS, id) def power_all_on(self): @@ -348,28 +405,29 @@ class ClusterCTRLDriver(object): def set_order(self, order: int): """Set an 'order' (Controller ID) value.""" - assert 0 < order <= 255 + assert 0 < order <= 255, "Order must be in the single byte range" + assert order != 20, "20 is the uninitialized order value, use something else" return self._call(Cmd.SET_ORDER, order) #################################################################################################### # USB booting #################################################################################################### - def usbboot_on(self, id: int): + def usbboot_on(self, id: PiID): """Enable USB booting for the given Pi.""" - assert 0 < id <= self.max_pi + id = self._id(id) return self._call(Cmd.USBBOOT_EN, id) - def usbboot_off(self, id: int): + def usbboot_off(self, id: PiID): """Disable USB booting for the given Pi.""" - assert 0 < id <= self.max_pi + id = self._id(id) return self._call(Cmd.USBBOOT_DIS, id) - def usbboot_status(self, id: int): + def usbboot_status(self, id: PiID): """Get the current USB booting status for the given Pi.""" - assert 0 < id <= self.max_pi + id = self._id(id) return self._call(Cmd.GET_USTATUS, id) #################################################################################################### @@ -384,7 +442,8 @@ class ClusterCTRLDriver(object): def adc_ids(self): return range(1, self.max_adc + 1) - def read_adc(self, id: int): + def read_adc(self, id: PiID): + id = self._id(id) self._call(Cmd.GET_DATA, Data.ADC_READ.value, id) # Now this is screwy. # DATA0 gets set to 0 or 1, indicating the voldage type @@ -412,19 +471,17 @@ class ClusterCTRLDriver(object): if self._call(Cmd.GET_DATA, Data.ADC_TEMP.value) == 2: return self._repack(self._read(Reg.DATA2, 2)) - -class ClusterHATDriver(ClusterCTRLDriver): - """The ClusterHAT controller supports some verbs not supported by the basic ClusterCTRL board.""" - - # FIXME: The ClusterHAT also has some CONSIDERABLE differences in how it does I/O, due to leveraging RPi GPIO not - # just i2c. Whether this is essential or incidental is unclear. - - def led_on(self, id: int): + #################################################################################################### + # Operations with inconsistent platform support + #################################################################################################### + def led_on(self, id: PiID): """Turn on an LED by ID.""" + id = self._id(id) return self._call(Cmd.LED_EN, id) - def led_off(self, id: int): + def led_off(self, id: PiID): """Turn off an LED by ID.""" + id = self._id(id) return self._call(Cmd.LED_DIS, id) diff --git a/tools/python/requirements.txt b/tools/python/requirements.txt index 7e311f0..6e9ce55 100644 --- a/tools/python/requirements.txt +++ b/tools/python/requirements.txt @@ -75,6 +75,7 @@ requests-toolbelt==0.9.1 requirements-parser==0.2.0 setuptools==51.0.0 six==1.15.0 +smbus2==0.4.1 snowballstemmer==2.1.0 sortedcontainers==2.3.0 soupsieve==2.2.1