From bee293b9d220e54b442be4cdcc21fc39a372535d Mon Sep 17 00:00:00 2001 From: Reid 'arrdem' McKenzie Date: Mon, 11 Oct 2021 22:26:44 -0600 Subject: [PATCH] Hoist out helper fns Note that since __main__ is in global scope, no closure conversion is required --- projects/hatctl/src/python/hatctl/__main__.py | 296 +++++++++--------- 1 file changed, 152 insertions(+), 144 deletions(-) diff --git a/projects/hatctl/src/python/hatctl/__main__.py b/projects/hatctl/src/python/hatctl/__main__.py index 542c560..b661496 100644 --- a/projects/hatctl/src/python/hatctl/__main__.py +++ b/projects/hatctl/src/python/hatctl/__main__.py @@ -111,6 +111,158 @@ nfsboot = "/var/lib/clusterctrl/boot/" nfsroot = "/var/lib/clusterctrl/nfs/" +def send_cmd(c, cmd, data0=None, data1=None, data2=None, data3=None, data4=None, data5=None, data6=None, data7=None): + """Send command to ClusterCTRL via I2C.""" + #print("CMD: {} - {} {} {} {} {} {} {} {}"format(cmd, data0, data1, data2, data3, data4, data5, data6, data7)) + if (data7 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA7, data7) + if (data6 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA6, data6) + if (data5 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA5, data5) + if (data4 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA4, data4) + if (data3 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA3, data3) + if (data2 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA2, data2) + if (data1 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA1, data1) + if (data0 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA0, data0) + try: + c[1].write_byte_data(I2C_ADDRESS, REG_CMD, cmd) + except IOError: + return False + + +def read_reg(c, offset, len=1): + """Read register from ClusterCTRL via I2C.""" + + if (len>1): + tmp = c[1].read_i2c_block_data(I2C_ADDRESS, offset, len) + else: + tmp = c[1].read_byte_data(I2C_ADDRESS, offset) + return tmp + + +def get_throttled(): + """Get throttled status.""" + if (not os.path.isfile(vcgencmdpath) or not os.access(vcgencmdpath, os.X_OK)): + return "NA" + return ((os.popen(vcgencmdpath + " get_throttled").readline()).split("=", 1)[-1].strip()) + + +def usbpathfrombus(bus): + """Get USB path (eg 1-1.4.1) for I2C bus.""" + + for device in glob.glob("/sys/bus/usb/drivers/i2c-tiny-usb/*/i2c*"): + parts = device.split("/") + path = parts[6].split(":")[0] + id = parts[7][4:] + if int(id) == bus: + return path + + return False + + +def getusbpaths(): + """Build list of pi zero numbers to get USB path of.""" + + paths = {} + zeros = [] + + if (args > 2): + for zero in sys.argv[2:]: + if (zero[0] != "p" or (int(zero[1:]) < 1 or int(zero[1:]) > maxpi)): + print("ERROR: Valid options are p1-p"+str(maxpi)) + sys.exit(1) + zeros.append(int(zero[1:])) + + else: + zeros = range(1, maxpi+1) + + cache_clusterhat = None # USB path to HUB on Cluster HAT + cache_clusterctrl = {} # Cache of ClusterCTRL USB path prefixes + + for zero in zeros: + lastpi = 0 # max pX for the current device + # Get USB path to pi device + if (clusterhat): + lastpi+=clusterhat_size + if (zero<=lastpi): + if (version == 1): + if "clusterhatv1" in config: + paths[str(zero)] = config["clusterhatv1"]+"."+str(5-zero) + if (version == 2): + if cache_clusterhat == None: + # Detect Cluster HAT by turning the HUB on / off / on + # First ensure the hub is turned on + if (version_minor == 0): + hub.on() + else: + hub.off() + time.sleep(1) + # Get list of USB hubs with the correct pid/vid + import usb.core as prescan + devices = {} + hubs = prescan.find(idVendor=0x05e3, idProduct=0x0608, find_all=1) + for clusterhathub in hubs: + devices[str(clusterhathub.bus)+"-"+".".join(map(str, clusterhathub.port_numbers))] = "pre" + pre_count = len(devices) + # Turn hub off + if (version_minor == 0): + hub.off() + else: + hub.on() + time.sleep(1) + import usb.core as postscan + hubs = postscan.find(idVendor=0x05e3, idProduct=0x0608, find_all=1) + for clusterhathub in hubs: + devices[str(clusterhathub.bus)+"-"+".".join(map(str, clusterhathub.port_numbers))] = "post" + post_count = len(devices) + # Check we haven't gained an extra USB hubs + if pre_count == post_count: + found = 0 + for path, state in devices.iteritems(): + if (state=="pre"): + found=found+1 + cache_clusterhat=path + # Turn hub back on + if (version_minor == 0): + hub.on() + else: + hub.off() + # If more than one hub went awol then we don't know which one it should be + if found != 1: cache_clusterhat=None + if (cache_clusterhat != None): paths[str(zero)] = cache_clusterhat+"."+str(5-zero) + if (clusterctrl): + for c in ctrl: + lastpi+=c[3] + if (zero<=lastpi and zero > lastpi-c[3]): + if (c[0] not in cache_clusterctrl): + # Get USB controllers path + usbpathname = usbpathfrombus(c[2]) + # Get path to controller + send_cmd(c, CMD_GETPATH, 0) + # Remove controllers path from usbpathname + pathdata = "" + for tmp in read_reg(c, REG_DATA7, len=8): + if tmp!=255: + if (len(pathdata)>0): pathdata=pathdata+"." + pathdata=pathdata+str(tmp) + usbpathname=usbpathname[:-len(pathdata)] + cache_clusterctrl[c[0]] = usbpathname + # Append path to Px + send_cmd(c, CMD_GETPATH, zero-lastpi+c[3]) + pathdata = "" + for tmp in read_reg(c, REG_DATA7, len=8): + if tmp!=255: + if (len(pathdata)>0): pathdata=pathdata+"." + pathdata=pathdata+str(tmp) + paths[str(zero)] = cache_clusterctrl[c[0]]+pathdata + return paths + +def is_float(n): + try: + float(n) + return True + except ValueError: + return False + + if __name__ == "__main__": args = len(sys.argv) @@ -182,150 +334,6 @@ if __name__ == "__main__": print("Unable to load config, or invalid config loaded", file=sys.stderr) sys.exit(1) - # Functions - # Send command to ClusterCTRL via I2C - def send_cmd(c, cmd, data0=None, data1=None, data2=None, data3=None, data4=None, data5=None, data6=None, data7=None): - #print("CMD: {} - {} {} {} {} {} {} {} {}"format(cmd, data0, data1, data2, data3, data4, data5, data6, data7)) - if (data7 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA7, data7) - if (data6 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA6, data6) - if (data5 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA5, data5) - if (data4 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA4, data4) - if (data3 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA3, data3) - if (data2 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA2, data2) - if (data1 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA1, data1) - if (data0 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA0, data0) - try: - c[1].write_byte_data(I2C_ADDRESS, REG_CMD, cmd) - except IOError: - return False - - # Read register from ClusterCTRL via I2C - def read_reg(c, offset, len=1): - if (len>1): - tmp = c[1].read_i2c_block_data(I2C_ADDRESS, offset, len) - else: - tmp = c[1].read_byte_data(I2C_ADDRESS, offset) - return tmp - - # Get throttled status - def get_throttled(): - if (not os.path.isfile(vcgencmdpath) or not os.access(vcgencmdpath, os.X_OK)): - return "NA" - return ((os.popen(vcgencmdpath + " get_throttled").readline()).split("=", 1)[-1].strip()) - - # Get USB path (eg 1-1.4.1) for I2C bus - def usbpathfrombus(bus): - for device in glob.glob("/sys/bus/usb/drivers/i2c-tiny-usb/*/i2c*"): - parts = device.split("/") - path = parts[6].split(":")[0] - id = parts[7][4:] - if int(id) == bus: - return path - return False - - # Build list of pi zero numbers to get USB path of - def getusbpaths(): - paths = {} - zeros = [] - - if (args > 2): - for zero in sys.argv[2:]: - if (zero[0] != "p" or (int(zero[1:]) < 1 or int(zero[1:]) > maxpi)): - print("ERROR: Valid options are p1-p"+str(maxpi)) - sys.exit(1) - zeros.append(int(zero[1:])) - - else: - zeros = range(1, maxpi+1) - - cache_clusterhat = None # USB path to HUB on Cluster HAT - cache_clusterctrl = {} # Cache of ClusterCTRL USB path prefixes - - for zero in zeros: - lastpi = 0 # max pX for the current device - # Get USB path to pi device - if (clusterhat): - lastpi+=clusterhat_size - if (zero<=lastpi): - if (version == 1): - if "clusterhatv1" in config: - paths[str(zero)] = config["clusterhatv1"]+"."+str(5-zero) - if (version == 2): - if cache_clusterhat == None: - # Detect Cluster HAT by turning the HUB on / off / on - # First ensure the hub is turned on - if (version_minor == 0): - hub.on() - else: - hub.off() - time.sleep(1) - # Get list of USB hubs with the correct pid/vid - import usb.core as prescan - devices = {} - hubs = prescan.find(idVendor=0x05e3, idProduct=0x0608, find_all=1) - for clusterhathub in hubs: - devices[str(clusterhathub.bus)+"-"+".".join(map(str, clusterhathub.port_numbers))] = "pre" - pre_count = len(devices) - # Turn hub off - if (version_minor == 0): - hub.off() - else: - hub.on() - time.sleep(1) - import usb.core as postscan - hubs = postscan.find(idVendor=0x05e3, idProduct=0x0608, find_all=1) - for clusterhathub in hubs: - devices[str(clusterhathub.bus)+"-"+".".join(map(str, clusterhathub.port_numbers))] = "post" - post_count = len(devices) - # Check we haven't gained an extra USB hubs - if pre_count == post_count: - found = 0 - for path, state in devices.iteritems(): - if (state=="pre"): - found=found+1 - cache_clusterhat=path - # Turn hub back on - if (version_minor == 0): - hub.on() - else: - hub.off() - # If more than one hub went awol then we don't know which one it should be - if found != 1: cache_clusterhat=None - if (cache_clusterhat != None): paths[str(zero)] = cache_clusterhat+"."+str(5-zero) - if (clusterctrl): - for c in ctrl: - lastpi+=c[3] - if (zero<=lastpi and zero > lastpi-c[3]): - if (c[0] not in cache_clusterctrl): - # Get USB controllers path - usbpathname = usbpathfrombus(c[2]) - # Get path to controller - send_cmd(c, CMD_GETPATH, 0) - # Remove controllers path from usbpathname - pathdata = "" - for tmp in read_reg(c, REG_DATA7, len=8): - if tmp!=255: - if (len(pathdata)>0): pathdata=pathdata+"." - pathdata=pathdata+str(tmp) - usbpathname=usbpathname[:-len(pathdata)] - cache_clusterctrl[c[0]] = usbpathname - # Append path to Px - send_cmd(c, CMD_GETPATH, zero-lastpi+c[3]) - pathdata = "" - for tmp in read_reg(c, REG_DATA7, len=8): - if tmp!=255: - if (len(pathdata)>0): pathdata=pathdata+"." - pathdata=pathdata+str(tmp) - paths[str(zero)] = cache_clusterctrl[c[0]]+pathdata - return paths - - def is_float(n): - try: - float(n) - return True - except ValueError: - return False - ########## # Init # ##########