Hoist out helper fns

Note that since __main__ is in global scope, no closure conversion is required
This commit is contained in:
Reid 'arrdem' McKenzie 2021-10-11 22:26:44 -06:00
parent b18edf1c4a
commit 522f1b6628

View file

@ -111,6 +111,158 @@ nfsboot = "/var/lib/clusterctrl/boot/"
nfsroot = "/var/lib/clusterctrl/nfs/" nfsroot = "/var/lib/clusterctrl/nfs/"
def send_cmd(c, cmd, data0=None, data1=None, data2=None, data3=None, data4=None, data5=None, data6=None, data7=None):
"""Send command to ClusterCTRL via I2C."""
#print("CMD: {} - {} {} {} {} {} {} {} {}"format(cmd, data0, data1, data2, data3, data4, data5, data6, data7))
if (data7 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA7, data7)
if (data6 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA6, data6)
if (data5 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA5, data5)
if (data4 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA4, data4)
if (data3 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA3, data3)
if (data2 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA2, data2)
if (data1 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA1, data1)
if (data0 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA0, data0)
try:
c[1].write_byte_data(I2C_ADDRESS, REG_CMD, cmd)
except IOError:
return False
def read_reg(c, offset, len=1):
"""Read register from ClusterCTRL via I2C."""
if (len>1):
tmp = c[1].read_i2c_block_data(I2C_ADDRESS, offset, len)
else:
tmp = c[1].read_byte_data(I2C_ADDRESS, offset)
return tmp
def get_throttled():
"""Get throttled status."""
if (not os.path.isfile(vcgencmdpath) or not os.access(vcgencmdpath, os.X_OK)):
return "NA"
return ((os.popen(vcgencmdpath + " get_throttled").readline()).split("=", 1)[-1].strip())
def usbpathfrombus(bus):
"""Get USB path (eg 1-1.4.1) for I2C bus."""
for device in glob.glob("/sys/bus/usb/drivers/i2c-tiny-usb/*/i2c*"):
parts = device.split("/")
path = parts[6].split(":")[0]
id = parts[7][4:]
if int(id) == bus:
return path
return False
def getusbpaths():
"""Build list of pi zero numbers to get USB path of."""
paths = {}
zeros = []
if (args > 2):
for zero in sys.argv[2:]:
if (zero[0] != "p" or (int(zero[1:]) < 1 or int(zero[1:]) > maxpi)):
print("ERROR: Valid options are p1-p"+str(maxpi))
sys.exit(1)
zeros.append(int(zero[1:]))
else:
zeros = range(1, maxpi+1)
cache_clusterhat = None # USB path to HUB on Cluster HAT
cache_clusterctrl = {} # Cache of ClusterCTRL USB path prefixes
for zero in zeros:
lastpi = 0 # max pX for the current device
# Get USB path to pi device
if (clusterhat):
lastpi+=clusterhat_size
if (zero<=lastpi):
if (version == 1):
if "clusterhatv1" in config:
paths[str(zero)] = config["clusterhatv1"]+"."+str(5-zero)
if (version == 2):
if cache_clusterhat == None:
# Detect Cluster HAT by turning the HUB on / off / on
# First ensure the hub is turned on
if (version_minor == 0):
hub.on()
else:
hub.off()
time.sleep(1)
# Get list of USB hubs with the correct pid/vid
import usb.core as prescan
devices = {}
hubs = prescan.find(idVendor=0x05e3, idProduct=0x0608, find_all=1)
for clusterhathub in hubs:
devices[str(clusterhathub.bus)+"-"+".".join(map(str, clusterhathub.port_numbers))] = "pre"
pre_count = len(devices)
# Turn hub off
if (version_minor == 0):
hub.off()
else:
hub.on()
time.sleep(1)
import usb.core as postscan
hubs = postscan.find(idVendor=0x05e3, idProduct=0x0608, find_all=1)
for clusterhathub in hubs:
devices[str(clusterhathub.bus)+"-"+".".join(map(str, clusterhathub.port_numbers))] = "post"
post_count = len(devices)
# Check we haven't gained an extra USB hubs
if pre_count == post_count:
found = 0
for path, state in devices.iteritems():
if (state=="pre"):
found=found+1
cache_clusterhat=path
# Turn hub back on
if (version_minor == 0):
hub.on()
else:
hub.off()
# If more than one hub went awol then we don't know which one it should be
if found != 1: cache_clusterhat=None
if (cache_clusterhat != None): paths[str(zero)] = cache_clusterhat+"."+str(5-zero)
if (clusterctrl):
for c in ctrl:
lastpi+=c[3]
if (zero<=lastpi and zero > lastpi-c[3]):
if (c[0] not in cache_clusterctrl):
# Get USB controllers path
usbpathname = usbpathfrombus(c[2])
# Get path to controller
send_cmd(c, CMD_GETPATH, 0)
# Remove controllers path from usbpathname
pathdata = ""
for tmp in read_reg(c, REG_DATA7, len=8):
if tmp!=255:
if (len(pathdata)>0): pathdata=pathdata+"."
pathdata=pathdata+str(tmp)
usbpathname=usbpathname[:-len(pathdata)]
cache_clusterctrl[c[0]] = usbpathname
# Append path to Px
send_cmd(c, CMD_GETPATH, zero-lastpi+c[3])
pathdata = ""
for tmp in read_reg(c, REG_DATA7, len=8):
if tmp!=255:
if (len(pathdata)>0): pathdata=pathdata+"."
pathdata=pathdata+str(tmp)
paths[str(zero)] = cache_clusterctrl[c[0]]+pathdata
return paths
def is_float(n):
try:
float(n)
return True
except ValueError:
return False
if __name__ == "__main__": if __name__ == "__main__":
args = len(sys.argv) args = len(sys.argv)
@ -182,150 +334,6 @@ if __name__ == "__main__":
print("Unable to load config, or invalid config loaded", file=sys.stderr) print("Unable to load config, or invalid config loaded", file=sys.stderr)
sys.exit(1) sys.exit(1)
# Functions
# Send command to ClusterCTRL via I2C
def send_cmd(c, cmd, data0=None, data1=None, data2=None, data3=None, data4=None, data5=None, data6=None, data7=None):
#print("CMD: {} - {} {} {} {} {} {} {} {}"format(cmd, data0, data1, data2, data3, data4, data5, data6, data7))
if (data7 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA7, data7)
if (data6 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA6, data6)
if (data5 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA5, data5)
if (data4 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA4, data4)
if (data3 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA3, data3)
if (data2 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA2, data2)
if (data1 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA1, data1)
if (data0 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA0, data0)
try:
c[1].write_byte_data(I2C_ADDRESS, REG_CMD, cmd)
except IOError:
return False
# Read register from ClusterCTRL via I2C
def read_reg(c, offset, len=1):
if (len>1):
tmp = c[1].read_i2c_block_data(I2C_ADDRESS, offset, len)
else:
tmp = c[1].read_byte_data(I2C_ADDRESS, offset)
return tmp
# Get throttled status
def get_throttled():
if (not os.path.isfile(vcgencmdpath) or not os.access(vcgencmdpath, os.X_OK)):
return "NA"
return ((os.popen(vcgencmdpath + " get_throttled").readline()).split("=", 1)[-1].strip())
# Get USB path (eg 1-1.4.1) for I2C bus
def usbpathfrombus(bus):
for device in glob.glob("/sys/bus/usb/drivers/i2c-tiny-usb/*/i2c*"):
parts = device.split("/")
path = parts[6].split(":")[0]
id = parts[7][4:]
if int(id) == bus:
return path
return False
# Build list of pi zero numbers to get USB path of
def getusbpaths():
paths = {}
zeros = []
if (args > 2):
for zero in sys.argv[2:]:
if (zero[0] != "p" or (int(zero[1:]) < 1 or int(zero[1:]) > maxpi)):
print("ERROR: Valid options are p1-p"+str(maxpi))
sys.exit(1)
zeros.append(int(zero[1:]))
else:
zeros = range(1, maxpi+1)
cache_clusterhat = None # USB path to HUB on Cluster HAT
cache_clusterctrl = {} # Cache of ClusterCTRL USB path prefixes
for zero in zeros:
lastpi = 0 # max pX for the current device
# Get USB path to pi device
if (clusterhat):
lastpi+=clusterhat_size
if (zero<=lastpi):
if (version == 1):
if "clusterhatv1" in config:
paths[str(zero)] = config["clusterhatv1"]+"."+str(5-zero)
if (version == 2):
if cache_clusterhat == None:
# Detect Cluster HAT by turning the HUB on / off / on
# First ensure the hub is turned on
if (version_minor == 0):
hub.on()
else:
hub.off()
time.sleep(1)
# Get list of USB hubs with the correct pid/vid
import usb.core as prescan
devices = {}
hubs = prescan.find(idVendor=0x05e3, idProduct=0x0608, find_all=1)
for clusterhathub in hubs:
devices[str(clusterhathub.bus)+"-"+".".join(map(str, clusterhathub.port_numbers))] = "pre"
pre_count = len(devices)
# Turn hub off
if (version_minor == 0):
hub.off()
else:
hub.on()
time.sleep(1)
import usb.core as postscan
hubs = postscan.find(idVendor=0x05e3, idProduct=0x0608, find_all=1)
for clusterhathub in hubs:
devices[str(clusterhathub.bus)+"-"+".".join(map(str, clusterhathub.port_numbers))] = "post"
post_count = len(devices)
# Check we haven't gained an extra USB hubs
if pre_count == post_count:
found = 0
for path, state in devices.iteritems():
if (state=="pre"):
found=found+1
cache_clusterhat=path
# Turn hub back on
if (version_minor == 0):
hub.on()
else:
hub.off()
# If more than one hub went awol then we don't know which one it should be
if found != 1: cache_clusterhat=None
if (cache_clusterhat != None): paths[str(zero)] = cache_clusterhat+"."+str(5-zero)
if (clusterctrl):
for c in ctrl:
lastpi+=c[3]
if (zero<=lastpi and zero > lastpi-c[3]):
if (c[0] not in cache_clusterctrl):
# Get USB controllers path
usbpathname = usbpathfrombus(c[2])
# Get path to controller
send_cmd(c, CMD_GETPATH, 0)
# Remove controllers path from usbpathname
pathdata = ""
for tmp in read_reg(c, REG_DATA7, len=8):
if tmp!=255:
if (len(pathdata)>0): pathdata=pathdata+"."
pathdata=pathdata+str(tmp)
usbpathname=usbpathname[:-len(pathdata)]
cache_clusterctrl[c[0]] = usbpathname
# Append path to Px
send_cmd(c, CMD_GETPATH, zero-lastpi+c[3])
pathdata = ""
for tmp in read_reg(c, REG_DATA7, len=8):
if tmp!=255:
if (len(pathdata)>0): pathdata=pathdata+"."
pathdata=pathdata+str(tmp)
paths[str(zero)] = cache_clusterctrl[c[0]]+pathdata
return paths
def is_float(n):
try:
float(n)
return True
except ValueError:
return False
########## ##########
# Init # # Init #
########## ##########