Hoist out helper fns

Note that since __main__ is in global scope, no closure conversion is required
This commit is contained in:
Reid 'arrdem' McKenzie 2021-10-11 22:26:44 -06:00
parent 78d017223c
commit bee293b9d2

View file

@ -111,80 +111,8 @@ nfsboot = "/var/lib/clusterctrl/boot/"
nfsroot = "/var/lib/clusterctrl/nfs/" nfsroot = "/var/lib/clusterctrl/nfs/"
if __name__ == "__main__": def send_cmd(c, cmd, data0=None, data1=None, data2=None, data3=None, data4=None, data5=None, data6=None, data7=None):
args = len(sys.argv) """Send command to ClusterCTRL via I2C."""
if (args == 1 or sys.argv[1] == "help" or sys.argv[1] == "--help" or sys.argv[1] == "-h" or sys.argv[1] == "/?"):
print("""Usage :{0} <cmd>
<devices> can be a single device 'p1' or a list 'p2 p3 p5'
<order> is the order listed by '{0} status' (default 20)
# Power on/off all or listed device(s)
{0} on|off [<devices>]
# Show status of ClusterHAT/CTRL
{0} status
# Get number of controllable Pi
{0} maxpi
# Create/update symlinks for rpiboot [root]"
sudo {0} init
# Turn ALERT LED on/off for all or listed device(s)"
{0} alert on|off [<devices>]
# Enable LED (Power/pX/etc.)
{0} led on
# Disable LED (Power/pX/etc.)
{0} led off
# Turns on/off or resets the USB HUB
{0} hub off|on|reset
## The following are only available on ClusterCTRL devices
# Set order on device <old> to <new>
{0} setorder <old> <new>
# Get USB path to Px
{0} getpath <device>
# Turns FAN on/off for CTRL with <order>
{0} fan on|off <order>
# Save current settings to EEPROM
{0} save <order>
# Save current order to EEPROM
{0} saveorder <order>
# Save current Power On State to EEPROM
{0} savepos <order>
# Save factory default settings to EEPROM
{0} savedefaults <order>
""" .format(sys.argv[0]))
sys.exit()
# Read configruation file
config = {}
if os.path.isfile("/etc/default/clusterctrl"):
with open ("/etc/default/clusterctrl") as configfile:
for line in configfile:
if (line[:1] != "#"):
k, v = line.partition("=")[::2]
config[k.strip().lower()] = v.split("#")[0].strip(" \"'\n\t")
# If we're not a controller of some sort exit cleanly
if ("type" not in config or not (config["type"] == "c" or config["type"] == "cnat")):
print("Unable to load config, or invalid config loaded", file=sys.stderr)
sys.exit(1)
# Functions
# Send command to ClusterCTRL via I2C
def send_cmd(c, cmd, data0=None, data1=None, data2=None, data3=None, data4=None, data5=None, data6=None, data7=None):
#print("CMD: {} - {} {} {} {} {} {} {} {}"format(cmd, data0, data1, data2, data3, data4, data5, data6, data7)) #print("CMD: {} - {} {} {} {} {} {} {} {}"format(cmd, data0, data1, data2, data3, data4, data5, data6, data7))
if (data7 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA7, data7) if (data7 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA7, data7)
if (data6 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA6, data6) if (data6 is not None): c[1].write_byte_data(I2C_ADDRESS, REG_DATA6, data6)
@ -199,32 +127,40 @@ if __name__ == "__main__":
except IOError: except IOError:
return False return False
# Read register from ClusterCTRL via I2C
def read_reg(c, offset, len=1): def read_reg(c, offset, len=1):
"""Read register from ClusterCTRL via I2C."""
if (len>1): if (len>1):
tmp = c[1].read_i2c_block_data(I2C_ADDRESS, offset, len) tmp = c[1].read_i2c_block_data(I2C_ADDRESS, offset, len)
else: else:
tmp = c[1].read_byte_data(I2C_ADDRESS, offset) tmp = c[1].read_byte_data(I2C_ADDRESS, offset)
return tmp return tmp
# Get throttled status
def get_throttled(): def get_throttled():
"""Get throttled status."""
if (not os.path.isfile(vcgencmdpath) or not os.access(vcgencmdpath, os.X_OK)): if (not os.path.isfile(vcgencmdpath) or not os.access(vcgencmdpath, os.X_OK)):
return "NA" return "NA"
return ((os.popen(vcgencmdpath + " get_throttled").readline()).split("=", 1)[-1].strip()) return ((os.popen(vcgencmdpath + " get_throttled").readline()).split("=", 1)[-1].strip())
# Get USB path (eg 1-1.4.1) for I2C bus
def usbpathfrombus(bus): def usbpathfrombus(bus):
"""Get USB path (eg 1-1.4.1) for I2C bus."""
for device in glob.glob("/sys/bus/usb/drivers/i2c-tiny-usb/*/i2c*"): for device in glob.glob("/sys/bus/usb/drivers/i2c-tiny-usb/*/i2c*"):
parts = device.split("/") parts = device.split("/")
path = parts[6].split(":")[0] path = parts[6].split(":")[0]
id = parts[7][4:] id = parts[7][4:]
if int(id) == bus: if int(id) == bus:
return path return path
return False return False
# Build list of pi zero numbers to get USB path of
def getusbpaths(): def getusbpaths():
"""Build list of pi zero numbers to get USB path of."""
paths = {} paths = {}
zeros = [] zeros = []
@ -319,13 +255,85 @@ if __name__ == "__main__":
paths[str(zero)] = cache_clusterctrl[c[0]]+pathdata paths[str(zero)] = cache_clusterctrl[c[0]]+pathdata
return paths return paths
def is_float(n): def is_float(n):
try: try:
float(n) float(n)
return True return True
except ValueError: except ValueError:
return False return False
if __name__ == "__main__":
args = len(sys.argv)
if (args == 1 or sys.argv[1] == "help" or sys.argv[1] == "--help" or sys.argv[1] == "-h" or sys.argv[1] == "/?"):
print("""Usage :{0} <cmd>
<devices> can be a single device 'p1' or a list 'p2 p3 p5'
<order> is the order listed by '{0} status' (default 20)
# Power on/off all or listed device(s)
{0} on|off [<devices>]
# Show status of ClusterHAT/CTRL
{0} status
# Get number of controllable Pi
{0} maxpi
# Create/update symlinks for rpiboot [root]"
sudo {0} init
# Turn ALERT LED on/off for all or listed device(s)"
{0} alert on|off [<devices>]
# Enable LED (Power/pX/etc.)
{0} led on
# Disable LED (Power/pX/etc.)
{0} led off
# Turns on/off or resets the USB HUB
{0} hub off|on|reset
## The following are only available on ClusterCTRL devices
# Set order on device <old> to <new>
{0} setorder <old> <new>
# Get USB path to Px
{0} getpath <device>
# Turns FAN on/off for CTRL with <order>
{0} fan on|off <order>
# Save current settings to EEPROM
{0} save <order>
# Save current order to EEPROM
{0} saveorder <order>
# Save current Power On State to EEPROM
{0} savepos <order>
# Save factory default settings to EEPROM
{0} savedefaults <order>
""" .format(sys.argv[0]))
sys.exit()
# Read configruation file
config = {}
if os.path.isfile("/etc/default/clusterctrl"):
with open ("/etc/default/clusterctrl") as configfile:
for line in configfile:
if (line[:1] != "#"):
k, v = line.partition("=")[::2]
config[k.strip().lower()] = v.split("#")[0].strip(" \"'\n\t")
# If we're not a controller of some sort exit cleanly
if ("type" not in config or not (config["type"] == "c" or config["type"] == "cnat")):
print("Unable to load config, or invalid config loaded", file=sys.stderr)
sys.exit(1)
########## ##########
# Init # # Init #
########## ##########