From a0766fc42e0d6bd2be5b177c6d0d34039f48c01d Mon Sep 17 00:00:00 2001 From: Reid 'arrdem' McKenzie Date: Sat, 20 Nov 2021 22:05:45 -0700 Subject: [PATCH] Overhaul to a considerably more useful multiprocess state --- projects/aloe/BUILD | 3 + projects/aloe/src/python/aloe/__main__.py | 195 ++++++++++++------- projects/aloe/src/python/aloe/lib.py | 91 +++++++++ projects/aloe/src/python/ping.py | 18 -- projects/aloe/src/python/traceroute.py | 48 ----- projects/aloe/test/python/test_traceroute.py | 31 --- tools/python/requirements.txt | 2 + 7 files changed, 222 insertions(+), 166 deletions(-) create mode 100644 projects/aloe/src/python/aloe/lib.py delete mode 100644 projects/aloe/src/python/ping.py delete mode 100644 projects/aloe/src/python/traceroute.py delete mode 100644 projects/aloe/test/python/test_traceroute.py diff --git a/projects/aloe/BUILD b/projects/aloe/BUILD index 2e06bd6..d7a236d 100644 --- a/projects/aloe/BUILD +++ b/projects/aloe/BUILD @@ -7,5 +7,8 @@ zapp_binary( main = "src/python/aloe/__main__.py", deps = [ ":lib", + py_requirement("graphviz"), + py_requirement("icmplib"), + py_requirement("requests"), ], ) diff --git a/projects/aloe/src/python/aloe/__main__.py b/projects/aloe/src/python/aloe/__main__.py index 8425b9c..134f310 100644 --- a/projects/aloe/src/python/aloe/__main__.py +++ b/projects/aloe/src/python/aloe/__main__.py @@ -6,15 +6,19 @@ when packet delivery latencies radically degrade and maintain a report file. """ -import sys import argparse -import logging +from collections import defaultdict, deque from datetime import datetime, timedelta -from ping import ping -from traceroute import TraceElem, traceroute -from subprocess import CalledProcessError -from typing import NamedTuple -from collections import defaultdict +import logging +from multiprocessing import Process, Queue +import queue +import sys +from typing import List + +from .lib import * + +import graphviz +import requests log = logging.getLogger(__name__) @@ -35,103 +39,156 @@ def distinct(iter): return l -class Host(NamedTuple): - hostname: str - ip: str - rank: int - latency: timedelta - samples: int = 1 - - def mean_latency(self): - return self.latency / self.samples - - class Topology(object): - LOCALHOST = Host("localhost", "127.0.0.1", 0, timedelta(seconds=0.1)) + LOCALHOST = Hop("127.0.0.1", 1, [0.0], 0) def __init__(self): - self._graph = defaultdict(set) # Dict[ip, List[ip]] - self._nodes = {self.LOCALHOST.ip: self.LOCALHOST} # Dict[ip, Host] + self._graph = defaultdict(set) # Dict[address, List[address]] + self._nodes = {self.LOCALHOST.address: self.LOCALHOST} # Dict[address, Host] + + def next_hops(self, address: str) -> List[str]: + return list(self._graph.get(address)) + + def node(self, address: str) -> Hop: + return self._nodes.get(address) def add_traceroute(self, trace): - trace = list(trace) - hosts = [] - newhosts = [self.LOCALHOST.ip] - rank = 0 for e in trace: - if e.ip not in self._nodes: - self._nodes[e.ip] = Host(e.hostname, e.ip, e.rank, e.latency, 1) + if e.address not in self._nodes: + self._nodes[e.address] = e else: - self._nodes[e.ip] = Host(e.hostname, e.ip, e.rank, e.latency + self._nodes[e.ip].latency, self._nodes[e.ip].samples + 1) + e0 = self._nodes[e.address] + e0._packets_sent += e._packets_sent + e0._rtts.extend(e.rtts) + e0._distance = max(e.distance, e0.distance) - if e.rank > rank: - if newhosts: - for h2 in newhosts: - for h1 in hosts: - self._graph[h1].add(h2) - hosts = newhosts - newhosts = [] - rank = e.rank + hosts = [(self.LOCALHOST.address, self.LOCALHOST.distance)] + [(e.address, e.distance) for e in trace] + i1 = iter(hosts) + i2 = iter(hosts) + next(i2) - if e.rank == rank: - newhosts.append(e.ip) + for (parent, _), (child, _) in zip(i1, i2): + self._graph[parent].add(child) def render(self): - for n in sorted(self._nodes.values(), key=lambda n: n.rank): - print(f"{n.hostname} ({n.ip}) => {self._graph[n.ip]}") + g = graphviz.Digraph() + for n in sorted(self._nodes.values(), key=lambda n: n.distance): + g.node(n.address) + for next in self._graph[n.address]: + g.edge(n.address, next) + # Lol. Lmao. + return requests.post("https://dot-to-ascii.ggerganov.com/dot-to-ascii.php", params={"boxart":1, "src": g.source}).text + # return g.source -def compute_topology(hostlist): + def __iter__(self): + return iter(sorted(self._nodes.values(), key=lambda n: n.distance)) + + def __delitem__(self, key): + del self._graph[key] + del self._nodes[key] + +def compute_topology(hostlist, topology=None): """Walk a series of traceroute tuples, computing a 'worst expected latency' topology from them.""" - topology = Topology() + topology = topology or Topology() for h in hostlist: - topology.add_traceroute(traceroute(h)) + trace = traceroute(h) + # Restrict the trace to hosts which ICMP ping + trace = [e for e in trace if ping(e.address, count=1).is_alive] + topology.add_traceroute(trace) - return sorted(topology._nodes.values(), key=lambda n: n.rank) + return topology + + +def cycle(iter): + while True: + for e in iter: + yield e + + +def pinger(host, id, queue): + # Mokney patch the RTT tracking + host._rtts = deque(host._rtts, maxlen=100) + while True: + timeout = h.avg_rtt * 2 / 1000.0 # rtt is in ms but timeout is in sec. + start = datetime.now() + res = ping(host.address, id=id, timeout=timeout, count=3) + queue.put((start, res)) + if res.is_alive: + host._rtts.extend(res._rtts) + host._packets_sent += res._packets_sent if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=logging.DEBUG) opts, args = parser.parse_known_args() now = start = datetime.now() reconfigure_delay = timedelta(minutes=5) configure_at = now - reconfigure_delay + flush_delay = timedelta(seconds=5) + flush_at = now + flush_delay - topology = [] + recovered_duration = timedelta(seconds=5) + dead_duration = timedelta(seconds=30) + + topology = None + id = unique_identifier() + + q = Queue() + workers = {} + last_seen = {} + + spinner = cycle("|/-\\") with open("incidents.txt", "a") as fp: while True: now = datetime.now() + if flush_at <= now: + fp.flush() + flush_at = now + flush_delay + if configure_at <= now: log.info("Attempting to reconfigure network topology...") - try: - topology = compute_topology(opts.hosts) - configure_at = now + reconfigure_delay - for h in topology: - log.info(f"in topology {h}") - except CalledProcessError: + topology = compute_topology(opts.hosts, topology) + configure_at = now + reconfigure_delay + log.info("Graph -\n" + topology.render()) + + for h in topology: + if h.distance < 1 or h.distance > 6: + continue + + if h.address in workers: + continue + + else: + p = workers[h.address] = Process(target=pinger, args=(h, id, q)) + p.start() + + try: + timestamp, res = q.get(timeout=0.1) + last = last_seen.get(res.address) + + if res.address not in workers: pass - for h in topology: - if h.rank == 0: - continue + elif res.is_alive: + if last and (delta := timestamp - last) > recovered_duration: + fp.write(f"RECOVERED\t{res.address}\t{timestamp.isoformat()}\t{delta.total_seconds()}\n") + last_seen[res.address] = timestamp - fail = False - try: - if ping(h.ip, timeout=h.mean_latency() * 2) != 0: - fail = True - except Exception as e: - fail = True - log.exception(e) + elif not res.is_alive: + if last and (delta := timestamp - last) > dead_duration: + workers[h.address].terminate() + del workers[h.address] + del topology[h.address] + fp.write(f"DEAD\t{res.address}\t{timestamp.isoformat()}\t{delta.total_seconds()}\n") - if fail: - msg = f"{datetime.now()} failed to reach {h.hostname} ({h.ip})" - log.warning(msg) - fp.write(msg + "\n") + else: + fp.write(f"DOWN\t{res.address}\t{timestamp.isoformat()}\n") - else: - sys.stderr.write('.') - sys.stderr.flush() + except queue.Empty: + sys.stderr.write("\r" + next(spinner)) + sys.stderr.flush() diff --git a/projects/aloe/src/python/aloe/lib.py b/projects/aloe/src/python/aloe/lib.py new file mode 100644 index 0000000..e04c1fd --- /dev/null +++ b/projects/aloe/src/python/aloe/lib.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 + +import logging +from time import sleep + +from icmplib import Hop +from icmplib.exceptions import ( + ICMPLibError, + TimeExceeded, +) +from icmplib.models import Hop, ICMPRequest +from icmplib.sockets import ( + ICMPv4Socket, + ICMPv6Socket, +) +from icmplib.utils import * + + +log = logging.getLogger(__name__) + + +def urping(address: str, hops: int, via: str, family=None, count=3, fudge=4, interval=0.05, timeout=2, source=None, **kwargs) -> Hop: + """Ur-ping by (ab)using ICMP TTLs. + + Craft an ICMP message which would go one (or more) hops FARTHER than the `address` host, routed towards `via`. + Send `count * fudge` packets, looking for responses from `address`. + Responses from `address` are considered; and a `Hop` is built from those results. + Other responses from other hosts are discarded. + + """ + + if is_hostname(via): + via = resolve(via, family)[0] + + if is_hostname(address): + address = resolve(address, falmiy)[0] + + if is_ipv6_address(via): + _Socket = ICMPv6Socket + else: + _Socket = ICMPv4Socket + + ttl = hops + hop = Hop(address, 0, []) + packets_sent = 0 + + with _Socket(source) as sock: + for _ in range(count * fudge): + request = ICMPRequest( + destination=via, + # Note that we act like this is a new stream with a new ID and sequence each time to try and fool multipathing. + id=unique_identifier(), + sequence=0, + ttl=ttl, + **kwargs) + + try: + sock.send(request) + packets_sent += 1 + + reply = None + reply = sock.receive(request, timeout) + rtt = (reply.time - request.time) * 1000 + + reply.raise_for_status() + + except TimeExceeded: + sleep(interval) + + except ICMPLibError as e: + log.exception(e) + break + + if not reply: + log.debug(f"got no reply from {address}") + else: + # Because the default repr() is crap + reply_str = "ICMPReply({})".format(", ".join(slot + "=" + str(getattr(reply, slot)) for slot in reply.__slots__)) + log.debug(f"Pinging {address} (distance {hops}) via {via} got reply {reply_str}") + + if reply and reply.source != address: + continue + + elif reply: + hop._packets_sent += 1 + hop._rtts.append(rtt) + + if hop and hop._packets_sent >= count: + break + + return hop diff --git a/projects/aloe/src/python/ping.py b/projects/aloe/src/python/ping.py deleted file mode 100644 index 402f8aa..0000000 --- a/projects/aloe/src/python/ping.py +++ /dev/null @@ -1,18 +0,0 @@ -"""A shitty ping wrapper.""" - - -from datetime import timedelta -from subprocess import check_call, DEVNULL - - -def ping(host: str, - count: int = 3, - interval: float = 0.3, - timeout: timedelta = timedelta(seconds=3)): - return check_call(["ping", "-q", - "-i", str(interval), - "-c", str(count), - "-W", str(timeout.total_seconds()), - host], - stdout=DEVNULL, - stderr=DEVNULL,) diff --git a/projects/aloe/src/python/traceroute.py b/projects/aloe/src/python/traceroute.py deleted file mode 100644 index d026f93..0000000 --- a/projects/aloe/src/python/traceroute.py +++ /dev/null @@ -1,48 +0,0 @@ -"""A shitty traceroute wrapper.""" - -from datetime import timedelta -import re -from subprocess import ( - CalledProcessError, - check_call, - check_output, - DEVNULL, -) -from typing import Iterator, List, NamedTuple - - -class TraceElem(NamedTuple): - hostname: str - ip: str - latency: timedelta - rank: int - - -_LINE = re.compile(r"\*|(((?P[-_\w\d\.]*)\s+\((?P[a-f\d\.:]*)\)\s+)?(?P[\d\.]*) ms)") - - -def _parse_traceroute(lines: List[str]) -> Iterator[TraceElem]: - for rank, l in zip(range(1, 1<<64), lines): - ip = None - hostname = None - for m in re.finditer(_LINE, l): - if m.group("latency"): - ip = m.group("ip") or ip - hostname = m.group("hostname") or hostname - yield TraceElem(hostname, ip, timedelta(milliseconds=float(m.group("latency"))), rank) - - -def traceroute(host: str, icmp=True, timeout=timedelta(seconds=5)) -> Iterator[TraceElem]: - # FIXME: Make ICMP mode an option, not on always - yield from _parse_traceroute( - check_output(["traceroute", - # Set wait; note use of total_seconds which is "real" valued. - "-w", str(timeout.total_seconds()), - # Use ICMP probes same as PING. - # This means all probed hosts will be pingable/ping compliant. - # May miss parts of the topology as a result. - "-I", - host], - stderr=DEVNULL,) - .decode("utf-8") - .splitlines()) diff --git a/projects/aloe/test/python/test_traceroute.py b/projects/aloe/test/python/test_traceroute.py deleted file mode 100644 index 3a78199..0000000 --- a/projects/aloe/test/python/test_traceroute.py +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/env python3 - -from typing import List -from traceroute import _parse_traceroute, TraceElem -from datetime import timedelta - -import pytest - - -def parse_traceroute(lines): - """Testing helper.""" - return list(_parse_traceroute(lines)) - - -@pytest.mark.parametrize("example, expected", [ - # Basic case, one match - ("3 10.60.142.2 (10.60.142.2) 117.502 ms", - [TraceElem("10.60.142.2", "10.60.142.2", timedelta(milliseconds=117.502))]), - # Multiple matches on one line - ("3 10.60.142.2 (10.60.142.2) 117.502 ms 10.60.142.3 (10.60.142.3) 75.624 ms 10.60.142.2 (10.60.142.2) 117.709 ms", - [TraceElem("10.60.142.2", "10.60.142.2", timedelta(milliseconds=117.502)), - TraceElem("10.60.142.3", "10.60.142.3", timedelta(milliseconds=75.624)), - TraceElem("10.60.142.2", "10.60.142.2", timedelta(milliseconds=117.709))]), - # Context sensitive case - traceroute doesn't always print the host & IP. - ("7 ae-501-ar01.denver.co.denver.comcast.net (96.216.22.130) 41.920 ms 41.893 ms 74.385 ms", - [TraceElem("ae-501-ar01.denver.co.denver.comcast.net", "96.216.22.130", timedelta(milliseconds=41.920)), - TraceElem("ae-501-ar01.denver.co.denver.comcast.net", "96.216.22.130", timedelta(milliseconds=41.893)), - TraceElem("ae-501-ar01.denver.co.denver.comcast.net", "96.216.22.130", timedelta(milliseconds=74.385))]), -]) -def test_examples(example: str, expected: List[TraceElem]): - assert parse_traceroute(example.splitlines()) == expected diff --git a/tools/python/requirements.txt b/tools/python/requirements.txt index 234d2d5..e34aa63 100644 --- a/tools/python/requirements.txt +++ b/tools/python/requirements.txt @@ -16,7 +16,9 @@ docutils==0.17 ExifRead==2.3.2 flake8==3.9.2 Flask==2.0.1 +graphviz==0.18.2 hypothesis==6.14.5 +icmplib==3.0.2 idna==2.10 imagesize==1.2.0 importlib-metadata==4.0.1