WIP - refactoring towards a better state

This commit is contained in:
Reid 'arrdem' McKenzie 2021-12-12 20:55:39 -07:00
parent c589cbd6c5
commit 3e4d02f2f4
6 changed files with 162 additions and 229 deletions

View file

@ -7,9 +7,6 @@ zapp_binary(
main = "src/python/aloe/__main__.py",
deps = [
":lib",
py_requirement("graphviz"),
py_requirement("icmplib"),
py_requirement("pytz"),
py_requirement("requests"),
],
)

View file

@ -18,91 +18,77 @@ import queue
from queue import Queue
import sys
from threading import Event, Lock, Thread
from time import sleep, time
from time import sleep as _sleep, time
from .icmp import *
from .icmp import _ping
from .cursedlogger import CursesHandler
import pytz
log = logging.getLogger(__name__)
parser = argparse.ArgumentParser()
parser.add_argument("hosts", nargs="+")
INTERVAL = 0.5
Z = pytz.timezone("America/Denver")
class HostState(object):
"""A model of a (bounded) time series of host state.
"""
def __init__(self,
hostname: str,
history = [],
history_size = 60 * 60 * 24,
is_up: bool = False,
lost: int = 0,
up: float = 0.0):
self._state = ringbuffer(maxlen=history_size)
self._is_up = is_up
self._lost = lost
self._up = up
class MonitoredHost(object):
def __init__(self, hostname: str, timeout: timedelta, id=None):
self._hostname = hostname
self._timeout = timeout
self._sequence = request_sequence(hostname, timeout, id)
self._lock = Lock()
self._state = ringbuffer(maxlen=360)
self._is_up = False
self._lost = 0
self._up = 0
for resp in history:
self.append(resp)
def __call__(self, shutdown: Event, q: Queue):
"""Monitor a given host by throwing requests into the queue; intended to be a Thread target."""
def append(self, resp):
if resp and not self._is_up:
# log.debug(f"Host {self._hostname} is up!")
self._is_up = self._is_up or resp
self._up = resp._time
while not shutdown.is_set():
req = next(self._sequence)
resp = _ping(q, req)
elif resp and self._is_up:
# log.debug(f"Host {self._hostname} holding up...")
pass
if resp and not self._is_up:
# log.debug(f"Host {self._hostname} is up!")
self._is_up = self._is_up or resp
self._up = resp._time
elif not resp and self._is_up:
# log.debug(f"Host {self._hostname} is down!")
self._is_up = None
self._up = None
elif resp and self._is_up:
# log.debug(f"Host {self._hostname} holding up...")
pass
elif not resp and not self._is_up:
pass
elif not resp and self._is_up:
# log.debug(f"Host {self._hostname} is down!")
self._is_up = None
self._up = None
if not resp:
self._lost += 1
elif not resp and not self._is_up:
pass
if self._state and not self._state[0]:
self._lost -= 1
with self._lock:
if not resp:
self._lost += 1
if self._state and not self._state[0]:
self._lost -= 1
# self._state = (self._state + [req])[:-3600]
self._state.append(resp)
sleep(1)
self._state.append(resp)
def last(self):
with self._lock:
return next(reversed(self._state), None)
return next(reversed(self._state), None)
def last_window(self, duration: timedelta = None):
with self._lock:
l = []
t = time() - duration.total_seconds()
for i in reversed(self._state):
if not i or i._time > t:
l.insert(0, i)
else:
break
return l
l = []
t = time() - duration.total_seconds()
for i in reversed(self._state):
if not i or i._time > t:
l.insert(0, i)
else:
break
return l
def loss(self, duration: timedelta):
log = self.last_window(duration)
@ -123,6 +109,102 @@ class MonitoredHost(object):
return datetime.fromtimestamp(self._up)
class MonitoredHost(object):
"""A shim (arguably a lambda) for generating a timeline of host state."""
def __init__(self, hostname: str, timeout: timedelta, id=None):
self._hostname = hostname
self._timeout = timeout
self._sequence = request_sequence(hostname, timeout, id)
self._lock = Lock()
self._state = HostState(hostname)
def __call__(self, shutdown: Event, q: Queue):
"""Monitor a given host by throwing requests into the queue; intended to be a Thread target."""
while not shutdown.is_set():
req = next(self._sequence)
resp = _ping(q, req)
self._state.append(resp)
sleep(shutdown, 1)
@property
def state(self):
return self._state
def retrace(shutdown, q, opts, hl, hosts):
threads = {}
def create_host(distance, address):
if address not in hosts:
log.info(f"Monitoring {address}...")
with hl:
hosts[(distance, address)] = monitor = MonitoredHost(address, timedelta(seconds=8))
threads[(distance, address)] = t = Thread(target=monitor, args=(shutdown, q))
t.start()
else:
log.debug(f"Already monitoring {address}...")
while not shutdown.is_set():
for h in opts.hosts:
# FIXME: Use a real topology model
for distance, hop in zip(count(1), traceroute(q, h)):
if ping(q, hop.address).is_alive:
create_host(distance, hop.address)
sleep(shutdown, 60 * 5)
def render(shutdown, q, stdscr, hl, hosts):
dt = timedelta(seconds=8)
with open("incidents.txt", "w") as fp:
incident = False
while not shutdown.is_set():
rows, cols = stdscr.getmaxyx()
down = 0
now = datetime.now()
i = 0
with hl:
for i, ((distance, name), host) in zip(count(1), sorted(hosts.items(), key=lambda x: x[0][0])):
loss = host.state.loss(dt) * 100
state = host.state.last()
if not state:
down += 1
last_seen = "Down"
else:
last_seen = f"{host.state.last_seen(now).total_seconds():.2f}s ago"
if up := host.state.up(dt):
up = f" up: {(now - up).total_seconds():.2f}"
else:
up = ""
stdscr.addstr(i, 2, f"{distance: <2} {name: <16s}]{up} lost: {loss:.2f}% last: {last_seen}".ljust(cols))
stdscr.border()
stdscr.refresh()
msg = None
if down >= 3 and not incident:
incident = True
msg = f"{datetime.now()} - {down} hosts down"
elif down < 3 and incident:
incident = False
msg = f"{datetime.now()} - network recovered"
if i != 0 and msg:
log.info(msg)
fp.write(msg + "\n")
fp.flush()
sleep(shutdown, 1)
def main():
stdscr = curses.initscr()
maxy, maxx = stdscr.getmaxyx()
@ -149,87 +231,18 @@ def main():
hosts = {}
hl = Lock()
threads = {}
def create_host(address):
if address not in hosts:
log.info(f"Monitoring {address}...")
with hl:
hosts[address] = monitor = MonitoredHost(address, timedelta(seconds=8))
threads[address] = t = Thread(target=monitor, args=(shutdown, q))
t.start()
else:
log.debug(f"Already monitoring {address}...")
stdscr.refresh()
def render():
dt = timedelta(seconds=8)
with open("incidents.txt", "w") as fp:
incident = False
while not shutdown.is_set():
rows, cols = stdscr.getmaxyx()
down = 0
now = datetime.now()
i = 0
with hl:
for host, i in zip(hosts.values(), count(1)):
name = host._hostname
loss = host.loss(dt) * 100
state = host.last()
if not state:
down += 1
last_seen = "Down"
else:
last_seen = f"{host.last_seen(now).total_seconds():.2f}s ago"
if up := host.up(dt):
up = f" up: {(now - up).total_seconds():.2f}"
else:
up = ""
stdscr.addstr(i, 2, f"{name: <16s}]{up} lost: {loss:.2f}% last: {last_seen}".ljust(cols))
stdscr.border()
stdscr.refresh()
msg = None
if down >= 3 and not incident:
incident = True
msg = f"{datetime.now()} - {down} hosts down"
elif down < 3 and incident:
incident = False
msg = f"{datetime.now()} - network recovered"
if i != 0 and msg:
log.info(msg)
fp.write(msg + "\n")
fp.flush()
sleep(1)
rt = Thread(target=render)
rt = Thread(target=render, args=(shutdown, q, stdscr, hl, hosts))
rt.start()
def retrace():
while not shutdown.is_set():
for h in opts.hosts:
# FIXME: Use a real topology model
for hop in traceroute(q, h):
if ping(q, hop.address).is_alive:
create_host(hop.address)
sleep(60 * 5)
tt = Thread(target=retrace)
tt = Thread(target=retrace, args=(shutdown, q, opts, hl, hosts))
tt.start()
try:
while True:
sleep(1)
sleep(shutdown, 1)
except (KeyboardInterrupt, SystemExit):
pass
finally:
curses.endwin()
sys.stdout.flush()

