diff --git a/projects/aloe/src/python/aloe/__main__.py b/projects/aloe/src/python/aloe/__main__.py index b6ddb34..0cb9ef7 100644 --- a/projects/aloe/src/python/aloe/__main__.py +++ b/projects/aloe/src/python/aloe/__main__.py @@ -1,5 +1,7 @@ """Aloe - A shitty weathermapping tool. +Think MTR but with the ability to detect/declare incidents and emit logs. + Periodically traceroutes the egress network, and then walks pings out the egress network recording times and hosts which failed to respond. Expects a network in excess of 90% packet delivery, but with variable timings. Intended to probe for when packet delivery latencies radically degrade and maintain a report file. @@ -7,211 +9,233 @@ when packet delivery latencies radically degrade and maintain a report file. """ import argparse -from collections import defaultdict, deque -from datetime import datetime, timedelta -from itertools import cycle +from collections import deque as ringbuffer +import curses +from datetime import timedelta +from itertools import count import logging -from multiprocessing import Process, Queue -from random import randint import queue +from queue import Queue import sys -from time import sleep -from typing import List +from threading import Event, Lock, Thread +from time import sleep, time + +from .icmp import * +from .icmp import _ping +from .cursedlogger import CursesHandler -import graphviz -from icmplib import Hop, traceroute -from icmplib.utils import * -import requests import pytz -from .urping import ping, urping log = logging.getLogger(__name__) parser = argparse.ArgumentParser() parser.add_argument("hosts", nargs="+") - -class Topology(object): - LOCALHOST = Hop("127.0.0.1", 1, [0.0], 0) - - def __init__(self): - self._graph = defaultdict(set) - self._nodes = {self.LOCALHOST.address: self.LOCALHOST} - - def next_hops(self, address: str) -> List[str]: - return list(self._graph.get(address)) - - def node(self, address: str) -> Hop: - return self._nodes.get(address) - - def add_traceroute(self, trace): - for e in trace: - if e.address not in self._nodes: - self._nodes[e.address] = e - else: - e0 = self._nodes[e.address] - e0._packets_sent += e._packets_sent - e0._rtts.extend(e.rtts) - e0._distance = min(e.distance, e0.distance) - - hosts = [(self.LOCALHOST.address, self.LOCALHOST.distance)] + [ - (e.address, e.distance) for e in trace - ] - i1 = iter(hosts) - i2 = iter(hosts) - next(i2) - - for (parent, _), (child, _) in zip(i1, i2): - self._graph[parent].add(child) - - def render(self): - g = graphviz.Digraph() - for n in sorted(self._nodes.values(), key=lambda n: n.distance): - g.node(n.address) - for next in self._graph[n.address]: - g.edge(n.address, next) - - # Lol. Lmao. - return requests.post( - "https://dot-to-ascii.ggerganov.com/dot-to-ascii.php", - params={"boxart": 1, "src": g.source}, - ).text - - def __iter__(self): - return iter(sorted(self._nodes.values(), key=lambda n: n.distance)) - - def __delitem__(self, key): - del self._graph[key] - del self._nodes[key] - INTERVAL = 0.5 + Z = pytz.timezone("America/Denver") -def compute_topology(hostlist, topology=None): - """Walk a series of traceroute tuples, computing a 'worst expected latency' topology from them.""" - topology = topology or Topology() - for h in hostlist: - trace = traceroute(h) - # Restrict the trace to hosts which ICMP ping - trace = [e for e in trace if ping(e.address, interval=INTERVAL, count=3).is_alive] - topology.add_traceroute(trace) +class HostState(object): + """A model of a (bounded) time series of host state. - return topology + """ -def pinger(host, queue, next=None): - # Mokney patch the RTT tracking - host._rtts = deque(host._rtts, maxlen=100) - id = randint(1, 1<<16 - 1) - sequence = 0 +class MonitoredHost(object): + def __init__(self, hostname: str, timeout: timedelta, id=None): + self._hostname = hostname + self._timeout = timeout + self._sequence = request_sequence(hostname, timeout, id) + self._lock = Lock() + self._state = ringbuffer(maxlen=360) + self._is_up = False + self._lost = 0 + self._up = 0 - while True: - timeout = min(h.avg_rtt / 1000.0, 0.5) # rtt is in ms but timeout is in sec. - start = datetime.now(tz=Z) + def __call__(self, shutdown: Event, q: Queue): + """Monitor a given host by throwing requests into the queue; intended to be a Thread target.""" - res = ping(host.address, timeout=timeout, interval=INTERVAL, count=3, id=id, sequence=sequence) - sequence += res._packets_sent + while not shutdown.is_set(): + req = next(self._sequence) + resp = _ping(q, req) - queue.put((start, res)) - sleep(INTERVAL) - if res.is_alive: - host._rtts.extend(res._rtts) - host._packets_sent += res._packets_sent + if resp and not self._is_up: + # log.debug(f"Host {self._hostname} is up!") + self._is_up = self._is_up or resp + self._up = resp._time + + elif resp and self._is_up: + # log.debug(f"Host {self._hostname} holding up...") + pass + + elif not resp and self._is_up: + # log.debug(f"Host {self._hostname} is down!") + self._is_up = None + self._up = None + + elif not resp and not self._is_up: + pass + + with self._lock: + if not resp: + self._lost += 1 + if self._state and not self._state[0]: + self._lost -= 1 + + # self._state = (self._state + [req])[:-3600] + self._state.append(resp) + + sleep(1) + + def last(self): + with self._lock: + return next(reversed(self._state), None) + + def last_window(self, duration: timedelta = None): + with self._lock: + l = [] + t = time() - duration.total_seconds() + for i in reversed(self._state): + if not i or i._time > t: + l.insert(0, i) + else: + break + return l + + def loss(self, duration: timedelta): + log = self.last_window(duration) + if log: + return log.count(None) / len(log) + else: + return 0.0 + + def is_up(self, duration: timedelta, threshold = 0.25): + return self.loss(duration) <= threshold + + def last_seen(self, now: datetime) -> timedelta: + if state := self.last(): + return now - datetime.fromtimestamp(state._time) + + def up(self, duration: datetime): + if self._up: + return datetime.fromtimestamp(self._up) + + +def main(): + stdscr = curses.initscr() + maxy, maxx = stdscr.getmaxyx() + + begin_x = 2; begin_y = maxy - 12 + height = 10; width = maxx - 4 + logscr = curses.newwin(height, width, begin_y, begin_x) + + handler = CursesHandler(logscr) + formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') + handler.setFormatter(formatter) + log.addHandler(handler) + log.setLevel(logging.DEBUG) + + stdscr = curses.newwin(maxy - height - 2, width, 0, begin_x) + + opts, args = parser.parse_known_args() + + q = queue.Queue() + shutdown = Event() + + p = Thread(target=icmp_worker, args=(shutdown, q,)) + p.start() + + hosts = {} + hl = Lock() + threads = {} + + def create_host(address): + if address not in hosts: + log.info(f"Monitoring {address}...") + with hl: + hosts[address] = monitor = MonitoredHost(address, timedelta(seconds=8)) + threads[address] = t = Thread(target=monitor, args=(shutdown, q)) + t.start() + + else: + log.debug(f"Already monitoring {address}...") + + stdscr.refresh() + + def render(): + dt = timedelta(seconds=8) + + with open("incidents.txt", "w") as fp: + incident = False + while not shutdown.is_set(): + rows, cols = stdscr.getmaxyx() + down = 0 + now = datetime.now() + i = 0 + with hl: + for host, i in zip(hosts.values(), count(1)): + name = host._hostname + loss = host.loss(dt) * 100 + state = host.last() + if not state: + down += 1 + last_seen = "Down" + else: + last_seen = f"{host.last_seen(now).total_seconds():.2f}s ago" + + if up := host.up(dt): + up = f" up: {(now - up).total_seconds():.2f}" + else: + up = "" + + stdscr.addstr(i, 2, f"{name: <16s}]{up} lost: {loss:.2f}% last: {last_seen}".ljust(cols)) + + stdscr.border() + stdscr.refresh() + + msg = None + if down >= 3 and not incident: + incident = True + msg = f"{datetime.now()} - {down} hosts down" + + elif down < 3 and incident: + incident = False + msg = f"{datetime.now()} - network recovered" + + if i != 0 and msg: + log.info(msg) + fp.write(msg + "\n") + fp.flush() + + sleep(1) + + rt = Thread(target=render) + rt.start() + + def retrace(): + while not shutdown.is_set(): + for h in opts.hosts: + # FIXME: Use a real topology model + for hop in traceroute(q, h): + if ping(q, hop.address).is_alive: + create_host(hop.address) + + sleep(60 * 5) + + tt = Thread(target=retrace) + tt.start() + + try: + while True: + sleep(1) + finally: + curses.endwin() + sys.stdout.flush() + sys.stderr.flush() + shutdown.set() if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) - opts, args = parser.parse_known_args() - - now = start = datetime.now(tz=Z) - reconfigure_delay = timedelta(minutes=5) - configure_at = now - reconfigure_delay - flush_delay = timedelta(seconds=1) - flush_at = now + flush_delay - - recovered_duration = timedelta(seconds=5) - dead_duration = timedelta(minutes=30) - - topology = None - - q = Queue() - workers = {} - last_seen = {} - state = {} - - spinner = cycle("|/-\\") - - with open("incidents.txt", "a") as fp: - fp.write("RESTART\n") - while True: - now = datetime.now(tz=Z) - - if flush_at <= now: - fp.flush() - flush_at = now + flush_delay - - if configure_at <= now: - log.info("Attempting to reconfigure network topology...") - try: - topology = compute_topology(opts.hosts, topology) - configure_at = now + reconfigure_delay - log.info("Graph -\n" + topology.render()) - - for h in topology: - if h.distance == 0: - continue - - if h.address in workers: - continue - - else: - n = next(iter(topology.next_hops(h.address)), None) - p = workers[h.address] = Process(target=pinger, args=(h, q, n)) - p.start() - - except Exception as e: - log.exception(e) - - try: - # Revert to "logical now" of whenever the last ping results came in. - now, res = q.get(timeout=0.1) - last = last_seen.get(res.address) - delta = now - last if last else None - - sys.stderr.write("\r" + next(spinner) + " " + f"ICMPResponse({res.address}, {res._rtts}, {res._packets_sent})" + " " * 20) - sys.stderr.flush() - - if res.address not in workers: - pass - - elif res.is_alive: - last_seen[res.address] = now - if last and delta > recovered_duration: - state[res.address] = True - fp.write( - f"RECOVERED\t{res.address}\t{now.isoformat()}\t{delta.total_seconds()}\n" - ) - elif not last: - state[res.address] = True - fp.write(f"UP\t{res.address}\t{now.isoformat()}\n") - - elif not res.is_alive: - if last and delta > dead_duration: - workers[res.address].terminate() - del workers[res.address] - del topology[res.address] - del last_seen[res.address] - del state[res.address] - fp.write( - f"DEAD\t{res.address}\t{now.isoformat()}\t{delta.total_seconds()}\n" - ) - - elif last and delta > recovered_duration and state[res.address]: - fp.write(f"DOWN\t{res.address}\t{now.isoformat()}\n") - state[res.address] = False - - except queue.Empty: - sys.stderr.write("\r" + next(spinner)) - sys.stderr.flush() + main() diff --git a/projects/aloe/src/python/aloe/cursedlogger.py b/projects/aloe/src/python/aloe/cursedlogger.py new file mode 100644 index 0000000..1efacb0 --- /dev/null +++ b/projects/aloe/src/python/aloe/cursedlogger.py @@ -0,0 +1,27 @@ +import logging +import curses + +from collections import deque as ringbuffer +from itertools import count + + +class CursesHandler(logging.Handler): + def __init__(self, screen): + logging.Handler.__init__(self) + self._screen = screen + self._buff = ringbuffer(maxlen=screen.getmaxyx()[0] - 2) + + def emit(self, record): + try: + msg = self.format(record) + "\n" + self._buff.append(msg) + self._screen.clear() + for i, msg in zip(count(1), self._buff): + self._screen.addstr(i, 2, msg) + self._screen.border() + self._screen.refresh() + + except (KeyboardInterrupt, SystemExit): + raise + except: + self.handleError(record) diff --git a/projects/aloe/src/python/aloe/icmp.py b/projects/aloe/src/python/aloe/icmp.py new file mode 100644 index 0000000..501c001 --- /dev/null +++ b/projects/aloe/src/python/aloe/icmp.py @@ -0,0 +1,242 @@ +"""Tools sitting between aloe and icmplib.""" + +from datetime import datetime, timedelta +from itertools import islice as take +import logging +import queue +from random import randint +from threading import Event, Lock +from time import sleep + +from icmplib.exceptions import ( + ICMPLibError, + ICMPSocketError, + TimeoutExceeded, +) +from icmplib.models import ( + Hop, + Host, + ICMPReply, + ICMPRequest, +) +from icmplib.sockets import ICMPv4Socket +from icmplib.utils import is_hostname, resolve + + +log = logging.getLogger(__name__) + + +def better_repr(self): + elems = ", ".join(f"{slot}={getattr(self, slot)}" for slot in self.__slots__) + return f"<{type(self).__name__} ({elems})>" + + +ICMPRequest.__repr__ = better_repr +ICMPReply.__repr__ = better_repr + + +class ICMPRequestResponse(object): + """A thread-safe request/response structure for pinging a host.""" + + PENDING = object() + TIMEOUT = object() + + def __init__(self, address, request: ICMPRequest, timeout: timedelta): + _timeout = datetime.now() + timeout + self._address = address + self._request = request + self._timeout = _timeout + self._lock = Lock() + self._response = self.PENDING + + def ready(self): + """Determine if this request is still waiting.""" + return self.get() is not self.PENDING + + def get(self): + """Get the response, unless the timeout has passed in which case return a timeout.""" + + with self._lock: + if self._response is not self.PENDING: + return self._response + elif self._timeout < datetime.now(): + return self.TIMEOUT + else: + return self._response + + def set(self, response): + """Set the response, unless the timeout has passed in which case set a timeout.""" + + if isinstance(response, ICMPReply): + with self._lock: + if self._timeout < datetime.now(): + self._response = self.TIMEOUT + else: + # rtt = (reply.time - request.time) * 1000 + self._response = response + + +def icmp_worker(shutdown: Event, q: queue.Queue): + """A worker thread which processes ICMP requests; sending packets and listening for matching responses.""" + + state = {} + + with ICMPv4Socket(None, True) as sock: + while not shutdown.is_set(): + # Send one + try: + item = q.get(block=False, timeout=0.1) + request = item._request + state[(request._id, request._sequence)] = item + # log.info(f"Sending request {item._request!r}") + sock.send(item._request) + except (ICMPLibError, ICMPSocketError, queue.Empty): + pass + + # Recieve one + try: + if response := sock.receive(None, 0.1): + key = (response.id, response.sequence) + if key in state: + # log.info(f"Got response {response!r}") + state[key].set(response) + del state[key] + else: + # log.warning(f"Recieved non-matching response {response!r}") + pass + except (ICMPLibError, ICMPSocketError): + pass + + # GC one + if key := next(iter(state.keys()), None): + if state[key].ready(): + del state[key] + + # Sleep one + sleep(0.1) + + +def traceroute(q: queue.Queue, + address: str, + first_hop: int = 1, + max_hops: int = 32, + count: int = 3, + id: int = None, + family: int = None): + if is_hostname(address): + address = resolve(address, family)[0] + + mask = ((1<<16) - 1) + id = id or randint(1, mask) & 0xFFFF + ttl = first_hop + host_reached = False + hops = [] + + while not host_reached and ttl <= max_hops: + reply = None + packets_sent = 0 + rtts = [] + + for sequence in range(count): + request = ICMPRequestResponse( + address, + ICMPRequest( + destination=address, + id=id, + sequence=sequence, + ttl=ttl + ), + timedelta(seconds=1), + ) + + q.put(request) + while not request.ready(): + sleep(0.1) + + _reply = request.get() + if _reply is ICMPRequestResponse.TIMEOUT: + sleep(0.1) + continue + + elif _reply: + reply = reply or _reply + try: + reply.raise_for_status() + host_reached = True + except ICMPLibError: + pass + + rtt = (reply.time - request._request.time) * 1000 + rtts.append(rtt) + + if reply: + hops.append( + Hop( + address=reply.source, + packets_sent=packets_sent, + rtts=rtts, + distance=ttl + ) + ) + + ttl += 1 + + return hops + + +def request_sequence(hostname: str, + timeout: timedelta, + id: int = None, + family: int = None): + """Generate a sequence of requests monitoring a specific, usable as a request source for a ping.""" + + if is_hostname(hostname): + destination = resolve(hostname, family)[0] + else: + destination = hostname + + mask = ((1<<16) - 1) + id = id or randint(1, mask) & 0xFFFF + sequence = 1 + + while True: + yield ICMPRequestResponse( + hostname, + ICMPRequest( + destination=destination, + id=id, + sequence=sequence & mask, + ), + timeout + ) + sequence += 1 + + +def _ping(q: queue.Queue, request: ICMPRequestResponse): + q.put(request) + while not request.ready(): + sleep(0.1) + + _response = request.get() + if _response is not ICMPRequestResponse.TIMEOUT: + return _response + + +def ping(q: queue.Queue, + address: str, + count: int = 3, + id: int = None, + family: int = None) -> Host: + """Ping a host N times.""" + + rtts = [] + for request in take(request_sequence(address, timedelta(seconds=1)), count): + if reply := _ping(q, request): + rtt = (reply.time - request._request.time) * 1000 + rtts.append(rtt) + + return Host( + address=address, + packets_sent=count, + rtts=rtts, + ) diff --git a/projects/aloe/src/python/aloe/ringbuffer.py b/projects/aloe/src/python/aloe/ringbuffer.py new file mode 100644 index 0000000..fdfca84 --- /dev/null +++ b/projects/aloe/src/python/aloe/ringbuffer.py @@ -0,0 +1,23 @@ +"""An implementation of ring buffers which supports efficient reverse scans.""" + +class Ringbuffer(object): + def __init__(self, size: int, seed: list = None, fill: object = None): + if seed: + assert len(seed) <= size + self._state = ((seed or []) + [fill] * size)[:size] + self._size = size + self._start = 0 + self._end = len(seed) if seed else 0 + + def append(self, obj): + self._state[self._end % self._size] = obj + self._end = (self._end + 1) % self._size + self._start = (self._start + 1) % self._size + + def __iter__(self): + if self._start < self._end: + yield from iter(self._state[self._start:self._end]) + else: + yield from iter(self._state[self._start:] + self._state[:self._end]) + + def __reversed__(self): diff --git a/projects/aloe/src/python/aloe/topology.py b/projects/aloe/src/python/aloe/topology.py new file mode 100644 index 0000000..0991c7e --- /dev/null +++ b/projects/aloe/src/python/aloe/topology.py @@ -0,0 +1,68 @@ +""" +A model of a (radial) network topology. +""" + +from collections import defaultdict +import logging +from typing import List + +import graphviz +from icmplib import Hop +import requests + + +log = logging.getLogger(__name__) + + +class Topology(object): + LOCALHOST = Hop("127.0.0.1", 1, [0.0], 0) + + def __init__(self): + self._graph = defaultdict(set) + self._nodes = {self.LOCALHOST.address: self.LOCALHOST} + + def next_hops(self, address: str) -> List[str]: + return list(self._graph.get(address)) + + def node(self, address: str) -> Hop: + return self._nodes.get(address) + + def add_traceroute(self, trace): + for e in trace: + if e.address not in self._nodes: + self._nodes[e.address] = e + else: + e0 = self._nodes[e.address] + e0._packets_sent += e._packets_sent + e0._rtts.extend(e.rtts) + e0._distance = min(e.distance, e0.distance) + + hosts = [(self.LOCALHOST.address, self.LOCALHOST.distance)] + [ + (e.address, e.distance) for e in trace + ] + i1 = iter(hosts) + i2 = iter(hosts) + next(i2) + + for (parent, _), (child, _) in zip(i1, i2): + self._graph[parent].add(child) + + def render(self): + g = graphviz.Digraph() + for n in sorted(self._nodes.values(), key=lambda n: n.distance): + g.node(n.address) + for next in self._graph[n.address]: + g.edge(n.address, next) + + # Lol. Lmao. + return requests.post( + "https://dot-to-ascii.ggerganov.com/dot-to-ascii.php", + params={"boxart": 1, "src": g.source}, + ).text + + def __iter__(self): + return iter(sorted(self._nodes.values(), key=lambda n: n.distance)) + + def __delitem__(self, key): + del self._graph[key] + del self._nodes[key] diff --git a/projects/aloe/src/python/aloe/urping.py b/projects/aloe/src/python/aloe/urping.py deleted file mode 100644 index 4fba62a..0000000 --- a/projects/aloe/src/python/aloe/urping.py +++ /dev/null @@ -1,163 +0,0 @@ -#!/usr/bin/env python3 - -import logging -from time import sleep -import traceback -from textwrap import indent -from random import randint -import sys - -from icmplib.exceptions import ( - ICMPLibError, - TimeExceeded, - TimeoutExceeded, -) -from icmplib.models import Host, Hop, ICMPRequest, ICMPReply -from icmplib.sockets import ( - ICMPv4Socket, - ICMPv6Socket, -) -from icmplib.utils import * - - -log = logging.getLogger(__name__) - - -def better_repr(self): - elems = ", ".join(f"{slot}={getattr(self, slot)}" for slot in self.__slots__) - return f"<{type(self).__name__} ({elems})>" - - -ICMPRequest.__repr__ = better_repr -ICMPReply.__repr__ = better_repr - - -def urping(address: str, - hops: int, - via: str, - family=None, - count=3, - fudge=4, - id=None, - interval=0.2, - timeout=2, - source=None, - **kwargs) -> Hop: - """Ur-ping by (ab)using ICMP TTLs. - - Craft an ICMP message which would go one (or more) hops FARTHER than the `address` host, routed towards `via`. - Send `count * fudge` packets, looking for responses from `address`. - Responses from `address` are considered; and a `Hop` is built from those results. - Other responses from other hosts are discarded. - - """ - - if is_hostname(via): - via = resolve(via, family)[0] - - if is_hostname(address): - address = resolve(address, falmiy)[0] - - if is_ipv6_address(via): - _Socket = ICMPv6Socket - else: - _Socket = ICMPv4Socket - - ttl = hops - hop = Hop(address, 0, [], hops) - packets_sent = 0 - - with _Socket(source) as sock: - for _ in range(count * fudge): - request = ICMPRequest( - destination=via, - # Note that we act like this is a new stream with a new ID and sequence each time to try and fool multipathing. - id=id or unique_identifier(), - sequence=0, - ttl=ttl, - **kwargs) - - try: - sock.send(request) - packets_sent += 1 - - reply = None - reply = sock.receive(request, timeout) - rtt = (reply.time - request.time) * 1000 - - reply.raise_for_status() - - assert reply.id == request.id - assert reply.sequence == request.sequence - assert reply.source == address - - hop._packets_sent += 1 - hop._rtts.append(rtt) - - if hop._packets_sent >= count: - break - - except AssertionError: - log.warning("Got response from unexpected node %s (expected %s) %r for request %4", reply.source, address, reply, request) - - except (TimeoutExceeded, TimeExceeded): - pass - - except ICMPLibError as e: - log.exception(e) - break - - sleep(interval) - - return hop - - -def ping(address, count=4, interval=1, timeout=2, id=None, source=None, - family=None, privileged=True, sequence=0, **kwargs): - """A simple, if paranoid, ping.""" - if is_hostname(address): - address = resolve(address, family)[0] - - if is_ipv6_address(address): - _Socket = ICMPv6Socket - else: - _Socket = ICMPv4Socket - - id = id or randint(1, 1<<16 - 1) & 0xFFFF - packets_sent = 0 - rtts = [] - - with _Socket(source, privileged) as sock: - for base in range(count): - sequence = (sequence + base) & 0xFFFF - if base > 0: - sleep(interval) - - request = ICMPRequest( - destination=address, - id=id, - sequence=sequence, - **kwargs) - - try: - sock.send(request) - packets_sent += 1 - - reply = sock.receive(request, timeout) - reply.raise_for_status() - - assert reply.id == request.id - assert reply.sequence == request.sequence - assert reply.source == address - - rtt = (reply.time - request.time) * 1000 - rtts.append(rtt) - - except AssertionError as e: - exc_type, exc_value, exc_traceback = sys.exc_info() - log.warning("Got erroneous response:\n request: %r\n reply: %r\n err: |\n%s", request, reply, indent(traceback.format_exc(), " ")) - - except ICMPLibError: - pass - - return Host(address, packets_sent, rtts)