Overhaul to a considerably more useful multiprocess state

This commit is contained in:
Reid 'arrdem' McKenzie 2021-11-20 22:05:45 -07:00
parent 8469ab7758
commit 7fcbf6f3fa
7 changed files with 222 additions and 166 deletions

View file

@ -7,5 +7,8 @@ zapp_binary(
main = "src/python/aloe/__main__.py",
deps = [
":lib",
py_requirement("graphviz"),
py_requirement("icmplib"),
py_requirement("requests"),
],
)

View file

@ -6,15 +6,19 @@ when packet delivery latencies radically degrade and maintain a report file.
"""
import sys
import argparse
import logging
from collections import defaultdict, deque
from datetime import datetime, timedelta
from ping import ping
from traceroute import TraceElem, traceroute
from subprocess import CalledProcessError
from typing import NamedTuple
from collections import defaultdict
import logging
from multiprocessing import Process, Queue
import queue
import sys
from typing import List
from .lib import *
import graphviz
import requests
log = logging.getLogger(__name__)
@ -35,103 +39,156 @@ def distinct(iter):
return l
class Host(NamedTuple):
hostname: str
ip: str
rank: int
latency: timedelta
samples: int = 1
def mean_latency(self):
return self.latency / self.samples
class Topology(object):
LOCALHOST = Host("localhost", "127.0.0.1", 0, timedelta(seconds=0.1))
LOCALHOST = Hop("127.0.0.1", 1, [0.0], 0)
def __init__(self):
self._graph = defaultdict(set) # Dict[ip, List[ip]]
self._nodes = {self.LOCALHOST.ip: self.LOCALHOST} # Dict[ip, Host]
self._graph = defaultdict(set) # Dict[address, List[address]]
self._nodes = {self.LOCALHOST.address: self.LOCALHOST} # Dict[address, Host]
def next_hops(self, address: str) -> List[str]:
return list(self._graph.get(address))
def node(self, address: str) -> Hop:
return self._nodes.get(address)
def add_traceroute(self, trace):
trace = list(trace)
hosts = []
newhosts = [self.LOCALHOST.ip]
rank = 0
for e in trace:
if e.ip not in self._nodes:
self._nodes[e.ip] = Host(e.hostname, e.ip, e.rank, e.latency, 1)
if e.address not in self._nodes:
self._nodes[e.address] = e
else:
self._nodes[e.ip] = Host(e.hostname, e.ip, e.rank, e.latency + self._nodes[e.ip].latency, self._nodes[e.ip].samples + 1)
e0 = self._nodes[e.address]
e0._packets_sent += e._packets_sent
e0._rtts.extend(e.rtts)
e0._distance = max(e.distance, e0.distance)
if e.rank > rank:
if newhosts:
for h2 in newhosts:
for h1 in hosts:
self._graph[h1].add(h2)
hosts = newhosts
newhosts = []
rank = e.rank
hosts = [(self.LOCALHOST.address, self.LOCALHOST.distance)] + [(e.address, e.distance) for e in trace]
i1 = iter(hosts)
i2 = iter(hosts)
next(i2)
if e.rank == rank:
newhosts.append(e.ip)
for (parent, _), (child, _) in zip(i1, i2):
self._graph[parent].add(child)
def render(self):
for n in sorted(self._nodes.values(), key=lambda n: n.rank):
print(f"{n.hostname} ({n.ip}) => {self._graph[n.ip]}")
g = graphviz.Digraph()
for n in sorted(self._nodes.values(), key=lambda n: n.distance):
g.node(n.address)
for next in self._graph[n.address]:
g.edge(n.address, next)
# Lol. Lmao.
return requests.post("https://dot-to-ascii.ggerganov.com/dot-to-ascii.php", params={"boxart":1, "src": g.source}).text
# return g.source
def compute_topology(hostlist):
def __iter__(self):
return iter(sorted(self._nodes.values(), key=lambda n: n.distance))
def __delitem__(self, key):
del self._graph[key]
del self._nodes[key]
def compute_topology(hostlist, topology=None):
"""Walk a series of traceroute tuples, computing a 'worst expected latency' topology from them."""
topology = Topology()
topology = topology or Topology()
for h in hostlist:
topology.add_traceroute(traceroute(h))
trace = traceroute(h)
# Restrict the trace to hosts which ICMP ping
trace = [e for e in trace if ping(e.address, count=1).is_alive]
topology.add_traceroute(trace)
return sorted(topology._nodes.values(), key=lambda n: n.rank)
return topology
def cycle(iter):
while True:
for e in iter:
yield e
def pinger(host, id, queue):
# Mokney patch the RTT tracking
host._rtts = deque(host._rtts, maxlen=100)
while True:
timeout = h.avg_rtt * 2 / 1000.0 # rtt is in ms but timeout is in sec.
start = datetime.now()
res = ping(host.address, id=id, timeout=timeout, count=3)
queue.put((start, res))
if res.is_alive:
host._rtts.extend(res._rtts)
host._packets_sent += res._packets_sent
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.DEBUG)
opts, args = parser.parse_known_args()
now = start = datetime.now()
reconfigure_delay = timedelta(minutes=5)
configure_at = now - reconfigure_delay
flush_delay = timedelta(seconds=5)
flush_at = now + flush_delay
topology = []
recovered_duration = timedelta(seconds=5)
dead_duration = timedelta(seconds=30)
topology = None
id = unique_identifier()
q = Queue()
workers = {}
last_seen = {}
spinner = cycle("|/-\\")
with open("incidents.txt", "a") as fp:
while True:
now = datetime.now()
if flush_at <= now:
fp.flush()
flush_at = now + flush_delay
if configure_at <= now:
log.info("Attempting to reconfigure network topology...")
try:
topology = compute_topology(opts.hosts)
configure_at = now + reconfigure_delay
for h in topology:
log.info(f"in topology {h}")
except CalledProcessError:
topology = compute_topology(opts.hosts, topology)
configure_at = now + reconfigure_delay
log.info("Graph -\n" + topology.render())
for h in topology:
if h.distance < 1 or h.distance > 6:
continue
if h.address in workers:
continue
else:
p = workers[h.address] = Process(target=pinger, args=(h, id, q))
p.start()
try:
timestamp, res = q.get(timeout=0.1)
last = last_seen.get(res.address)
if res.address not in workers:
pass
for h in topology:
if h.rank == 0:
continue
elif res.is_alive:
if last and (delta := timestamp - last) > recovered_duration:
fp.write(f"RECOVERED\t{res.address}\t{timestamp.isoformat()}\t{delta.total_seconds()}\n")
last_seen[res.address] = timestamp
fail = False
try:
if ping(h.ip, timeout=h.mean_latency() * 2) != 0:
fail = True
except Exception as e:
fail = True
log.exception(e)
elif not res.is_alive:
if last and (delta := timestamp - last) > dead_duration:
workers[h.address].terminate()
del workers[h.address]
del topology[h.address]
fp.write(f"DEAD\t{res.address}\t{timestamp.isoformat()}\t{delta.total_seconds()}\n")
if fail:
msg = f"{datetime.now()} failed to reach {h.hostname} ({h.ip})"
log.warning(msg)
fp.write(msg + "\n")
else:
fp.write(f"DOWN\t{res.address}\t{timestamp.isoformat()}\n")
else:
sys.stderr.write('.')
sys.stderr.flush()
except queue.Empty:
sys.stderr.write("\r" + next(spinner))
sys.stderr.flush()

View file

@ -0,0 +1,91 @@
#!/usr/bin/env python3
import logging
from time import sleep
from icmplib import Hop
from icmplib.exceptions import (
ICMPLibError,
TimeExceeded,
)
from icmplib.models import Hop, ICMPRequest
from icmplib.sockets import (
ICMPv4Socket,
ICMPv6Socket,
)
from icmplib.utils import *
log = logging.getLogger(__name__)
def urping(address: str, hops: int, via: str, family=None, count=3, fudge=4, interval=0.05, timeout=2, source=None, **kwargs) -> Hop:
"""Ur-ping by (ab)using ICMP TTLs.
Craft an ICMP message which would go one (or more) hops FARTHER than the `address` host, routed towards `via`.
Send `count * fudge` packets, looking for responses from `address`.
Responses from `address` are considered; and a `Hop` is built from those results.
Other responses from other hosts are discarded.
"""
if is_hostname(via):
via = resolve(via, family)[0]
if is_hostname(address):
address = resolve(address, falmiy)[0]
if is_ipv6_address(via):
_Socket = ICMPv6Socket
else:
_Socket = ICMPv4Socket
ttl = hops
hop = Hop(address, 0, [])
packets_sent = 0
with _Socket(source) as sock:
for _ in range(count * fudge):
request = ICMPRequest(
destination=via,
# Note that we act like this is a new stream with a new ID and sequence each time to try and fool multipathing.
id=unique_identifier(),
sequence=0,
ttl=ttl,
**kwargs)
try:
sock.send(request)
packets_sent += 1
reply = None
reply = sock.receive(request, timeout)
rtt = (reply.time - request.time) * 1000
reply.raise_for_status()
except TimeExceeded:
sleep(interval)
except ICMPLibError as e:
log.exception(e)
break
if not reply:
log.debug(f"got no reply from {address}")
else:
# Because the default repr() is crap
reply_str = "ICMPReply({})".format(", ".join(slot + "=" + str(getattr(reply, slot)) for slot in reply.__slots__))
log.debug(f"Pinging {address} (distance {hops}) via {via} got reply {reply_str}")
if reply and reply.source != address:
continue
elif reply:
hop._packets_sent += 1
hop._rtts.append(rtt)
if hop and hop._packets_sent >= count:
break
return hop

View file

@ -1,18 +0,0 @@
"""A shitty ping wrapper."""
from datetime import timedelta
from subprocess import check_call, DEVNULL
def ping(host: str,
count: int = 3,
interval: float = 0.3,
timeout: timedelta = timedelta(seconds=3)):
return check_call(["ping", "-q",
"-i", str(interval),
"-c", str(count),
"-W", str(timeout.total_seconds()),
host],
stdout=DEVNULL,
stderr=DEVNULL,)

View file

@ -1,48 +0,0 @@
"""A shitty traceroute wrapper."""
from datetime import timedelta
import re
from subprocess import (
CalledProcessError,
check_call,
check_output,
DEVNULL,
)
from typing import Iterator, List, NamedTuple
class TraceElem(NamedTuple):
hostname: str
ip: str
latency: timedelta
rank: int
_LINE = re.compile(r"\*|(((?P<hostname>[-_\w\d\.]*)\s+\((?P<ip>[a-f\d\.:]*)\)\s+)?(?P<latency>[\d\.]*) ms)")
def _parse_traceroute(lines: List[str]) -> Iterator[TraceElem]:
for rank, l in zip(range(1, 1<<64), lines):
ip = None
hostname = None
for m in re.finditer(_LINE, l):
if m.group("latency"):
ip = m.group("ip") or ip
hostname = m.group("hostname") or hostname
yield TraceElem(hostname, ip, timedelta(milliseconds=float(m.group("latency"))), rank)
def traceroute(host: str, icmp=True, timeout=timedelta(seconds=5)) -> Iterator[TraceElem]:
# FIXME: Make ICMP mode an option, not on always
yield from _parse_traceroute(
check_output(["traceroute",
# Set wait; note use of total_seconds which is "real" valued.
"-w", str(timeout.total_seconds()),
# Use ICMP probes same as PING.
# This means all probed hosts will be pingable/ping compliant.
# May miss parts of the topology as a result.
"-I",
host],
stderr=DEVNULL,)
.decode("utf-8")
.splitlines())

View file

@ -1,31 +0,0 @@
#!/usr/bin/env python3
from typing import List
from traceroute import _parse_traceroute, TraceElem
from datetime import timedelta
import pytest
def parse_traceroute(lines):
"""Testing helper."""
return list(_parse_traceroute(lines))
@pytest.mark.parametrize("example, expected", [
# Basic case, one match
("3 10.60.142.2 (10.60.142.2) 117.502 ms",
[TraceElem("10.60.142.2", "10.60.142.2", timedelta(milliseconds=117.502))]),
# Multiple matches on one line
("3 10.60.142.2 (10.60.142.2) 117.502 ms 10.60.142.3 (10.60.142.3) 75.624 ms 10.60.142.2 (10.60.142.2) 117.709 ms",
[TraceElem("10.60.142.2", "10.60.142.2", timedelta(milliseconds=117.502)),
TraceElem("10.60.142.3", "10.60.142.3", timedelta(milliseconds=75.624)),
TraceElem("10.60.142.2", "10.60.142.2", timedelta(milliseconds=117.709))]),
# Context sensitive case - traceroute doesn't always print the host & IP.
("7 ae-501-ar01.denver.co.denver.comcast.net (96.216.22.130) 41.920 ms 41.893 ms 74.385 ms",
[TraceElem("ae-501-ar01.denver.co.denver.comcast.net", "96.216.22.130", timedelta(milliseconds=41.920)),
TraceElem("ae-501-ar01.denver.co.denver.comcast.net", "96.216.22.130", timedelta(milliseconds=41.893)),
TraceElem("ae-501-ar01.denver.co.denver.comcast.net", "96.216.22.130", timedelta(milliseconds=74.385))]),
])
def test_examples(example: str, expected: List[TraceElem]):
assert parse_traceroute(example.splitlines()) == expected

View file

@ -16,7 +16,9 @@ docutils==0.17
ExifRead==2.3.2
flake8==3.9.2
Flask==2.0.1
graphviz==0.18.2
hypothesis==6.14.5
icmplib==3.0.2
idna==2.10
imagesize==1.2.0
importlib-metadata==4.0.1