Overhaul to a considerably more useful multiprocess state

This commit is contained in:
Reid 'arrdem' McKenzie 2021-11-20 22:05:45 -07:00
parent a4cd4568cf
commit a0766fc42e
7 changed files with 222 additions and 166 deletions

View file

@ -7,5 +7,8 @@ zapp_binary(
main = "src/python/aloe/__main__.py", main = "src/python/aloe/__main__.py",
deps = [ deps = [
":lib", ":lib",
py_requirement("graphviz"),
py_requirement("icmplib"),
py_requirement("requests"),
], ],
) )

View file

@ -6,15 +6,19 @@ when packet delivery latencies radically degrade and maintain a report file.
""" """
import sys
import argparse import argparse
import logging from collections import defaultdict, deque
from datetime import datetime, timedelta from datetime import datetime, timedelta
from ping import ping import logging
from traceroute import TraceElem, traceroute from multiprocessing import Process, Queue
from subprocess import CalledProcessError import queue
from typing import NamedTuple import sys
from collections import defaultdict from typing import List
from .lib import *
import graphviz
import requests
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -35,103 +39,156 @@ def distinct(iter):
return l return l
class Host(NamedTuple):
hostname: str
ip: str
rank: int
latency: timedelta
samples: int = 1
def mean_latency(self):
return self.latency / self.samples
class Topology(object): class Topology(object):
LOCALHOST = Host("localhost", "127.0.0.1", 0, timedelta(seconds=0.1)) LOCALHOST = Hop("127.0.0.1", 1, [0.0], 0)
def __init__(self): def __init__(self):
self._graph = defaultdict(set) # Dict[ip, List[ip]] self._graph = defaultdict(set) # Dict[address, List[address]]
self._nodes = {self.LOCALHOST.ip: self.LOCALHOST} # Dict[ip, Host] self._nodes = {self.LOCALHOST.address: self.LOCALHOST} # Dict[address, Host]
def next_hops(self, address: str) -> List[str]:
return list(self._graph.get(address))
def node(self, address: str) -> Hop:
return self._nodes.get(address)
def add_traceroute(self, trace): def add_traceroute(self, trace):
trace = list(trace)
hosts = []
newhosts = [self.LOCALHOST.ip]
rank = 0
for e in trace: for e in trace:
if e.ip not in self._nodes: if e.address not in self._nodes:
self._nodes[e.ip] = Host(e.hostname, e.ip, e.rank, e.latency, 1) self._nodes[e.address] = e
else: else:
self._nodes[e.ip] = Host(e.hostname, e.ip, e.rank, e.latency + self._nodes[e.ip].latency, self._nodes[e.ip].samples + 1) e0 = self._nodes[e.address]
e0._packets_sent += e._packets_sent
e0._rtts.extend(e.rtts)
e0._distance = max(e.distance, e0.distance)
if e.rank > rank: hosts = [(self.LOCALHOST.address, self.LOCALHOST.distance)] + [(e.address, e.distance) for e in trace]
if newhosts: i1 = iter(hosts)
for h2 in newhosts: i2 = iter(hosts)
for h1 in hosts: next(i2)
self._graph[h1].add(h2)
hosts = newhosts
newhosts = []
rank = e.rank
if e.rank == rank: for (parent, _), (child, _) in zip(i1, i2):
newhosts.append(e.ip) self._graph[parent].add(child)
def render(self): def render(self):
for n in sorted(self._nodes.values(), key=lambda n: n.rank): g = graphviz.Digraph()
print(f"{n.hostname} ({n.ip}) => {self._graph[n.ip]}") for n in sorted(self._nodes.values(), key=lambda n: n.distance):
g.node(n.address)
for next in self._graph[n.address]:
g.edge(n.address, next)
# Lol. Lmao.
return requests.post("https://dot-to-ascii.ggerganov.com/dot-to-ascii.php", params={"boxart":1, "src": g.source}).text
# return g.source
def compute_topology(hostlist): def __iter__(self):
return iter(sorted(self._nodes.values(), key=lambda n: n.distance))
def __delitem__(self, key):
del self._graph[key]
del self._nodes[key]
def compute_topology(hostlist, topology=None):
"""Walk a series of traceroute tuples, computing a 'worst expected latency' topology from them.""" """Walk a series of traceroute tuples, computing a 'worst expected latency' topology from them."""
topology = Topology() topology = topology or Topology()
for h in hostlist: for h in hostlist:
topology.add_traceroute(traceroute(h)) trace = traceroute(h)
# Restrict the trace to hosts which ICMP ping
trace = [e for e in trace if ping(e.address, count=1).is_alive]
topology.add_traceroute(trace)
return sorted(topology._nodes.values(), key=lambda n: n.rank) return topology
def cycle(iter):
while True:
for e in iter:
yield e
def pinger(host, id, queue):
# Mokney patch the RTT tracking
host._rtts = deque(host._rtts, maxlen=100)
while True:
timeout = h.avg_rtt * 2 / 1000.0 # rtt is in ms but timeout is in sec.
start = datetime.now()
res = ping(host.address, id=id, timeout=timeout, count=3)
queue.put((start, res))
if res.is_alive:
host._rtts.extend(res._rtts)
host._packets_sent += res._packets_sent
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.DEBUG)
opts, args = parser.parse_known_args() opts, args = parser.parse_known_args()
now = start = datetime.now() now = start = datetime.now()
reconfigure_delay = timedelta(minutes=5) reconfigure_delay = timedelta(minutes=5)
configure_at = now - reconfigure_delay configure_at = now - reconfigure_delay
flush_delay = timedelta(seconds=5)
flush_at = now + flush_delay
topology = [] recovered_duration = timedelta(seconds=5)
dead_duration = timedelta(seconds=30)
topology = None
id = unique_identifier()
q = Queue()
workers = {}
last_seen = {}
spinner = cycle("|/-\\")
with open("incidents.txt", "a") as fp: with open("incidents.txt", "a") as fp:
while True: while True:
now = datetime.now() now = datetime.now()
if flush_at <= now:
fp.flush()
flush_at = now + flush_delay
if configure_at <= now: if configure_at <= now:
log.info("Attempting to reconfigure network topology...") log.info("Attempting to reconfigure network topology...")
try: topology = compute_topology(opts.hosts, topology)
topology = compute_topology(opts.hosts) configure_at = now + reconfigure_delay
configure_at = now + reconfigure_delay log.info("Graph -\n" + topology.render())
for h in topology:
log.info(f"in topology {h}") for h in topology:
except CalledProcessError: if h.distance < 1 or h.distance > 6:
continue
if h.address in workers:
continue
else:
p = workers[h.address] = Process(target=pinger, args=(h, id, q))
p.start()
try:
timestamp, res = q.get(timeout=0.1)
last = last_seen.get(res.address)
if res.address not in workers:
pass pass
for h in topology: elif res.is_alive:
if h.rank == 0: if last and (delta := timestamp - last) > recovered_duration:
continue fp.write(f"RECOVERED\t{res.address}\t{timestamp.isoformat()}\t{delta.total_seconds()}\n")
last_seen[res.address] = timestamp
fail = False elif not res.is_alive:
try: if last and (delta := timestamp - last) > dead_duration:
if ping(h.ip, timeout=h.mean_latency() * 2) != 0: workers[h.address].terminate()
fail = True del workers[h.address]
except Exception as e: del topology[h.address]
fail = True fp.write(f"DEAD\t{res.address}\t{timestamp.isoformat()}\t{delta.total_seconds()}\n")
log.exception(e)
if fail: else:
msg = f"{datetime.now()} failed to reach {h.hostname} ({h.ip})" fp.write(f"DOWN\t{res.address}\t{timestamp.isoformat()}\n")
log.warning(msg)
fp.write(msg + "\n")
else: except queue.Empty:
sys.stderr.write('.') sys.stderr.write("\r" + next(spinner))
sys.stderr.flush() sys.stderr.flush()

