Merry Chipmunks
This commit is contained in:
parent
cd28e3dafe
commit
adf58a2b7d
18 changed files with 878 additions and 253 deletions
projects
cabal/src/cabal
lcdbackpack
tickertape
56
projects/cabal/src/cabal/__main__.py
Normal file
56
projects/cabal/src/cabal/__main__.py
Normal file
|
@ -0,0 +1,56 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import sys
|
||||
import logging
|
||||
import socket
|
||||
import socketserver
|
||||
from pathlib import Path
|
||||
from subprocess import check_output
|
||||
import shlex
|
||||
import os
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
CONFIG = None
|
||||
|
||||
|
||||
class RequestHandler(socketserver.BaseRequestHandler):
|
||||
def setup(self) -> None:
|
||||
logging.info("Start request.")
|
||||
|
||||
def handle(self) -> None:
|
||||
conn = self.request
|
||||
while True:
|
||||
data = conn.recv(1024).decode("utf-8")
|
||||
|
||||
if not data:
|
||||
break
|
||||
|
||||
data = shlex.split(data)
|
||||
|
||||
logging.info(f"recv: {data!r}")
|
||||
if command_allowed(data):
|
||||
resp = check_output()
|
||||
conn.sendall(resp)
|
||||
|
||||
else:
|
||||
logging.error("Invalid request %r", data)
|
||||
|
||||
def finish(self) -> None:
|
||||
logging.info("Finish request.")
|
||||
|
||||
|
||||
class Server(socketserver.ThreadingUnixStreamServer):
|
||||
def server_activate(self) -> None:
|
||||
logging.info("Server started on %s", self.server_address)
|
||||
super().server_activate()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
socket_path = Path(sys.argv[1])
|
||||
|
||||
if socket_path.exists():
|
||||
socket_path.unlink()
|
||||
|
||||
with Server(str(socket_path), RequestHandler) as server:
|
||||
server.serve_forever()
|
7
projects/lcdbackpack/BUILD.bazel
Normal file
7
projects/lcdbackpack/BUILD.bazel
Normal file
|
@ -0,0 +1,7 @@
|
|||
py_project(
|
||||
name = "lcdbackpack",
|
||||
lib_deps = [
|
||||
py_requirement("click"),
|
||||
py_requirement("pyserial"),
|
||||
],
|
||||
)
|
347
projects/lcdbackpack/src/lcdbackpack/__init__.py
Normal file
347
projects/lcdbackpack/src/lcdbackpack/__init__.py
Normal file
|
@ -0,0 +1,347 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import os
|
||||
import serial
|
||||
import sys
|
||||
from time import sleep
|
||||
from typing import Optional
|
||||
|
||||
import click
|
||||
|
||||
|
||||
class LcdBackpack:
|
||||
"""
|
||||
The LcdBackpack class exposes the commands available on the Adafruit LCD USB/Serial Backpack via simple methods.
|
||||
"""
|
||||
|
||||
# Quoting from the official firmware implementation -
|
||||
CHANGESPLASH = 0x40 # COL * ROW chars!
|
||||
DISPLAY_ON = 0x42 # backlight. 1 argument afterwards, in minutes
|
||||
AUTOWRAPLINE_ON = 0x43
|
||||
AUTOWRAPLINE_OFF = 0x44
|
||||
DISPLAY_OFF = 0x46
|
||||
SETCURSOR_POSITION = 0x47 # 2 args: col, row
|
||||
SETCURSOR_HOME = 0x48
|
||||
UNDERLINECURSOR_ON = 0x4A
|
||||
UNDERLINECURSOR_OFF = 0x4B
|
||||
MOVECURSOR_BACK = 0x4C
|
||||
MOVECURSOR_FORWARD = 0x4D
|
||||
CUSTOM_CHARACTER = 0x4E # 9 args: char #, 8 bytes data
|
||||
SET_CONTRAST = 0x50 # 1 arg
|
||||
AUTOSCROLL_ON = 0x51
|
||||
AUTOSCROLL_OFF = 0x52
|
||||
BLOCKCURSOR_ON = 0x53
|
||||
BLOCKCURSOR_OFF = 0x54
|
||||
GPO_OFF = 0x56
|
||||
GPO_ON = 0x57
|
||||
CLEAR = 0x58
|
||||
SETSAVE_CONTRAST = 0x91 # 1 arg
|
||||
SET_BRIGHTNESS = 0x99 # 1 arg: scale
|
||||
SETSAVE_BRIGHTNESS = 0x98 # 1 arg: scale
|
||||
LOADCUSTOMCHARBANK = 0xC0 # 9 args: char #, 8 bytes data
|
||||
SAVECUSTOMCHARBANK = 0xC1 # 9 args: char #, 8 bytes data
|
||||
GPO_START_ONOFF = 0xC3
|
||||
RGBBACKLIGHT = 0xD0 # 3 args - R G B
|
||||
SETSIZE = 0xD1 # 2 args - Cols & Rows
|
||||
TESTBAUD = 0xD2 # zero args, prints baud rate to uart
|
||||
STARTL_COMMAND = 0xFE # magic byte indicating a subsequent command list
|
||||
|
||||
def __init__(self, serial_device, baud_rate=115200, rows=16, cols=2):
|
||||
self._baud_rate = baud_rate
|
||||
self._serial_device = os.path.realpath(serial_device)
|
||||
self._ser: Optional[serial.Serial] = None
|
||||
self._rows = rows
|
||||
self._cols = cols
|
||||
|
||||
def __enter__(self) -> "LcdBackpack":
|
||||
"""
|
||||
Connects to the serial port.
|
||||
"""
|
||||
self.connect()
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
if self.connected:
|
||||
self.disconnect()
|
||||
|
||||
def connect(self):
|
||||
"""
|
||||
Manually open a connection.
|
||||
"""
|
||||
self._ser = serial.Serial(self._serial_device, self._baud_rate, timeout=1)
|
||||
|
||||
def disconnect(self):
|
||||
"""
|
||||
Closes the serial port connection.
|
||||
"""
|
||||
if self._ser is not None and self._ser.is_open:
|
||||
self._ser.close()
|
||||
self._ser = None
|
||||
|
||||
@property
|
||||
def connected(self):
|
||||
"""
|
||||
Returns the state of the serial connection.
|
||||
:return: True if the serial port is open (connected), False otherwise.
|
||||
"""
|
||||
return self._ser is not None and self._ser.is_open
|
||||
|
||||
def _write_command(self, command_list):
|
||||
"""
|
||||
Writes the given command list to the LCD back pack.
|
||||
:param command_list: The commands to be written to the LCD back pack.
|
||||
"""
|
||||
assert self.connected, "Connection required"
|
||||
self._ser.write(bytes([LcdBackpack.STARTL_COMMAND] + command_list))
|
||||
|
||||
def display_on(self):
|
||||
"""
|
||||
Switches the LCD backlight on.
|
||||
"""
|
||||
self._write_command([LcdBackpack.DISPLAY_ON, 0])
|
||||
|
||||
def display_off(self):
|
||||
"""
|
||||
Switches the LCD backlight off.
|
||||
"""
|
||||
self._write_command([LcdBackpack.DISPLAY_OFF])
|
||||
|
||||
def set_brightness(self, brightness: int):
|
||||
"""
|
||||
Sets the brightness of the LCD backlight.
|
||||
:param brightness: integer value from 0 - 255
|
||||
"""
|
||||
assert 0 <= brightness <= 255
|
||||
self._write_command([LcdBackpack.SET_BRIGHTNESS, brightness])
|
||||
|
||||
def set_contrast(self, contrast: int):
|
||||
"""
|
||||
Sets the contrast of the LCD character text.
|
||||
:param contrast: integer value from 0 - 255
|
||||
"""
|
||||
assert 0 <= contrast <= 255
|
||||
self._write_command([LcdBackpack.SET_CONTRAST, contrast])
|
||||
|
||||
def set_autoscroll(self, auto_scroll: bool):
|
||||
"""
|
||||
Sets the autoscrolling capability to the value provided.
|
||||
:param auto_scroll: true/false
|
||||
"""
|
||||
if auto_scroll:
|
||||
self._write_command([LcdBackpack.AUTOSCROLL_ON])
|
||||
else:
|
||||
self._write_command([LcdBackpack.AUTOSCROLL_OFF])
|
||||
|
||||
def set_autowrapline(self, auto_wrap: bool):
|
||||
"""
|
||||
Sets the autowrapping capability to the value provided.
|
||||
:param auto_wrap: true/false
|
||||
"""
|
||||
if auto_wrap:
|
||||
self._write_command([LcdBackpack.AUTOWRAPLINE_ON])
|
||||
else:
|
||||
self._write_command([LcdBackpack.AUTOWRAPLINE_OFF])
|
||||
|
||||
def set_cursor_position(self, column: int, row: int):
|
||||
"""
|
||||
Moves the cursor to the provided position.
|
||||
:param column: integer value for column posititon starting from 1
|
||||
:param row: integer value for row position starting from 1
|
||||
"""
|
||||
assert 1 <= column <= 255
|
||||
assert 1 <= row <= 255
|
||||
self._write_command([LcdBackpack.SETCURSOR_POSITION, column, row])
|
||||
|
||||
def set_cursor_home(self):
|
||||
"""
|
||||
Moves the cursor to the "home" positon: column = 1, row = 1.
|
||||
"""
|
||||
self._write_command([LcdBackpack.SETCURSOR_HOME])
|
||||
|
||||
def cursor_forward(self):
|
||||
"""
|
||||
Moves the cursor forward one character.
|
||||
"""
|
||||
self._write_command([LcdBackpack.MOVECURSOR_FORWARD])
|
||||
|
||||
def cursor_back(self):
|
||||
"""
|
||||
Moves the cursor backward one character.
|
||||
"""
|
||||
self._write_command([LcdBackpack.MOVECURSOR_BACK])
|
||||
|
||||
def set_underline_cursor(self, underline_cursor: bool):
|
||||
"""
|
||||
Enables/disables the underline cursor.
|
||||
:param underline_cursor: true/false
|
||||
"""
|
||||
if underline_cursor:
|
||||
self._write_command([LcdBackpack.UNDERLINECURSOR_ON])
|
||||
else:
|
||||
self._write_command([LcdBackpack.UNDERLINECURSOR_OFF])
|
||||
|
||||
def set_block_cursor(self, block_cursor: bool):
|
||||
"""
|
||||
Enables/disables the block cursor.
|
||||
:param block_cursor:
|
||||
"""
|
||||
if block_cursor:
|
||||
self._write_command([LcdBackpack.BLOCKCURSOR_OFF])
|
||||
else:
|
||||
self._write_command([LcdBackpack.BLOCKCURSOR_ON])
|
||||
|
||||
def set_backlight_rgb(self, red: int, green: int, blue: int):
|
||||
"""
|
||||
Sets the RGB LCD backlight to the colour provided by red, green and blue values.
|
||||
:param red: integer value 0 - 255
|
||||
:param green: integer value 0 - 255
|
||||
:param blue: integer value 0 - 255
|
||||
"""
|
||||
assert 0 <= red <= 255
|
||||
assert 0 <= green <= 255
|
||||
assert 0 <= blue <= 255
|
||||
self._write_command([LcdBackpack.RGBBACKLIGHT, red, green, blue])
|
||||
|
||||
def set_backlight_red(self):
|
||||
"""
|
||||
Sets the backlight of an RGB LCD display to be red.
|
||||
"""
|
||||
self.set_backlight_rgb(0xFF, 0, 0)
|
||||
|
||||
def set_backlight_green(self):
|
||||
"""
|
||||
Sets the backlight of an RGB LCD display to be green.
|
||||
"""
|
||||
self.set_backlight_rgb(0, 0xFF, 0)
|
||||
|
||||
def set_backlight_blue(self):
|
||||
"""
|
||||
Sets the backlight of an RGB LCD display to be blue.
|
||||
"""
|
||||
self.set_backlight_rgb(0, 0, 0xFF)
|
||||
|
||||
def set_backlight_white(self):
|
||||
"""
|
||||
Sets the backlight of an RGB LCD display to be white.
|
||||
"""
|
||||
self.set_backlight_rgb(0xFF, 0xFF, 0xFF)
|
||||
|
||||
def set_lcd_size(self, columns: int, rows: int):
|
||||
"""
|
||||
Sets the size of the LCD display (columns, rows).
|
||||
:param columns: the number of columns
|
||||
:param rows: the number of rows.
|
||||
"""
|
||||
assert 0 <= columns <= 255
|
||||
assert 0 <= rows <= 255
|
||||
self._write_command([LcdBackpack.SETSIZE, columns, rows])
|
||||
|
||||
def set_gpio_high(self, gpio: int):
|
||||
"""
|
||||
Sets the given GPIO pin on the LCD back pack HIGH (5V).
|
||||
:param gpio: the GPIO pin to set HIGH (1 - 4)
|
||||
"""
|
||||
assert 1 <= gpio <= 4
|
||||
self._write_command([LcdBackpack.GPO_ON, gpio])
|
||||
|
||||
def set_gpio_low(self, gpio: int):
|
||||
"""
|
||||
Sets the given GPIO pin on the LCD back pack LOW (5V).
|
||||
:param gpio: the GPIO pin to set LOW (1 - 4)
|
||||
"""
|
||||
assert 1 <= gpio <= 4
|
||||
self._write_command([LcdBackpack.GPO_OFF, gpio])
|
||||
|
||||
def clear(self):
|
||||
"""
|
||||
Clears all the characters from the display.
|
||||
"""
|
||||
self._write_command([LcdBackpack.CLEAR])
|
||||
|
||||
def write(self, string):
|
||||
"""
|
||||
Writes the given text on the LCD display.
|
||||
:param string: the text to be written on the display.
|
||||
"""
|
||||
if self._ser is None or self._ser.closed:
|
||||
raise serial.SerialException('Not connected')
|
||||
|
||||
self._ser.write(str.encode(string))
|
||||
|
||||
def set_splash_screen(self, string):
|
||||
"""
|
||||
Sets the LCD splash screen.
|
||||
:param string: the text to be displayed at LCD start up
|
||||
:param lcd_chars: the total characters of the LCD display
|
||||
"""
|
||||
self._write_command([LcdBackpack.CHANGESPLASH] + list(str.encode(string).ljust(self._cols*self._rows)))
|
||||
|
||||
|
||||
@click.group()
|
||||
@click.option("-d", "--device", "device", default="/dev/ttyACM0")
|
||||
@click.option("-b", "--baud", "baud", type=int, default=115200)
|
||||
@click.pass_context
|
||||
def cli(ctx, device, baud):
|
||||
ctx.obj = LcdBackpack(device, baud)
|
||||
|
||||
if ctx.invoked_subcommand is None:
|
||||
ctx.invoke(do_write)
|
||||
|
||||
|
||||
@cli.command("on")
|
||||
@click.pass_context
|
||||
def do_on(ctx):
|
||||
with ctx.obj as dev:
|
||||
dev.display_on()
|
||||
|
||||
|
||||
@cli.command("off")
|
||||
@click.pass_context
|
||||
def do_off(ctx):
|
||||
with ctx.obj as dev:
|
||||
dev.display_off()
|
||||
|
||||
|
||||
@cli.command("backlight")
|
||||
@click.argument("brightness", type=int)
|
||||
@click.pass_context
|
||||
def do_backlight(ctx, brightness):
|
||||
with ctx.obj as dev:
|
||||
dev.set_brightness(brightness)
|
||||
|
||||
|
||||
@cli.command("contrast")
|
||||
@click.argument("contrast", type=int)
|
||||
@click.pass_context
|
||||
def do_contrast(ctx, contrast):
|
||||
with ctx.obj as dev:
|
||||
dev.set_contrast(contrast)
|
||||
|
||||
|
||||
@cli.command("clear")
|
||||
@click.pass_context
|
||||
def do_clear(ctx):
|
||||
with ctx.obj as dev:
|
||||
dev.clear()
|
||||
|
||||
|
||||
@cli.command("write")
|
||||
@click.option("-d", "--delay", "delay", type=int, default=100, help="Number of ms to delay between sending lines")
|
||||
@click.pass_context
|
||||
def do_write(ctx, delay):
|
||||
with ctx.obj as dev:
|
||||
buff = [""] * dev._cols
|
||||
for line in sys.stdin:
|
||||
line = line[:dev._rows].strip()
|
||||
buff = buff[1:] + [line]
|
||||
|
||||
dev.clear()
|
||||
for l in buff:
|
||||
dev.write(l)
|
||||
dev.write("\r")
|
||||
|
||||
sleep(delay/1e3)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
9
projects/tickertape/BUILD.bazel
Normal file
9
projects/tickertape/BUILD.bazel
Normal file
|
@ -0,0 +1,9 @@
|
|||
py_project(
|
||||
name = "tickertape",
|
||||
shebang = "/usr/bin/env python3",
|
||||
main = "src/tickertape/__main__.py",
|
||||
main_deps = [
|
||||
"//projects/lcdbackpack",
|
||||
py_requirement("click"),
|
||||
],
|
||||
)
|
5
projects/tickertape/Dockerfile
Normal file
5
projects/tickertape/Dockerfile
Normal file
|
@ -0,0 +1,5 @@
|
|||
FROM docker.io/library/python:3.13
|
||||
|
||||
COPY tickertape.zapp /usr/bin/tickertape
|
||||
|
||||
ENTRYPOINT ["/usr/bin/tickertape"]
|
55
projects/tickertape/PKGBUILD
Normal file
55
projects/tickertape/PKGBUILD
Normal file
|
@ -0,0 +1,55 @@
|
|||
pkgname=("tickertape")
|
||||
pkgver="0.1.0"
|
||||
pkgrel=1
|
||||
epoch=1
|
||||
pkgdesc="A LCD ticker for host details"
|
||||
|
||||
arch=("any")
|
||||
|
||||
depends=(
|
||||
python
|
||||
)
|
||||
|
||||
source=(
|
||||
tickertape.service
|
||||
config.toml
|
||||
)
|
||||
|
||||
sha256sums=(
|
||||
"SKIP"
|
||||
"SKIP"
|
||||
)
|
||||
|
||||
makedepends=(
|
||||
python
|
||||
bazel
|
||||
)
|
||||
|
||||
# Input file verification
|
||||
verify() {
|
||||
true
|
||||
}
|
||||
|
||||
# Input file patching
|
||||
prepare() {
|
||||
true
|
||||
}
|
||||
|
||||
# Building
|
||||
build() {
|
||||
cd ..
|
||||
bazel build :tickertape.zapp
|
||||
cp -f $(bazel run --run_under=echo :tickertape.zapp) $srcdir/
|
||||
}
|
||||
|
||||
# Post-build verification (tests)
|
||||
check() {
|
||||
true
|
||||
}
|
||||
|
||||
# Emplacing artifacts
|
||||
package() {
|
||||
install -Dm755 tickertape.zapp "${pkgdir}"/usr/bin/tickertape
|
||||
install -Dm644 tickertape.service "${pkgdir}"/usr/lib/systemd/system/tickertape.service
|
||||
install -Dm644 config.toml "${pkgdir}"/etc/tickertape/config.toml
|
||||
}
|
17
projects/tickertape/mkdocker.sh
Normal file
17
projects/tickertape/mkdocker.sh
Normal file
|
@ -0,0 +1,17 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
set -eoux -o pipefail
|
||||
|
||||
cd "$(dirname "$(realpath "$0")")"
|
||||
tmpdir=$(mktemp -d)
|
||||
target="//projects/tickertape:tickertape.zapp"
|
||||
|
||||
bazel build "${target}"
|
||||
zapp=$(realpath $(bazel run --run_under=echo "${target}"))
|
||||
|
||||
cp Dockerfile "$tmpdir/"
|
||||
cp -r "$zapp" "$tmpdir/"
|
||||
|
||||
cd "${tmpdir}"
|
||||
docker build "$@" -f Dockerfile -t registry.tirefireind.us/arrdem/tickertape:latest .
|
||||
docker push registry.tirefireind.us/arrdem/tickertape:latest
|
BIN
projects/tickertape/pkg/tickertape/usr/bin/tickertape
Executable file
BIN
projects/tickertape/pkg/tickertape/usr/bin/tickertape
Executable file
Binary file not shown.
|
@ -0,0 +1,8 @@
|
|||
[Unit]
|
||||
Description=A tickertape for Adafruit LCD displays
|
||||
|
||||
[Service]
|
||||
ExecStart=/usr/bin/tickertape --config /etc/tickertape/config.toml
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
1
projects/tickertape/src/tickertape.service
Symbolic link
1
projects/tickertape/src/tickertape.service
Symbolic link
|
@ -0,0 +1 @@
|
|||
/home/arrdem/Documents/hobby/programming/source/projects/tickertape/tickertape.service
|
247
projects/tickertape/src/tickertape/__main__.py
Normal file
247
projects/tickertape/src/tickertape/__main__.py
Normal file
|
@ -0,0 +1,247 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from datetime import datetime
|
||||
import ipaddress
|
||||
from itertools import islice
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
import re
|
||||
from shutil import which
|
||||
import signal
|
||||
import socket
|
||||
from subprocess import check_call, check_output
|
||||
from time import sleep
|
||||
import tomllib
|
||||
from typing import Iterable
|
||||
|
||||
import click
|
||||
from lcdbackpack import LcdBackpack
|
||||
|
||||
|
||||
SPLASH_TEXT = "%-16s\r%-16s" % ("TireFire", " Industries")
|
||||
DEFAULT_CONFIG = {
|
||||
"tickertape": {
|
||||
"splash_text": SPLASH_TEXT,
|
||||
"clock_rate": 3,
|
||||
"character_rate": 0.75,
|
||||
},
|
||||
"lcd": {
|
||||
"path": "/dev/serial/by-id/usb-239a_Adafruit_Industries-if00",
|
||||
"height": 2,
|
||||
"width": 16,
|
||||
"brightness": 16,
|
||||
},
|
||||
"proc": {"path": "/proc"},
|
||||
"dev": {"path": "/dev"},
|
||||
}
|
||||
|
||||
|
||||
def hostname() -> str:
|
||||
hn = os.getenv("TICKERTAPE_HOSTNAME") or socket.gethostname()
|
||||
|
||||
def _handler(m):
|
||||
if m.group(2) and len(m.group(2)) > 3:
|
||||
return f"{m.group(1)}..."
|
||||
else:
|
||||
return m.group(0)
|
||||
|
||||
return re.sub("(.{1,13})(.*)", _handler, hn)
|
||||
|
||||
|
||||
def defaultaddr(
|
||||
subnets: Iterable[ipaddress.IPv4Network] = (ipaddress.IPv4Network("10.0.0.0/8"),)
|
||||
) -> str:
|
||||
# Get all the IPv4 addresses for this host
|
||||
addresses = [
|
||||
ipaddress.IPv4Address(it)
|
||||
for it in socket.gethostbyname_ex(socket.gethostname())[-1]
|
||||
]
|
||||
# Search out
|
||||
addresses = [
|
||||
it
|
||||
for it in addresses
|
||||
if any(net.network_address <= it <= net.broadcast_address for net in subnets)
|
||||
]
|
||||
|
||||
for ip in addresses:
|
||||
yield f"Addr: {ip}"
|
||||
|
||||
|
||||
def render_duration(uptime_seconds):
|
||||
uptime_seconds = int(uptime_seconds)
|
||||
|
||||
sec = uptime_seconds % 60
|
||||
|
||||
minutes = (uptime_seconds // 60) % 60
|
||||
minutes = f"{minutes}m" if minutes > 0 else ""
|
||||
|
||||
hours = uptime_seconds // 60 // 60 % 24
|
||||
hours = f"{hours}h" if hours > 0 else ""
|
||||
|
||||
days = uptime_seconds // 60 // 60 // 24
|
||||
days = f"{days}d" if days > 0 else ""
|
||||
|
||||
return f"{days}{hours}{minutes}{sec}s"
|
||||
|
||||
|
||||
def uptime(proc: Path):
|
||||
with open(proc / "uptime", "r") as f:
|
||||
uptime_seconds = int(float(f.readline().split()[0]))
|
||||
|
||||
yield f"Up: {render_duration(uptime_seconds)}"
|
||||
|
||||
|
||||
def load():
|
||||
load1, load5, load15 = os.getloadavg()
|
||||
yield f"Load: 1m {load1:.2f}"
|
||||
yield f"Load: 5m {load5:.2f}"
|
||||
yield f"Load: 15m {load15:.2f}"
|
||||
|
||||
|
||||
def df(
|
||||
dirs: Iterable[Path] = (
|
||||
Path("/"),
|
||||
Path("/boot"),
|
||||
Path("/srv"),
|
||||
)
|
||||
):
|
||||
for dir in dirs:
|
||||
if dir.is_dir():
|
||||
dev, blocks, used, avail, pct, mtpt = re.split(
|
||||
r"\s+", check_output(["df", str(dir)]).decode("utf-8").splitlines()[1]
|
||||
)
|
||||
yield f"{dir} usage: {pct}"
|
||||
|
||||
|
||||
def zfs():
|
||||
if zpool := which("zpool"):
|
||||
status = json.loads(check_output([zpool, "status", "--json"]))
|
||||
|
||||
for pool_name, pool_stats in status["pools"].items():
|
||||
yield f"Zpool {pool_name}:"
|
||||
yield f" State: {pool_stats['state']}"
|
||||
yield f" Errors: {pool_stats['error_count']}"
|
||||
yield f" Used: {pool_stats['vdevs'][pool_name]['alloc_space']}"
|
||||
yield f" Total: {pool_stats['vdevs'][pool_name]['total_space']}"
|
||||
last_scan_date = datetime.strptime(
|
||||
pool_stats["scan_stats"]["end_time"], "%a %b %d %H:%M:%S %p %Z %Y"
|
||||
)
|
||||
delta = datetime.now(tz=last_scan_date.tzinfo) - last_scan_date
|
||||
yield f" Scanned: {render_duration(delta.total_seconds())} ago"
|
||||
|
||||
|
||||
def window(text: str, n=2):
|
||||
"""
|
||||
Returns a sliding window (of width n) over data from the iterable.
|
||||
|
||||
s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...
|
||||
"""
|
||||
offset = 0
|
||||
while offset + n <= len(text):
|
||||
yield text[offset : offset + n]
|
||||
offset += 1
|
||||
else:
|
||||
if len(text) < n:
|
||||
yield text
|
||||
|
||||
|
||||
def recursive_merge(dict1, dict2):
|
||||
for key, value in dict2.items():
|
||||
if key in dict1 and isinstance(dict1[key], dict) and isinstance(value, dict):
|
||||
dict1[key] = recursive_merge(dict1[key], value)
|
||||
else:
|
||||
dict1[key] = value
|
||||
return dict1
|
||||
|
||||
|
||||
@click.option(
|
||||
"--config",
|
||||
"config",
|
||||
type=Path,
|
||||
help="A configuration file",
|
||||
default=Path("/etc/tickertape/config.toml"),
|
||||
)
|
||||
@click.command()
|
||||
def main(config: Path):
|
||||
|
||||
# Config loading
|
||||
config_data = {}
|
||||
|
||||
if config.exists():
|
||||
with open(config, "rb") as fp:
|
||||
config_data: Config = tomllib.load(fp)
|
||||
|
||||
config_data = recursive_merge(config_data, DEFAULT_CONFIG)
|
||||
|
||||
dev = Path(os.getenv("TICKERTAPE_DEVDIR") or config_data["dev"]["path"])
|
||||
proc = Path(os.getenv("TICKERTAPE_PROCDIR") or config_data["proc"]["path"])
|
||||
|
||||
hn = hostname()
|
||||
|
||||
# Functions to iterators over lines (or scrolls )
|
||||
lower = [
|
||||
defaultaddr,
|
||||
lambda: uptime(proc),
|
||||
load,
|
||||
df,
|
||||
zfs,
|
||||
]
|
||||
|
||||
# seconds between display refreshes
|
||||
clock_rate = config_data["tickertape"]["clock_rate"]
|
||||
character_rate = config_data["tickertape"]["character_rate"]
|
||||
|
||||
lcd_width = config_data["lcd"]["width"]
|
||||
lcd_height = config_data["lcd"]["height"]
|
||||
|
||||
with LcdBackpack(config_data["lcd"]["path"]) as dev:
|
||||
# Screen setup
|
||||
dev.set_splash_screen(config_data["tickertape"]["splash_text"])
|
||||
dev.set_lcd_size(
|
||||
lcd_width,
|
||||
lcd_height,
|
||||
)
|
||||
dev.set_brightness(config_data["lcd"]["brightness"])
|
||||
dev.set_autoscroll(False)
|
||||
|
||||
# Display the splashscreen
|
||||
dev.clear()
|
||||
dev.set_cursor_position(1, 1)
|
||||
dev.write(SPLASH_TEXT)
|
||||
sleep(clock_rate)
|
||||
|
||||
# Set the hostname row
|
||||
dev.set_cursor_position(1, 1)
|
||||
dev.clear()
|
||||
dev.write(hn)
|
||||
|
||||
def sigterm_handler(_signo, _stack_frame):
|
||||
dev.clear()
|
||||
exit(0)
|
||||
|
||||
signal.signal(signal.SIGTERM, sigterm_handler)
|
||||
signal.signal(signal.SIGINT, sigterm_handler)
|
||||
|
||||
while True:
|
||||
for h in lower:
|
||||
# FIXME: Support dependency injection or args to the handlers?
|
||||
for line in h():
|
||||
# Lines could be longer than 16 characters and if they are
|
||||
# we want to shift characters down the line. This is an
|
||||
# inefficient but idiomatic way to achieve that.
|
||||
for view in window(line, n=lcd_width):
|
||||
# Reset the cursor
|
||||
dev.set_cursor_position(1, 2)
|
||||
# Lay down exactly 16 characters, space padded to flush any state
|
||||
dev.write("%-16s" % view)
|
||||
# Pause between sequential characters
|
||||
sleep(character_rate)
|
||||
|
||||
else:
|
||||
# And then wait the clock interval
|
||||
sleep(clock_rate - character_rate)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
8
projects/tickertape/tickertape.service
Normal file
8
projects/tickertape/tickertape.service
Normal file
|
@ -0,0 +1,8 @@
|
|||
[Unit]
|
||||
Description=A tickertape for Adafruit LCD displays
|
||||
|
||||
[Service]
|
||||
ExecStart=/usr/bin/tickertape --config /etc/tickertape/config.toml
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
Loading…
Add table
Add a link
Reference in a new issue