WIP - refactoring towards a better state
This commit is contained in:
parent
2e7ab627d4
commit
38287accf9
6 changed files with 162 additions and 229 deletions
|
@ -7,9 +7,6 @@ zapp_binary(
|
|||
main = "src/python/aloe/__main__.py",
|
||||
deps = [
|
||||
":lib",
|
||||
py_requirement("graphviz"),
|
||||
py_requirement("icmplib"),
|
||||
py_requirement("pytz"),
|
||||
py_requirement("requests"),
|
||||
],
|
||||
)
|
||||
|
|
|
@ -18,49 +18,40 @@ import queue
|
|||
from queue import Queue
|
||||
import sys
|
||||
from threading import Event, Lock, Thread
|
||||
from time import sleep, time
|
||||
from time import sleep as _sleep, time
|
||||
|
||||
from .icmp import *
|
||||
from .icmp import _ping
|
||||
from .cursedlogger import CursesHandler
|
||||
|
||||
import pytz
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("hosts", nargs="+")
|
||||
|
||||
INTERVAL = 0.5
|
||||
|
||||
Z = pytz.timezone("America/Denver")
|
||||
|
||||
|
||||
class HostState(object):
|
||||
"""A model of a (bounded) time series of host state.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
hostname: str,
|
||||
history = [],
|
||||
history_size = 60 * 60 * 24,
|
||||
is_up: bool = False,
|
||||
lost: int = 0,
|
||||
up: float = 0.0):
|
||||
self._state = ringbuffer(maxlen=history_size)
|
||||
self._is_up = is_up
|
||||
self._lost = lost
|
||||
self._up = up
|
||||
|
||||
class MonitoredHost(object):
|
||||
def __init__(self, hostname: str, timeout: timedelta, id=None):
|
||||
self._hostname = hostname
|
||||
self._timeout = timeout
|
||||
self._sequence = request_sequence(hostname, timeout, id)
|
||||
self._lock = Lock()
|
||||
self._state = ringbuffer(maxlen=360)
|
||||
self._is_up = False
|
||||
self._lost = 0
|
||||
self._up = 0
|
||||
|
||||
def __call__(self, shutdown: Event, q: Queue):
|
||||
"""Monitor a given host by throwing requests into the queue; intended to be a Thread target."""
|
||||
|
||||
while not shutdown.is_set():
|
||||
req = next(self._sequence)
|
||||
resp = _ping(q, req)
|
||||
for resp in history:
|
||||
self.append(resp)
|
||||
|
||||
def append(self, resp):
|
||||
if resp and not self._is_up:
|
||||
# log.debug(f"Host {self._hostname} is up!")
|
||||
self._is_up = self._is_up or resp
|
||||
|
@ -78,23 +69,18 @@ class MonitoredHost(object):
|
|||
elif not resp and not self._is_up:
|
||||
pass
|
||||
|
||||
with self._lock:
|
||||
if not resp:
|
||||
self._lost += 1
|
||||
|
||||
if self._state and not self._state[0]:
|
||||
self._lost -= 1
|
||||
|
||||
# self._state = (self._state + [req])[:-3600]
|
||||
self._state.append(resp)
|
||||
|
||||
sleep(1)
|
||||
|
||||
def last(self):
|
||||
with self._lock:
|
||||
return next(reversed(self._state), None)
|
||||
|
||||
def last_window(self, duration: timedelta = None):
|
||||
with self._lock:
|
||||
l = []
|
||||
t = time() - duration.total_seconds()
|
||||
for i in reversed(self._state):
|
||||
|
@ -123,6 +109,102 @@ class MonitoredHost(object):
|
|||
return datetime.fromtimestamp(self._up)
|
||||
|
||||
|
||||
class MonitoredHost(object):
|
||||
"""A shim (arguably a lambda) for generating a timeline of host state."""
|
||||
|
||||
def __init__(self, hostname: str, timeout: timedelta, id=None):
|
||||
self._hostname = hostname
|
||||
self._timeout = timeout
|
||||
self._sequence = request_sequence(hostname, timeout, id)
|
||||
self._lock = Lock()
|
||||
self._state = HostState(hostname)
|
||||
|
||||
def __call__(self, shutdown: Event, q: Queue):
|
||||
"""Monitor a given host by throwing requests into the queue; intended to be a Thread target."""
|
||||
|
||||
while not shutdown.is_set():
|
||||
req = next(self._sequence)
|
||||
resp = _ping(q, req)
|
||||
self._state.append(resp)
|
||||
sleep(shutdown, 1)
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
return self._state
|
||||
|
||||
|
||||
def retrace(shutdown, q, opts, hl, hosts):
|
||||
threads = {}
|
||||
|
||||
def create_host(distance, address):
|
||||
if address not in hosts:
|
||||
log.info(f"Monitoring {address}...")
|
||||
with hl:
|
||||
hosts[(distance, address)] = monitor = MonitoredHost(address, timedelta(seconds=8))
|
||||
threads[(distance, address)] = t = Thread(target=monitor, args=(shutdown, q))
|
||||
t.start()
|
||||
|
||||
else:
|
||||
log.debug(f"Already monitoring {address}...")
|
||||
|
||||
|
||||
while not shutdown.is_set():
|
||||
for h in opts.hosts:
|
||||
# FIXME: Use a real topology model
|
||||
for distance, hop in zip(count(1), traceroute(q, h)):
|
||||
if ping(q, hop.address).is_alive:
|
||||
create_host(distance, hop.address)
|
||||
|
||||
sleep(shutdown, 60 * 5)
|
||||
|
||||
|
||||
def render(shutdown, q, stdscr, hl, hosts):
|
||||
dt = timedelta(seconds=8)
|
||||
|
||||
with open("incidents.txt", "w") as fp:
|
||||
incident = False
|
||||
while not shutdown.is_set():
|
||||
rows, cols = stdscr.getmaxyx()
|
||||
down = 0
|
||||
now = datetime.now()
|
||||
i = 0
|
||||
with hl:
|
||||
for i, ((distance, name), host) in zip(count(1), sorted(hosts.items(), key=lambda x: x[0][0])):
|
||||
loss = host.state.loss(dt) * 100
|
||||
state = host.state.last()
|
||||
if not state:
|
||||
down += 1
|
||||
last_seen = "Down"
|
||||
else:
|
||||
last_seen = f"{host.state.last_seen(now).total_seconds():.2f}s ago"
|
||||
|
||||
if up := host.state.up(dt):
|
||||
up = f" up: {(now - up).total_seconds():.2f}"
|
||||
else:
|
||||
up = ""
|
||||
|
||||
stdscr.addstr(i, 2, f"{distance: <2} {name: <16s}]{up} lost: {loss:.2f}% last: {last_seen}".ljust(cols))
|
||||
|
||||
stdscr.border()
|
||||
stdscr.refresh()
|
||||
|
||||
msg = None
|
||||
if down >= 3 and not incident:
|
||||
incident = True
|
||||
msg = f"{datetime.now()} - {down} hosts down"
|
||||
|
||||
elif down < 3 and incident:
|
||||
incident = False
|
||||
msg = f"{datetime.now()} - network recovered"
|
||||
|
||||
if i != 0 and msg:
|
||||
log.info(msg)
|
||||
fp.write(msg + "\n")
|
||||
fp.flush()
|
||||
|
||||
sleep(shutdown, 1)
|
||||
|
||||
|
||||
def main():
|
||||
stdscr = curses.initscr()
|
||||
maxy, maxx = stdscr.getmaxyx()
|
||||
|
@ -149,87 +231,18 @@ def main():
|
|||
|
||||
hosts = {}
|
||||
hl = Lock()
|
||||
threads = {}
|
||||
|
||||
def create_host(address):
|
||||
if address not in hosts:
|
||||
log.info(f"Monitoring {address}...")
|
||||
with hl:
|
||||
hosts[address] = monitor = MonitoredHost(address, timedelta(seconds=8))
|
||||
threads[address] = t = Thread(target=monitor, args=(shutdown, q))
|
||||
t.start()
|
||||
|
||||
else:
|
||||
log.debug(f"Already monitoring {address}...")
|
||||
|
||||
stdscr.refresh()
|
||||
|
||||
def render():
|
||||
dt = timedelta(seconds=8)
|
||||
|
||||
with open("incidents.txt", "w") as fp:
|
||||
incident = False
|
||||
while not shutdown.is_set():
|
||||
rows, cols = stdscr.getmaxyx()
|
||||
down = 0
|
||||
now = datetime.now()
|
||||
i = 0
|
||||
with hl:
|
||||
for host, i in zip(hosts.values(), count(1)):
|
||||
name = host._hostname
|
||||
loss = host.loss(dt) * 100
|
||||
state = host.last()
|
||||
if not state:
|
||||
down += 1
|
||||
last_seen = "Down"
|
||||
else:
|
||||
last_seen = f"{host.last_seen(now).total_seconds():.2f}s ago"
|
||||
|
||||
if up := host.up(dt):
|
||||
up = f" up: {(now - up).total_seconds():.2f}"
|
||||
else:
|
||||
up = ""
|
||||
|
||||
stdscr.addstr(i, 2, f"{name: <16s}]{up} lost: {loss:.2f}% last: {last_seen}".ljust(cols))
|
||||
|
||||
stdscr.border()
|
||||
stdscr.refresh()
|
||||
|
||||
msg = None
|
||||
if down >= 3 and not incident:
|
||||
incident = True
|
||||
msg = f"{datetime.now()} - {down} hosts down"
|
||||
|
||||
elif down < 3 and incident:
|
||||
incident = False
|
||||
msg = f"{datetime.now()} - network recovered"
|
||||
|
||||
if i != 0 and msg:
|
||||
log.info(msg)
|
||||
fp.write(msg + "\n")
|
||||
fp.flush()
|
||||
|
||||
sleep(1)
|
||||
|
||||
rt = Thread(target=render)
|
||||
rt = Thread(target=render, args=(shutdown, q, stdscr, hl, hosts))
|
||||
rt.start()
|
||||
|
||||
def retrace():
|
||||
while not shutdown.is_set():
|
||||
for h in opts.hosts:
|
||||
# FIXME: Use a real topology model
|
||||
for hop in traceroute(q, h):
|
||||
if ping(q, hop.address).is_alive:
|
||||
create_host(hop.address)
|
||||
|
||||
sleep(60 * 5)
|
||||
|
||||
tt = Thread(target=retrace)
|
||||
tt = Thread(target=retrace, args=(shutdown, q, opts, hl, hosts))
|
||||
tt.start()
|
||||
|
||||
try:
|
||||
while True:
|
||||
sleep(1)
|
||||
sleep(shutdown, 1)
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
pass
|
||||
finally:
|
||||
curses.endwin()
|
||||
sys.stdout.flush()
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
"""A CURSES screen targeted log record handler."""
|
||||
|
||||
import logging
|
||||
import curses
|
||||
|
||||
|
@ -9,6 +11,7 @@ class CursesHandler(logging.Handler):
|
|||
def __init__(self, screen):
|
||||
logging.Handler.__init__(self)
|
||||
self._screen = screen
|
||||
# FIXME: This should be dynamic not static.
|
||||
self._buff = ringbuffer(maxlen=screen.getmaxyx()[0] - 2)
|
||||
|
||||
def emit(self, record):
|
||||
|
@ -23,5 +26,6 @@ class CursesHandler(logging.Handler):
|
|||
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
raise
|
||||
|
||||
except:
|
||||
self.handleError(record)
|
||||
|
|
|
@ -6,7 +6,7 @@ import logging
|
|||
import queue
|
||||
from random import randint
|
||||
from threading import Event, Lock
|
||||
from time import sleep
|
||||
from time import sleep as _sleep
|
||||
|
||||
from icmplib.exceptions import (
|
||||
ICMPLibError,
|
||||
|
@ -35,6 +35,16 @@ ICMPRequest.__repr__ = better_repr
|
|||
ICMPReply.__repr__ = better_repr
|
||||
|
||||
|
||||
def sleep(event, duration, interval=0.1):
|
||||
total = 0
|
||||
while total < duration:
|
||||
if event.is_set():
|
||||
raise SystemExit()
|
||||
else:
|
||||
_sleep(interval)
|
||||
total += interval
|
||||
|
||||
|
||||
class ICMPRequestResponse(object):
|
||||
"""A thread-safe request/response structure for pinging a host."""
|
||||
|
||||
|
@ -113,7 +123,7 @@ def icmp_worker(shutdown: Event, q: queue.Queue):
|
|||
del state[key]
|
||||
|
||||
# Sleep one
|
||||
sleep(0.1)
|
||||
sleep(shutdown, 0.1)
|
||||
|
||||
|
||||
def traceroute(q: queue.Queue,
|
||||
|
@ -151,11 +161,11 @@ def traceroute(q: queue.Queue,
|
|||
|
||||
q.put(request)
|
||||
while not request.ready():
|
||||
sleep(0.1)
|
||||
_sleep(0.1)
|
||||
|
||||
_reply = request.get()
|
||||
if _reply is ICMPRequestResponse.TIMEOUT:
|
||||
sleep(0.1)
|
||||
_sleep(0.1)
|
||||
continue
|
||||
|
||||
elif _reply:
|
||||
|
@ -215,7 +225,7 @@ def request_sequence(hostname: str,
|
|||
def _ping(q: queue.Queue, request: ICMPRequestResponse):
|
||||
q.put(request)
|
||||
while not request.ready():
|
||||
sleep(0.1)
|
||||
_sleep(0.1)
|
||||
|
||||
_response = request.get()
|
||||
if _response is not ICMPRequestResponse.TIMEOUT:
|
||||
|
|
|
@ -1,23 +0,0 @@
|
|||
"""An implementation of ring buffers which supports efficient reverse scans."""
|
||||
|
||||
class Ringbuffer(object):
|
||||
def __init__(self, size: int, seed: list = None, fill: object = None):
|
||||
if seed:
|
||||
assert len(seed) <= size
|
||||
self._state = ((seed or []) + [fill] * size)[:size]
|
||||
self._size = size
|
||||
self._start = 0
|
||||
self._end = len(seed) if seed else 0
|
||||
|
||||
def append(self, obj):
|
||||
self._state[self._end % self._size] = obj
|
||||
self._end = (self._end + 1) % self._size
|
||||
self._start = (self._start + 1) % self._size
|
||||
|
||||
def __iter__(self):
|
||||
if self._start < self._end:
|
||||
yield from iter(self._state[self._start:self._end])
|
||||
else:
|
||||
yield from iter(self._state[self._start:] + self._state[:self._end])
|
||||
|
||||
def __reversed__(self):
|
|
@ -1,68 +0,0 @@
|
|||
"""
|
||||
A model of a (radial) network topology.
|
||||
"""
|
||||
|
||||
from collections import defaultdict
|
||||
import logging
|
||||
from typing import List
|
||||
|
||||
import graphviz
|
||||
from icmplib import Hop
|
||||
import requests
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Topology(object):
|
||||
LOCALHOST = Hop("127.0.0.1", 1, [0.0], 0)
|
||||
|
||||
def __init__(self):
|
||||
self._graph = defaultdict(set)
|
||||
self._nodes = {self.LOCALHOST.address: self.LOCALHOST}
|
||||
|
||||
def next_hops(self, address: str) -> List[str]:
|
||||
return list(self._graph.get(address))
|
||||
|
||||
def node(self, address: str) -> Hop:
|
||||
return self._nodes.get(address)
|
||||
|
||||
def add_traceroute(self, trace):
|
||||
for e in trace:
|
||||
if e.address not in self._nodes:
|
||||
self._nodes[e.address] = e
|
||||
else:
|
||||
e0 = self._nodes[e.address]
|
||||
e0._packets_sent += e._packets_sent
|
||||
e0._rtts.extend(e.rtts)
|
||||
e0._distance = min(e.distance, e0.distance)
|
||||
|
||||
hosts = [(self.LOCALHOST.address, self.LOCALHOST.distance)] + [
|
||||
(e.address, e.distance) for e in trace
|
||||
]
|
||||
i1 = iter(hosts)
|
||||
i2 = iter(hosts)
|
||||
next(i2)
|
||||
|
||||
for (parent, _), (child, _) in zip(i1, i2):
|
||||
self._graph[parent].add(child)
|
||||
|
||||
def render(self):
|
||||
g = graphviz.Digraph()
|
||||
for n in sorted(self._nodes.values(), key=lambda n: n.distance):
|
||||
g.node(n.address)
|
||||
for next in self._graph[n.address]:
|
||||
g.edge(n.address, next)
|
||||
|
||||
# Lol. Lmao.
|
||||
return requests.post(
|
||||
"https://dot-to-ascii.ggerganov.com/dot-to-ascii.php",
|
||||
params={"boxart": 1, "src": g.source},
|
||||
).text
|
||||
|
||||
def __iter__(self):
|
||||
return iter(sorted(self._nodes.values(), key=lambda n: n.distance))
|
||||
|
||||
def __delitem__(self, key):
|
||||
del self._graph[key]
|
||||
del self._nodes[key]
|
Loading…
Reference in a new issue