View file

@ -0,0 +1,91 @@
#!/usr/bin/env python3
import logging
from time import sleep
from icmplib import Hop
from icmplib.exceptions import (
ICMPLibError,
TimeExceeded,
)
from icmplib.models import Hop, ICMPRequest
from icmplib.sockets import (
ICMPv4Socket,
ICMPv6Socket,
)
from icmplib.utils import *
log = logging.getLogger(__name__)
def urping(address: str, hops: int, via: str, family=None, count=3, fudge=4, interval=0.05, timeout=2, source=None, **kwargs) -> Hop:
"""Ur-ping by (ab)using ICMP TTLs.
Craft an ICMP message which would go one (or more) hops FARTHER than the `address` host, routed towards `via`.
Send `count * fudge` packets, looking for responses from `address`.
Responses from `address` are considered; and a `Hop` is built from those results.
Other responses from other hosts are discarded.
"""
if is_hostname(via):
via = resolve(via, family)[0]
if is_hostname(address):
address = resolve(address, falmiy)[0]
if is_ipv6_address(via):
_Socket = ICMPv6Socket
else:
_Socket = ICMPv4Socket
ttl = hops
hop = Hop(address, 0, [])
packets_sent = 0
with _Socket(source) as sock:
for _ in range(count * fudge):
request = ICMPRequest(
destination=via,
# Note that we act like this is a new stream with a new ID and sequence each time to try and fool multipathing.
id=unique_identifier(),
sequence=0,
ttl=ttl,
**kwargs)
try:
sock.send(request)
packets_sent += 1
reply = None
reply = sock.receive(request, timeout)
rtt = (reply.time - request.time) * 1000
reply.raise_for_status()
except TimeExceeded:
sleep(interval)
except ICMPLibError as e:
log.exception(e)
break
if not reply:
log.debug(f"got no reply from {address}")
else:
# Because the default repr() is crap
reply_str = "ICMPReply({})".format(", ".join(slot + "=" + str(getattr(reply, slot)) for slot in reply.__slots__))
log.debug(f"Pinging {address} (distance {hops}) via {via} got reply {reply_str}")
if reply and reply.source != address:
continue
elif reply:
hop._packets_sent += 1
hop._rtts.append(rtt)
if hop and hop._packets_sent >= count:
break
return hop