View file

@ -1,3 +1,5 @@
"""A CURSES screen targeted log record handler."""
import logging
import curses
@ -9,6 +11,7 @@ class CursesHandler(logging.Handler):
def __init__(self, screen):
logging.Handler.__init__(self)
self._screen = screen
# FIXME: This should be dynamic not static.
self._buff = ringbuffer(maxlen=screen.getmaxyx()[0] - 2)
def emit(self, record):
@ -23,5 +26,6 @@ class CursesHandler(logging.Handler):
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)

View file

@ -6,7 +6,7 @@ import logging
import queue
from random import randint
from threading import Event, Lock
from time import sleep
from time import sleep as _sleep
from icmplib.exceptions import (
ICMPLibError,
@ -35,6 +35,16 @@ ICMPRequest.__repr__ = better_repr
ICMPReply.__repr__ = better_repr
def sleep(event, duration, interval=0.1):
total = 0
while total < duration:
if event.is_set():
raise SystemExit()
else:
_sleep(interval)
total += interval
class ICMPRequestResponse(object):
"""A thread-safe request/response structure for pinging a host."""
@ -113,7 +123,7 @@ def icmp_worker(shutdown: Event, q: queue.Queue):
del state[key]
# Sleep one
sleep(0.1)
sleep(shutdown, 0.1)
def traceroute(q: queue.Queue,
@ -151,11 +161,11 @@ def traceroute(q: queue.Queue,
q.put(request)
while not request.ready():
sleep(0.1)
_sleep(0.1)
_reply = request.get()
if _reply is ICMPRequestResponse.TIMEOUT:
sleep(0.1)
_sleep(0.1)
continue
elif _reply:
@ -215,7 +225,7 @@ def request_sequence(hostname: str,
def _ping(q: queue.Queue, request: ICMPRequestResponse):
q.put(request)
while not request.ready():
sleep(0.1)
_sleep(0.1)
_response = request.get()
if _response is not ICMPRequestResponse.TIMEOUT:

View file

@ -1,23 +0,0 @@
"""An implementation of ring buffers which supports efficient reverse scans."""
class Ringbuffer(object):
def __init__(self, size: int, seed: list = None, fill: object = None):
if seed:
assert len(seed) <= size
self._state = ((seed or []) + [fill] * size)[:size]
self._size = size
self._start = 0
self._end = len(seed) if seed else 0
def append(self, obj):
self._state[self._end % self._size] = obj
self._end = (self._end + 1) % self._size
self._start = (self._start + 1) % self._size
def __iter__(self):
if self._start < self._end:
yield from iter(self._state[self._start:self._end])
else:
yield from iter(self._state[self._start:] + self._state[:self._end])
def __reversed__(self):

View file

@ -1,68 +0,0 @@
"""
A model of a (radial) network topology.
"""
from collections import defaultdict
import logging
from typing import List
import graphviz
from icmplib import Hop
import requests
log = logging.getLogger(__name__)
class Topology(object):
LOCALHOST = Hop("127.0.0.1", 1, [0.0], 0)
def __init__(self):
self._graph = defaultdict(set)
self._nodes = {self.LOCALHOST.address: self.LOCALHOST}
def next_hops(self, address: str) -> List[str]:
return list(self._graph.get(address))
def node(self, address: str) -> Hop:
return self._nodes.get(address)
def add_traceroute(self, trace):
for e in trace:
if e.address not in self._nodes:
self._nodes[e.address] = e
else:
e0 = self._nodes[e.address]
e0._packets_sent += e._packets_sent
e0._rtts.extend(e.rtts)
e0._distance = min(e.distance, e0.distance)
hosts = [(self.LOCALHOST.address, self.LOCALHOST.distance)] + [
(e.address, e.distance) for e in trace
]
i1 = iter(hosts)
i2 = iter(hosts)
next(i2)
for (parent, _), (child, _) in zip(i1, i2):
self._graph[parent].add(child)
def render(self):
g = graphviz.Digraph()
for n in sorted(self._nodes.values(), key=lambda n: n.distance):
g.node(n.address)
for next in self._graph[n.address]:
g.edge(n.address, next)
# Lol. Lmao.
return requests.post(
"https://dot-to-ascii.ggerganov.com/dot-to-ascii.php",
params={"boxart": 1, "src": g.source},
).text
def __iter__(self):
return iter(sorted(self._nodes.values(), key=lambda n: n.distance))
def __delitem__(self, key):
del self._graph[key]
del self._nodes[key]