2021-08-03 14:43:30 +00:00
|
|
|
#!python3
|
|
|
|
|
|
|
|
"""Evil monitoring.
|
|
|
|
|
|
|
|
Ping hosts, syslogging at INFO if they're up and happy, otherwise using Telnet scripting to force
|
|
|
|
reset them and syslogging at CRIT with what the uptime was prior to forced reboot.
|
|
|
|
|
|
|
|
Hosts are debounced, so that they have a chance to return before monitoring resumes.
|
|
|
|
|
|
|
|
No effort is made to detect network conditions or poweroffs.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
import logging
|
|
|
|
from logging.handlers import SysLogHandler
|
|
|
|
import os
|
|
|
|
import random
|
|
|
|
import signal
|
|
|
|
import socket
|
|
|
|
import subprocess
|
|
|
|
import sys
|
|
|
|
from sys import exit
|
|
|
|
from telnetlib import Telnet
|
|
|
|
from threading import Event, Lock, Thread
|
|
|
|
from time import sleep
|
|
|
|
|
2021-09-20 00:05:22 +00:00
|
|
|
from kazoo.exceptions import (
|
|
|
|
ConnectionLoss,
|
|
|
|
LockTimeout,
|
|
|
|
SessionExpiredError,
|
|
|
|
)
|
|
|
|
from kazoo.handlers.threading import (
|
|
|
|
KazooTimeoutError,
|
|
|
|
)
|
2021-08-03 14:43:30 +00:00
|
|
|
from kazoo.recipe.lock import Lock as KazooLock
|
|
|
|
from kook.client import KookClient, lock
|
|
|
|
|
|
|
|
|
|
|
|
# Module-wide logger; handlers and level are attached by the __main__ block.
log = logging.getLogger("arrdem.overwatchd")

# Static tunables shared by every monitor thread.
CONFIG = {
    # APC PDU credentials.
    #
    # The environment may override these so that deployments do not have to
    # edit (or commit) secrets in source. The historical hard-coded values
    # remain as defaults so existing installs keep working unchanged.
    # NOTE(review): consider dropping the password default entirely once every
    # deployment sets OVERWATCHD_PDU_PASSWORD.
    "pdu_username": os.environ.get("OVERWATCHD_PDU_USERNAME", "apc"),
    "pdu_password": os.environ.get("OVERWATCHD_PDU_PASSWORD", "debda7f140 -c"),

    # Hosts recover in about 40s,
    # But only stop responding to pings for about 6-8s.
    # Seconds after taking over a host during which no health checks are run.
    "debounce": 40,

    # Once a host is up, 5s of no ping is indicative.
    # Compared against the failed-check counter kept by monitor().
    "threshold": 5,

    # Number of seconds to wait before refreshing inventory.
    "loop_trip": 60.0,
}
|
|
|
|
|
|
|
|
|
|
|
|
def zdec(i: int):
    """Return ``i - 1``, clamped so the result never drops below zero.

    Any input of 1 or less yields 0.
    """
    return 0 if i <= 1 else i - 1
|
|
|
|
|
|
|
|
|
|
|
|
def ping(hostname: str,
         count: int = 2,
         timeout: int = 1):
    """Run the system ``ping`` against ``hostname`` and report success.

    Sends ``count`` packets (``-c``) with a per-reply wait of ``timeout``
    (``-W``), discarding all output. Returns True when ``ping`` exits with
    status 0 and False for any other exit status.
    """
    command = ["ping", "-q", "-c", str(count), "-W", str(timeout), hostname]
    finished = subprocess.run(command,
                              stdout=subprocess.DEVNULL,
                              stderr=subprocess.DEVNULL)
    return finished.returncode == 0
|
|
|
|
|
|
|
|
|
|
|
|
def check_port(hostname: str,
               timeout: int = 1,
               port: int = 22,
               banner: bytes = b"SSH"):
    """Connect to ``hostname:port`` and check that the service greets with ``banner``.

    Args:
        hostname: Address to connect to. A missing/empty value is reported as
            unhealthy rather than silently probing the local machine.
        timeout: Seconds allowed for the TCP connect, and then again for the
            banner to arrive.
        port: TCP port to probe.
        banner: Bytes regular expression (or a compiled bytes pattern) searched
            for in whatever the peer sends first.

    Returns:
        True if the banner was seen in time. False if the connection was
        refused, failed, timed out, or was closed before the banner showed up.
    """
    import re
    from time import monotonic

    if not hostname:
        return False

    # Same matching rule as before: the banner is a regex searched anywhere in
    # the received bytes.
    pattern = banner if hasattr(banner, "search") else re.compile(banner)
    received = b""

    try:
        # The context manager guarantees the socket is closed on every path,
        # and the connect itself is now bounded by `timeout`.
        with socket.create_connection((hostname, port), timeout=timeout) as conn:
            deadline = monotonic() + timeout
            while pattern.search(received) is None:
                remaining = deadline - monotonic()
                if remaining <= 0:
                    return False
                conn.settimeout(remaining)
                chunk = conn.recv(4096)
                if not chunk:
                    # Peer closed the connection without ever sending the banner.
                    return False
                received += chunk
            return True
    except OSError:
        # Covers refusal, unreachable hosts, resolution failures and timeouts;
        # all of them mean "not healthy" to the caller.
        return False
|
|
|
|
|
|
|
|
|
|
|
|
def do_reboot(client: KookClient, pdu_uri: str, port: int):
    """Get a shared lock, telnet to the PDU, reset the port and log out.

    Args:
        client: Kook client used to look up the PDU's host record (for its lock).
        pdu_uri: ``"<hostname>:<telnet port>"`` of the PDU.
        port: Outlet on the PDU to reboot; it is stringified into the command.

    Raises:
        Exception: if an expected prompt does not show up within one second.
        EOFError: if the PDU closes the connection while a prompt is awaited.
    """

    pdu_hostname, pdu_port = pdu_uri.split(":", 1)
    pdu_host = client.host(pdu_hostname)
    log.info(f"Attempting to force a reset port {port} using {pdu_uri}")

    def l(text):
        # The PDU console expects carriage-return terminated lines.
        return (text + "\r").encode("utf-8")

    def expect(conn, text):
        # Telnet.expect() reports a timeout as (-1, None, data). The index is
        # never None, so success has to be judged on the match object;
        # otherwise a missing prompt would go unnoticed and the script would
        # keep typing blind.
        _offset, match, _data = conn.expect([text], timeout=1)
        if match is None:
            raise Exception(f"Unable to match pattern {text} in conn {conn}")

    def apc_login(conn):
        expect(conn, b"User Name")
        conn.write(l(CONFIG["pdu_username"]))
        expect(conn, b"Password")
        conn.write(l(CONFIG["pdu_password"]))

    def apc_command(conn, cmd):
        # Wait for the shell prompt before sending each command.
        expect(conn, b"APC>")
        conn.write(l(cmd))

    # To ensure only one process logs into the PDU at once
    with lock(pdu_host.lock):
        # The context manager closes the session even when a prompt is missed,
        # so a failed attempt does not leave a dangling connection to the PDU.
        with Telnet(pdu_hostname, int(pdu_port)) as conn:
            apc_login(conn)
            apc_command(conn, "reboot " + str(port))
            apc_command(conn, "quit")
|
|
|
|
|
|
|
|
|
|
|
|
def host_ignored(host):
    """Translate the host's ``overwatchd_ignored`` var into a flag.

    Returns True for the string ``"true"``, False for ``"false"``, and None
    when the var is absent or holds anything else.
    """
    raw_flag = host.canonicalized_vars().get("overwatchd_ignored")
    known_values = {"true": True, "false": False}
    return known_values.get(raw_flag)
|
|
|
|
|
|
|
|
|
|
|
|
def monitor_lock(client, hostname):
    """Build the Kazoo lock that marks who is monitoring ``hostname``.

    The lock lives at ``/overwatch/host/<hostname>`` and is tagged with this
    process's ``<pid>@<fqdn>`` identifier.
    """
    zk = client.client
    lock_path = f"/overwatch/host/{hostname}"
    identifier = f"{os.getpid()}@{socket.getfqdn()}"
    return KazooLock(zk, lock_path, identifier)
|
|
|
|
|
|
|
|
|
|
|
|
def monitor(client: KookClient, shutdown: Event, hostname: str) -> None:
    """Worker loop: health-check ``hostname`` and force a PDU reboot when it stays unhealthy.

    Runs until ``shutdown`` is set or the host record disappears. Each outer
    iteration tries to take the per-host monitor lock; while it is held, the
    inner loop re-reads the host record, skips checks for the debounce period,
    then runs a ping and an SSH banner check. Failed checks increment a
    counter, healthy checks decrement it, and reaching the threshold triggers
    ``do_reboot`` and hands the host off.

    Args:
        client: Kook client used for host lookups and ZK reconnects.
        shutdown: Event that asks the worker to stop.
        hostname: Name of the host record to monitor.

    NOTE(review): this block's indentation was reconstructed from a listing
    that had lost it; nesting that could not be determined with certainty is
    flagged inline below - confirm against the canonical file.
    """
    # FIXME (arrdem 2019-06-25):
    #   Attrs could change / host could move and we wouldn't handle that.

    log.info(f"Monitoring {hostname}")

    threshold = CONFIG["threshold"]
    debounce = timedelta(seconds=CONFIG["debounce"])
    loop_time = CONFIG["loop_trip"]

    _monitor_lock = monitor_lock(client, hostname)
    while not shutdown.is_set():
        try:
            # Rather than directly contending forever, we contend on a loop so that
            # we'll shut down workers in a reasonable amount of time.
            with lock(_monitor_lock, timeout=2):
                # Loop variables
                # NOTE(review): elapsed time comes from naive local wall-clock
                # time, so clock adjustments would skew `delta`.
                now = start = datetime.today()
                # Number of consecutive-ish failed checks; see zdec() below.
                counter = 0

                while not shutdown.is_set():
                    host = client.host(hostname)

                    # The host has been deleted, abort
                    if host is None:
                        return

                    # FIXME (arrdem 2019-08-04):
                    #   This could be slow???
                    attrs = host.canonicalized_vars()
                    host_address = attrs.get("host_address")

                    # Update the date info
                    now = datetime.today()
                    delta = now - start

                    # Still inside the debounce window: run no checks yet.
                    if delta < debounce:
                        sleep(2)

                    elif counter >= threshold:
                        # Bounce the box, wait for it to become healthy again and hand off
                        # NOTE(review): `counter` counts failed checks, not
                        # seconds, so `uptime` and the "{counter}s" in the log
                        # line below are approximations at best.
                        uptime = delta.total_seconds() - counter

                        # Use a timeout so that contending host(s) give up.
                        # NOTE(review): no timeout is actually passed to lock() here.
                        with lock(host.lock):
                            log.critical(f"{hostname} detected unhealthy for {counter}s after {uptime}s up, forcing reboot!")
                            do_reboot(client, attrs.get("pdu_uri"), attrs.get("pdu_outlet"))
                            # Hand off to another worker, who will have to debounce first
                            break

                    elif not host.is_locked() and not host_ignored(host):
                        healthy = True

                        # FIXME (arrdem 2019-06-26):
                        #   Factor these healthchecks out - ideally into a separate system
                        if not ping(host_address):
                            log.warning(f"{hostname} did not return ping")
                            healthy = False

                        # The SSH banner check only runs when ping succeeded.
                        elif not check_port(host_address, port=22, banner=b"SSH"):
                            log.warning(f"{hostname} did not respond to ssh port knock")
                            healthy = False

                        if not healthy:
                            log.warning(f"{hostname} has no active lock, incrementing")
                            counter += 1

                        else:
                            log.debug(f"{hostname} seems healthy...")
                            # Otherwise we zdec the score.
                            counter = zdec(counter)

                            # delta > debounce implied by if ordering
                            # Only logs when the elapsed seconds, modulo 60,
                            # fall in [0, 1).
                            # NOTE(review): assumed to sit inside this healthy
                            # branch - confirm nesting.
                            if delta.total_seconds() % 60 // 1 == 0:
                                log.info(f"{hostname} healthy for {delta.total_seconds()}s")

                    # If we've taken a full round of keeping an eye on this host and someone else is ready hand off
                    if counter == 0 and delta.total_seconds() > loop_time and len(_monitor_lock.contenders()) > 0:
                        log.info(f"Letting someone else keep an eye on {hostname}")
                        break

                    # NOTE(review): assumed to be the per-iteration pause of
                    # the inner loop - confirm nesting.
                    sleep(2)

            # NOTE(review): assumed to run after the monitor lock is released,
            # giving contenders a chance to take over - confirm nesting.
            sleep(2)

        except (SessionExpiredError, ConnectionLoss):
            log.critical(f"Monitoring for host {hostname} lost ZK, retrying....")
            # Keep restarting the client until it succeeds or shutdown is requested.
            while not shutdown.is_set():
                try:
                    client.restart()
                    break
                except KazooTimeoutError:
                    continue

        except LockTimeout:
            # Could not get the monitor lock in time; back off for under a
            # second before contending again.
            sleep(random.random())
|
|
|
|
|
|
|
|
|
|
|
|
def cli_monitor(client):
    """CLI entry point. Monitor all available inventory.

    Spawns one ``monitor`` thread per host that carries the ``host_address``,
    ``pdu_outlet`` and ``pdu_uri`` vars, reaps threads that have exited, and
    re-reads the inventory every ``CONFIG["loop_trip"]`` seconds (and whenever
    the hosts watch fires). On SIGINT/SIGQUIT the workers are asked to stop,
    joined, and the process exits with status 0.
    """

    children_lock = Lock()
    children = {}
    shutdown = Event()

    def _shutdown(*args):
        # Only raise the flag here. Signal handlers run on the main thread, so
        # taking `children_lock` in the handler would deadlock whenever the
        # signal lands while the main loop already holds that (non-reentrant)
        # lock. Joining happens after the main loop exits instead.
        shutdown.set()

    signal.signal(signal.SIGINT, _shutdown)
    signal.signal(signal.SIGQUIT, _shutdown)

    def monitor_maybe(host):
        # Start a worker for `host` if it has everything monitoring needs.
        # Callers must hold `children_lock`.
        attrs = host.canonicalized_vars()
        if attrs.get("host_address") and attrs.get("pdu_outlet") and attrs.get("pdu_uri"):
            log.info(f"Host {host} has requisite k/vs, monitoring...")
            t = Thread(target=monitor,
                       args=(client, shutdown, host.name,))
            t.start()
            children[host.name] = t
            return True

    def intake_hosts(*args, **kwargs):
        # Called from the main loop and re-registered as the hosts watch, so
        # it may also be invoked with watch arguments.
        with children_lock:
            # Don't spawn new workers once shutdown has begun.
            if shutdown.is_set():
                return

            hosts = list(client.hosts(watch=intake_hosts))
            # Shuffle so processes don't all contend for hosts in the same order.
            random.shuffle(hosts)
            for host in hosts:
                if host.name not in children:
                    monitor_maybe(host)

    while not shutdown.is_set():
        # Reap finished workers so their hosts can be picked up again.
        with children_lock:
            for hostname, thread in list(children.items()):
                if not thread.is_alive():
                    thread.join()
                    del children[hostname]

        intake_hosts()

        # Like sleep(), but wakes immediately when shutdown is requested.
        shutdown.wait(CONFIG.get("loop_trip"))

    # Shutdown requested: wait for every worker to notice and wind down.
    with children_lock:
        workers = list(children.values())
    for worker in workers:
        worker.join()
    exit(0)
|
|
|
|
|
|
|
|
|
|
|
|
def cli_spy(client):
    """CLI entry point. Spy on who's doing the monitoring.

    Every two seconds, prints for each known host who holds its monitor lock
    and how many others are waiting, then clears the terminal. Never returns.
    """

    root = "/overwatch/host"
    registry_guard = Lock()
    host_locks = {}

    def _refresh(*_args, **_kwargs):
        # Re-registers itself as the watch so newly created children are picked up.
        with registry_guard:
            for hostname in client.client.get_children(root, watch=_refresh):
                if hostname in host_locks:
                    continue
                host_locks[hostname] = monitor_lock(client, hostname)

    _refresh()

    while True:
        with registry_guard:
            for hostname, host_lock in host_locks.items():
                queue = host_lock.contenders()

                if not queue:
                    print(f"{hostname: <16} is unmonitored")
                    continue

                # The first contender is reported as the monitor; the rest are waiting.
                holder, *contenders = queue
                if contenders:
                    print(f"{hostname: <16} is monitored by {holder}, with {len(contenders)} contenders")
                else:
                    print(f"{hostname: <16} is monitored by {holder} uncontended")

        sleep(2)
        os.system("clear")
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Make kazoo shut up some
    kazoo_log = logging.getLogger("kazoo")
    kazoo_log.setLevel(logging.WARNING)

    # We'll be quiet too some
    log.setLevel(logging.INFO)

    # And forward to syslog
    # (swap in logging.StreamHandler() here to debug on a console instead)
    fmt = logging.Formatter("%(asctime)s %(relativeCreated)6d %(threadName)s - %(name)s - %(levelname)s - %(message)s")
    syslog_handler = SysLogHandler("/dev/log")
    syslog_handler.setFormatter(fmt)
    log.addHandler(syslog_handler)

    client = KookClient()

    # First matching subcommand wins; "spy" is checked before "monitor".
    for subcommand, entry_point in (("spy", cli_spy), ("monitor", cli_monitor)):
        if subcommand in sys.argv:
            entry_point(client)
            break
    else:
        print("Usage: overwatchd [monitor | spy]")
        exit(1)
|