View file

@ -1,18 +0,0 @@
"""A shitty ping wrapper."""
from datetime import timedelta
from subprocess import check_call, DEVNULL
def ping(host: str,
count: int = 3,
interval: float = 0.3,
timeout: timedelta = timedelta(seconds=3)):
return check_call(["ping", "-q",
"-i", str(interval),
"-c", str(count),
"-W", str(timeout.total_seconds()),
host],
stdout=DEVNULL,
stderr=DEVNULL,)

View file

@ -1,48 +0,0 @@
"""A shitty traceroute wrapper."""
from datetime import timedelta
import re
from subprocess import (
CalledProcessError,
check_call,
check_output,
DEVNULL,
)
from typing import Iterator, List, NamedTuple
class TraceElem(NamedTuple):
hostname: str
ip: str
latency: timedelta
rank: int
_LINE = re.compile(r"\*|(((?P<hostname>[-_\w\d\.]*)\s+\((?P<ip>[a-f\d\.:]*)\)\s+)?(?P<latency>[\d\.]*) ms)")
def _parse_traceroute(lines: List[str]) -> Iterator[TraceElem]:
for rank, l in zip(range(1, 1<<64), lines):
ip = None
hostname = None
for m in re.finditer(_LINE, l):
if m.group("latency"):
ip = m.group("ip") or ip
hostname = m.group("hostname") or hostname
yield TraceElem(hostname, ip, timedelta(milliseconds=float(m.group("latency"))), rank)
def traceroute(host: str, icmp=True, timeout=timedelta(seconds=5)) -> Iterator[TraceElem]:
# FIXME: Make ICMP mode an option, not on always
yield from _parse_traceroute(
check_output(["traceroute",
# Set wait; note use of total_seconds which is "real" valued.
"-w", str(timeout.total_seconds()),
# Use ICMP probes same as PING.
# This means all probed hosts will be pingable/ping compliant.
# May miss parts of the topology as a result.
"-I",
host],
stderr=DEVNULL,)
.decode("utf-8")
.splitlines())

View file

@ -1,31 +0,0 @@
#!/usr/bin/env python3
from typing import List
from traceroute import _parse_traceroute, TraceElem
from datetime import timedelta
import pytest
def parse_traceroute(lines):
"""Testing helper."""
return list(_parse_traceroute(lines))
@pytest.mark.parametrize("example, expected", [
# Basic case, one match
("3 10.60.142.2 (10.60.142.2) 117.502 ms",
[TraceElem("10.60.142.2", "10.60.142.2", timedelta(milliseconds=117.502))]),
# Multiple matches on one line
("3 10.60.142.2 (10.60.142.2) 117.502 ms 10.60.142.3 (10.60.142.3) 75.624 ms 10.60.142.2 (10.60.142.2) 117.709 ms",
[TraceElem("10.60.142.2", "10.60.142.2", timedelta(milliseconds=117.502)),
TraceElem("10.60.142.3", "10.60.142.3", timedelta(milliseconds=75.624)),
TraceElem("10.60.142.2", "10.60.142.2", timedelta(milliseconds=117.709))]),
# Context sensitive case - traceroute doesn't always print the host & IP.
("7 ae-501-ar01.denver.co.denver.comcast.net (96.216.22.130) 41.920 ms 41.893 ms 74.385 ms",
[TraceElem("ae-501-ar01.denver.co.denver.comcast.net", "96.216.22.130", timedelta(milliseconds=41.920)),
TraceElem("ae-501-ar01.denver.co.denver.comcast.net", "96.216.22.130", timedelta(milliseconds=41.893)),
TraceElem("ae-501-ar01.denver.co.denver.comcast.net", "96.216.22.130", timedelta(milliseconds=74.385))]),
])
def test_examples(example: str, expected: List[TraceElem]):
assert parse_traceroute(example.splitlines()) == expected

View file

@ -16,7 +16,9 @@ docutils==0.17
ExifRead==2.3.2 ExifRead==2.3.2
flake8==3.9.2 flake8==3.9.2
Flask==2.0.1 Flask==2.0.1
graphviz==0.18.2
hypothesis==6.14.5 hypothesis==6.14.5
icmplib==3.0.2
idna==2.10 idna==2.10
imagesize==1.2.0 imagesize==1.2.0
importlib-metadata==4.0.1 importlib-metadata==4.0.1