From 3e4d02f2f4651b7ac45baa15ec433f190fadbc71 Mon Sep 17 00:00:00 2001 From: Reid 'arrdem' McKenzie Date: Sun, 12 Dec 2021 20:55:39 -0700 Subject: [PATCH] WIP - refactoring towards a better state --- projects/aloe/BUILD | 3 - projects/aloe/src/python/aloe/__main__.py | 273 +++++++++--------- projects/aloe/src/python/aloe/cursedlogger.py | 4 + projects/aloe/src/python/aloe/icmp.py | 20 +- projects/aloe/src/python/aloe/ringbuffer.py | 23 -- projects/aloe/src/python/aloe/topology.py | 68 ----- 6 files changed, 162 insertions(+), 229 deletions(-) delete mode 100644 projects/aloe/src/python/aloe/ringbuffer.py delete mode 100644 projects/aloe/src/python/aloe/topology.py diff --git a/projects/aloe/BUILD b/projects/aloe/BUILD index 93104b3..97107d9 100644 --- a/projects/aloe/BUILD +++ b/projects/aloe/BUILD @@ -7,9 +7,6 @@ zapp_binary( main = "src/python/aloe/__main__.py", deps = [ ":lib", - py_requirement("graphviz"), py_requirement("icmplib"), - py_requirement("pytz"), - py_requirement("requests"), ], ) diff --git a/projects/aloe/src/python/aloe/__main__.py b/projects/aloe/src/python/aloe/__main__.py index 0cb9ef7..1e465b0 100644 --- a/projects/aloe/src/python/aloe/__main__.py +++ b/projects/aloe/src/python/aloe/__main__.py @@ -18,91 +18,77 @@ import queue from queue import Queue import sys from threading import Event, Lock, Thread -from time import sleep, time +from time import sleep as _sleep, time from .icmp import * from .icmp import _ping from .cursedlogger import CursesHandler -import pytz - log = logging.getLogger(__name__) parser = argparse.ArgumentParser() parser.add_argument("hosts", nargs="+") -INTERVAL = 0.5 - -Z = pytz.timezone("America/Denver") - class HostState(object): """A model of a (bounded) time series of host state. """ + def __init__(self, + hostname: str, + history = [], + history_size = 60 * 60 * 24, + is_up: bool = False, + lost: int = 0, + up: float = 0.0): + self._state = ringbuffer(maxlen=history_size) + self._is_up = is_up + self._lost = lost + self._up = up -class MonitoredHost(object): - def __init__(self, hostname: str, timeout: timedelta, id=None): - self._hostname = hostname - self._timeout = timeout - self._sequence = request_sequence(hostname, timeout, id) - self._lock = Lock() - self._state = ringbuffer(maxlen=360) - self._is_up = False - self._lost = 0 - self._up = 0 + for resp in history: + self.append(resp) - def __call__(self, shutdown: Event, q: Queue): - """Monitor a given host by throwing requests into the queue; intended to be a Thread target.""" + def append(self, resp): + if resp and not self._is_up: + # log.debug(f"Host {self._hostname} is up!") + self._is_up = self._is_up or resp + self._up = resp._time - while not shutdown.is_set(): - req = next(self._sequence) - resp = _ping(q, req) + elif resp and self._is_up: + # log.debug(f"Host {self._hostname} holding up...") + pass - if resp and not self._is_up: - # log.debug(f"Host {self._hostname} is up!") - self._is_up = self._is_up or resp - self._up = resp._time + elif not resp and self._is_up: + # log.debug(f"Host {self._hostname} is down!") + self._is_up = None + self._up = None - elif resp and self._is_up: - # log.debug(f"Host {self._hostname} holding up...") - pass + elif not resp and not self._is_up: + pass - elif not resp and self._is_up: - # log.debug(f"Host {self._hostname} is down!") - self._is_up = None - self._up = None + if not resp: + self._lost += 1 - elif not resp and not self._is_up: - pass + if self._state and not self._state[0]: + self._lost -= 1 - with self._lock: - if not resp: - self._lost += 1 - if self._state and not self._state[0]: - self._lost -= 1 - - # self._state = (self._state + [req])[:-3600] - self._state.append(resp) - - sleep(1) + self._state.append(resp) def last(self): - with self._lock: - return next(reversed(self._state), None) + return next(reversed(self._state), None) def last_window(self, duration: timedelta = None): - with self._lock: - l = [] - t = time() - duration.total_seconds() - for i in reversed(self._state): - if not i or i._time > t: - l.insert(0, i) - else: - break - return l + l = [] + t = time() - duration.total_seconds() + for i in reversed(self._state): + if not i or i._time > t: + l.insert(0, i) + else: + break + return l def loss(self, duration: timedelta): log = self.last_window(duration) @@ -123,6 +109,102 @@ class MonitoredHost(object): return datetime.fromtimestamp(self._up) +class MonitoredHost(object): + """A shim (arguably a lambda) for generating a timeline of host state.""" + + def __init__(self, hostname: str, timeout: timedelta, id=None): + self._hostname = hostname + self._timeout = timeout + self._sequence = request_sequence(hostname, timeout, id) + self._lock = Lock() + self._state = HostState(hostname) + + def __call__(self, shutdown: Event, q: Queue): + """Monitor a given host by throwing requests into the queue; intended to be a Thread target.""" + + while not shutdown.is_set(): + req = next(self._sequence) + resp = _ping(q, req) + self._state.append(resp) + sleep(shutdown, 1) + + @property + def state(self): + return self._state + + +def retrace(shutdown, q, opts, hl, hosts): + threads = {} + + def create_host(distance, address): + if address not in hosts: + log.info(f"Monitoring {address}...") + with hl: + hosts[(distance, address)] = monitor = MonitoredHost(address, timedelta(seconds=8)) + threads[(distance, address)] = t = Thread(target=monitor, args=(shutdown, q)) + t.start() + + else: + log.debug(f"Already monitoring {address}...") + + + while not shutdown.is_set(): + for h in opts.hosts: + # FIXME: Use a real topology model + for distance, hop in zip(count(1), traceroute(q, h)): + if ping(q, hop.address).is_alive: + create_host(distance, hop.address) + + sleep(shutdown, 60 * 5) + + +def render(shutdown, q, stdscr, hl, hosts): + dt = timedelta(seconds=8) + + with open("incidents.txt", "w") as fp: + incident = False + while not shutdown.is_set(): + rows, cols = stdscr.getmaxyx() + down = 0 + now = datetime.now() + i = 0 + with hl: + for i, ((distance, name), host) in zip(count(1), sorted(hosts.items(), key=lambda x: x[0][0])): + loss = host.state.loss(dt) * 100 + state = host.state.last() + if not state: + down += 1 + last_seen = "Down" + else: + last_seen = f"{host.state.last_seen(now).total_seconds():.2f}s ago" + + if up := host.state.up(dt): + up = f" up: {(now - up).total_seconds():.2f}" + else: + up = "" + + stdscr.addstr(i, 2, f"{distance: <2} {name: <16s}]{up} lost: {loss:.2f}% last: {last_seen}".ljust(cols)) + + stdscr.border() + stdscr.refresh() + + msg = None + if down >= 3 and not incident: + incident = True + msg = f"{datetime.now()} - {down} hosts down" + + elif down < 3 and incident: + incident = False + msg = f"{datetime.now()} - network recovered" + + if i != 0 and msg: + log.info(msg) + fp.write(msg + "\n") + fp.flush() + + sleep(shutdown, 1) + + def main(): stdscr = curses.initscr() maxy, maxx = stdscr.getmaxyx() @@ -149,87 +231,18 @@ def main(): hosts = {} hl = Lock() - threads = {} - def create_host(address): - if address not in hosts: - log.info(f"Monitoring {address}...") - with hl: - hosts[address] = monitor = MonitoredHost(address, timedelta(seconds=8)) - threads[address] = t = Thread(target=monitor, args=(shutdown, q)) - t.start() - - else: - log.debug(f"Already monitoring {address}...") - - stdscr.refresh() - - def render(): - dt = timedelta(seconds=8) - - with open("incidents.txt", "w") as fp: - incident = False - while not shutdown.is_set(): - rows, cols = stdscr.getmaxyx() - down = 0 - now = datetime.now() - i = 0 - with hl: - for host, i in zip(hosts.values(), count(1)): - name = host._hostname - loss = host.loss(dt) * 100 - state = host.last() - if not state: - down += 1 - last_seen = "Down" - else: - last_seen = f"{host.last_seen(now).total_seconds():.2f}s ago" - - if up := host.up(dt): - up = f" up: {(now - up).total_seconds():.2f}" - else: - up = "" - - stdscr.addstr(i, 2, f"{name: <16s}]{up} lost: {loss:.2f}% last: {last_seen}".ljust(cols)) - - stdscr.border() - stdscr.refresh() - - msg = None - if down >= 3 and not incident: - incident = True - msg = f"{datetime.now()} - {down} hosts down" - - elif down < 3 and incident: - incident = False - msg = f"{datetime.now()} - network recovered" - - if i != 0 and msg: - log.info(msg) - fp.write(msg + "\n") - fp.flush() - - sleep(1) - - rt = Thread(target=render) + rt = Thread(target=render, args=(shutdown, q, stdscr, hl, hosts)) rt.start() - def retrace(): - while not shutdown.is_set(): - for h in opts.hosts: - # FIXME: Use a real topology model - for hop in traceroute(q, h): - if ping(q, hop.address).is_alive: - create_host(hop.address) - - sleep(60 * 5) - - tt = Thread(target=retrace) + tt = Thread(target=retrace, args=(shutdown, q, opts, hl, hosts)) tt.start() try: while True: - sleep(1) + sleep(shutdown, 1) + except (KeyboardInterrupt, SystemExit): + pass finally: curses.endwin() sys.stdout.flush() diff --git a/projects/aloe/src/python/aloe/cursedlogger.py b/projects/aloe/src/python/aloe/cursedlogger.py index 1efacb0..450b2d8 100644 --- a/projects/aloe/src/python/aloe/cursedlogger.py +++ b/projects/aloe/src/python/aloe/cursedlogger.py @@ -1,3 +1,5 @@ +"""A CURSES screen targeted log record handler.""" + import logging import curses @@ -9,6 +11,7 @@ class CursesHandler(logging.Handler): def __init__(self, screen): logging.Handler.__init__(self) self._screen = screen + # FIXME: This should be dynamic not static. self._buff = ringbuffer(maxlen=screen.getmaxyx()[0] - 2) def emit(self, record): @@ -23,5 +26,6 @@ class CursesHandler(logging.Handler): except (KeyboardInterrupt, SystemExit): raise + except: self.handleError(record) diff --git a/projects/aloe/src/python/aloe/icmp.py b/projects/aloe/src/python/aloe/icmp.py index 501c001..8d7e020 100644 --- a/projects/aloe/src/python/aloe/icmp.py +++ b/projects/aloe/src/python/aloe/icmp.py @@ -6,7 +6,7 @@ import logging import queue from random import randint from threading import Event, Lock -from time import sleep +from time import sleep as _sleep from icmplib.exceptions import ( ICMPLibError, @@ -35,6 +35,16 @@ ICMPRequest.__repr__ = better_repr ICMPReply.__repr__ = better_repr +def sleep(event, duration, interval=0.1): + total = 0 + while total < duration: + if event.is_set(): + raise SystemExit() + else: + _sleep(interval) + total += interval + + class ICMPRequestResponse(object): """A thread-safe request/response structure for pinging a host.""" @@ -113,7 +123,7 @@ def icmp_worker(shutdown: Event, q: queue.Queue): del state[key] # Sleep one - sleep(0.1) + sleep(shutdown, 0.1) def traceroute(q: queue.Queue, @@ -151,11 +161,11 @@ def traceroute(q: queue.Queue, q.put(request) while not request.ready(): - sleep(0.1) + _sleep(0.1) _reply = request.get() if _reply is ICMPRequestResponse.TIMEOUT: - sleep(0.1) + _sleep(0.1) continue elif _reply: @@ -215,7 +225,7 @@ def request_sequence(hostname: str, def _ping(q: queue.Queue, request: ICMPRequestResponse): q.put(request) while not request.ready(): - sleep(0.1) + _sleep(0.1) _response = request.get() if _response is not ICMPRequestResponse.TIMEOUT: diff --git a/projects/aloe/src/python/aloe/ringbuffer.py b/projects/aloe/src/python/aloe/ringbuffer.py deleted file mode 100644 index fdfca84..0000000 --- a/projects/aloe/src/python/aloe/ringbuffer.py +++ /dev/null @@ -1,23 +0,0 @@ -"""An implementation of ring buffers which supports efficient reverse scans.""" - -class Ringbuffer(object): - def __init__(self, size: int, seed: list = None, fill: object = None): - if seed: - assert len(seed) <= size - self._state = ((seed or []) + [fill] * size)[:size] - self._size = size - self._start = 0 - self._end = len(seed) if seed else 0 - - def append(self, obj): - self._state[self._end % self._size] = obj - self._end = (self._end + 1) % self._size - self._start = (self._start + 1) % self._size - - def __iter__(self): - if self._start < self._end: - yield from iter(self._state[self._start:self._end]) - else: - yield from iter(self._state[self._start:] + self._state[:self._end]) - - def __reversed__(self): diff --git a/projects/aloe/src/python/aloe/topology.py b/projects/aloe/src/python/aloe/topology.py deleted file mode 100644 index 0991c7e..0000000 --- a/projects/aloe/src/python/aloe/topology.py +++ /dev/null @@ -1,68 +0,0 @@ -""" -A model of a (radial) network topology. -""" - -from collections import defaultdict -import logging -from typing import List - -import graphviz -from icmplib import Hop -import requests - - -log = logging.getLogger(__name__) - - -class Topology(object): - LOCALHOST = Hop("127.0.0.1", 1, [0.0], 0) - - def __init__(self): - self._graph = defaultdict(set) - self._nodes = {self.LOCALHOST.address: self.LOCALHOST} - - def next_hops(self, address: str) -> List[str]: - return list(self._graph.get(address)) - - def node(self, address: str) -> Hop: - return self._nodes.get(address) - - def add_traceroute(self, trace): - for e in trace: - if e.address not in self._nodes: - self._nodes[e.address] = e - else: - e0 = self._nodes[e.address] - e0._packets_sent += e._packets_sent - e0._rtts.extend(e.rtts) - e0._distance = min(e.distance, e0.distance) - - hosts = [(self.LOCALHOST.address, self.LOCALHOST.distance)] + [ - (e.address, e.distance) for e in trace - ] - i1 = iter(hosts) - i2 = iter(hosts) - next(i2) - - for (parent, _), (child, _) in zip(i1, i2): - self._graph[parent].add(child) - - def render(self): - g = graphviz.Digraph() - for n in sorted(self._nodes.values(), key=lambda n: n.distance): - g.node(n.address) - for next in self._graph[n.address]: - g.edge(n.address, next) - - # Lol. Lmao. - return requests.post( - "https://dot-to-ascii.ggerganov.com/dot-to-ascii.php", - params={"boxart": 1, "src": g.source}, - ).text - - def __iter__(self): - return iter(sorted(self._nodes.values(), key=lambda n: n.distance)) - - def __delitem__(self, key): - del self._graph[key] - del self._nodes[key]