WIP - somewhat working v2 state

This commit is contained in:
Reid 'arrdem' McKenzie 2021-12-12 19:44:12 -07:00
parent 39eff4e53a
commit c589cbd6c5
6 changed files with 567 additions and 346 deletions

View file

@ -1,5 +1,7 @@
"""Aloe - A shitty weathermapping tool. """Aloe - A shitty weathermapping tool.
Think MTR but with the ability to detect/declare incidents and emit logs.
Periodically traceroutes the egress network, and then walks pings out the egress network recording times and hosts which Periodically traceroutes the egress network, and then walks pings out the egress network recording times and hosts which
failed to respond. Expects a network in excess of 90% packet delivery, but with variable timings. Intended to probe for failed to respond. Expects a network in excess of 90% packet delivery, but with variable timings. Intended to probe for
when packet delivery latencies radically degrade and maintain a report file. when packet delivery latencies radically degrade and maintain a report file.
@ -7,211 +9,233 @@ when packet delivery latencies radically degrade and maintain a report file.
""" """
import argparse import argparse
from collections import defaultdict, deque from collections import deque as ringbuffer
from datetime import datetime, timedelta import curses
from itertools import cycle from datetime import timedelta
from itertools import count
import logging import logging
from multiprocessing import Process, Queue
from random import randint
import queue import queue
from queue import Queue
import sys import sys
from time import sleep from threading import Event, Lock, Thread
from typing import List from time import sleep, time
from .icmp import *
from .icmp import _ping
from .cursedlogger import CursesHandler
import graphviz
from icmplib import Hop, traceroute
from icmplib.utils import *
import requests
import pytz import pytz
from .urping import ping, urping
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("hosts", nargs="+") parser.add_argument("hosts", nargs="+")
class Topology(object):
LOCALHOST = Hop("127.0.0.1", 1, [0.0], 0)
def __init__(self):
self._graph = defaultdict(set)
self._nodes = {self.LOCALHOST.address: self.LOCALHOST}
def next_hops(self, address: str) -> List[str]:
return list(self._graph.get(address))
def node(self, address: str) -> Hop:
return self._nodes.get(address)
def add_traceroute(self, trace):
for e in trace:
if e.address not in self._nodes:
self._nodes[e.address] = e
else:
e0 = self._nodes[e.address]
e0._packets_sent += e._packets_sent
e0._rtts.extend(e.rtts)
e0._distance = min(e.distance, e0.distance)
hosts = [(self.LOCALHOST.address, self.LOCALHOST.distance)] + [
(e.address, e.distance) for e in trace
]
i1 = iter(hosts)
i2 = iter(hosts)
next(i2)
for (parent, _), (child, _) in zip(i1, i2):
self._graph[parent].add(child)
def render(self):
g = graphviz.Digraph()
for n in sorted(self._nodes.values(), key=lambda n: n.distance):
g.node(n.address)
for next in self._graph[n.address]:
g.edge(n.address, next)
# Lol. Lmao.
return requests.post(
"https://dot-to-ascii.ggerganov.com/dot-to-ascii.php",
params={"boxart": 1, "src": g.source},
).text
def __iter__(self):
return iter(sorted(self._nodes.values(), key=lambda n: n.distance))
def __delitem__(self, key):
del self._graph[key]
del self._nodes[key]
INTERVAL = 0.5 INTERVAL = 0.5
Z = pytz.timezone("America/Denver") Z = pytz.timezone("America/Denver")
def compute_topology(hostlist, topology=None):
"""Walk a series of traceroute tuples, computing a 'worst expected latency' topology from them."""
topology = topology or Topology() class HostState(object):
for h in hostlist: """A model of a (bounded) time series of host state.
trace = traceroute(h)
# Restrict the trace to hosts which ICMP ping
trace = [e for e in trace if ping(e.address, interval=INTERVAL, count=3).is_alive]
topology.add_traceroute(trace)
return topology """
def pinger(host, queue, next=None): class MonitoredHost(object):
# Mokney patch the RTT tracking def __init__(self, hostname: str, timeout: timedelta, id=None):
host._rtts = deque(host._rtts, maxlen=100) self._hostname = hostname
id = randint(1, 1<<16 - 1) self._timeout = timeout
sequence = 0 self._sequence = request_sequence(hostname, timeout, id)
self._lock = Lock()
self._state = ringbuffer(maxlen=360)
self._is_up = False
self._lost = 0
self._up = 0
def __call__(self, shutdown: Event, q: Queue):
"""Monitor a given host by throwing requests into the queue; intended to be a Thread target."""
while not shutdown.is_set():
req = next(self._sequence)
resp = _ping(q, req)
if resp and not self._is_up:
# log.debug(f"Host {self._hostname} is up!")
self._is_up = self._is_up or resp
self._up = resp._time
elif resp and self._is_up:
# log.debug(f"Host {self._hostname} holding up...")
pass
elif not resp and self._is_up:
# log.debug(f"Host {self._hostname} is down!")
self._is_up = None
self._up = None
elif not resp and not self._is_up:
pass
with self._lock:
if not resp:
self._lost += 1
if self._state and not self._state[0]:
self._lost -= 1
# self._state = (self._state + [req])[:-3600]
self._state.append(resp)
sleep(1)
def last(self):
with self._lock:
return next(reversed(self._state), None)
def last_window(self, duration: timedelta = None):
with self._lock:
l = []
t = time() - duration.total_seconds()
for i in reversed(self._state):
if not i or i._time > t:
l.insert(0, i)
else:
break
return l
def loss(self, duration: timedelta):
log = self.last_window(duration)
if log:
return log.count(None) / len(log)
else:
return 0.0
def is_up(self, duration: timedelta, threshold = 0.25):
return self.loss(duration) <= threshold
def last_seen(self, now: datetime) -> timedelta:
if state := self.last():
return now - datetime.fromtimestamp(state._time)
def up(self, duration: datetime):
if self._up:
return datetime.fromtimestamp(self._up)
def main():
stdscr = curses.initscr()
maxy, maxx = stdscr.getmaxyx()
begin_x = 2; begin_y = maxy - 12
height = 10; width = maxx - 4
logscr = curses.newwin(height, width, begin_y, begin_x)
handler = CursesHandler(logscr)
formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)
log.setLevel(logging.DEBUG)
stdscr = curses.newwin(maxy - height - 2, width, 0, begin_x)
opts, args = parser.parse_known_args()
q = queue.Queue()
shutdown = Event()
p = Thread(target=icmp_worker, args=(shutdown, q,))
p.start()
hosts = {}
hl = Lock()
threads = {}
def create_host(address):
if address not in hosts:
log.info(f"Monitoring {address}...")
with hl:
hosts[address] = monitor = MonitoredHost(address, timedelta(seconds=8))
threads[address] = t = Thread(target=monitor, args=(shutdown, q))
t.start()
else:
log.debug(f"Already monitoring {address}...")
stdscr.refresh()
def render():
dt = timedelta(seconds=8)
with open("incidents.txt", "w") as fp:
incident = False
while not shutdown.is_set():
rows, cols = stdscr.getmaxyx()
down = 0
now = datetime.now()
i = 0
with hl:
for host, i in zip(hosts.values(), count(1)):
name = host._hostname
loss = host.loss(dt) * 100
state = host.last()
if not state:
down += 1
last_seen = "Down"
else:
last_seen = f"{host.last_seen(now).total_seconds():.2f}s ago"
if up := host.up(dt):
up = f" up: {(now - up).total_seconds():.2f}"
else:
up = ""
stdscr.addstr(i, 2, f"{name: <16s}]{up} lost: {loss:.2f}% last: {last_seen}".ljust(cols))
stdscr.border()
stdscr.refresh()
msg = None
if down >= 3 and not incident:
incident = True
msg = f"{datetime.now()} - {down} hosts down"
elif down < 3 and incident:
incident = False
msg = f"{datetime.now()} - network recovered"
if i != 0 and msg:
log.info(msg)
fp.write(msg + "\n")
fp.flush()
sleep(1)
rt = Thread(target=render)
rt.start()
def retrace():
while not shutdown.is_set():
for h in opts.hosts:
# FIXME: Use a real topology model
for hop in traceroute(q, h):
if ping(q, hop.address).is_alive:
create_host(hop.address)
sleep(60 * 5)
tt = Thread(target=retrace)
tt.start()
try:
while True: while True:
timeout = min(h.avg_rtt / 1000.0, 0.5) # rtt is in ms but timeout is in sec. sleep(1)
start = datetime.now(tz=Z) finally:
curses.endwin()
res = ping(host.address, timeout=timeout, interval=INTERVAL, count=3, id=id, sequence=sequence) sys.stdout.flush()
sequence += res._packets_sent sys.stderr.flush()
shutdown.set()
queue.put((start, res))
sleep(INTERVAL)
if res.is_alive:
host._rtts.extend(res._rtts)
host._packets_sent += res._packets_sent
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG) main()
opts, args = parser.parse_known_args()
now = start = datetime.now(tz=Z)
reconfigure_delay = timedelta(minutes=5)
configure_at = now - reconfigure_delay
flush_delay = timedelta(seconds=1)
flush_at = now + flush_delay
recovered_duration = timedelta(seconds=5)
dead_duration = timedelta(minutes=30)
topology = None
q = Queue()
workers = {}
last_seen = {}
state = {}
spinner = cycle("|/-\\")
with open("incidents.txt", "a") as fp:
fp.write("RESTART\n")
while True:
now = datetime.now(tz=Z)
if flush_at <= now:
fp.flush()
flush_at = now + flush_delay
if configure_at <= now:
log.info("Attempting to reconfigure network topology...")
try:
topology = compute_topology(opts.hosts, topology)
configure_at = now + reconfigure_delay
log.info("Graph -\n" + topology.render())
for h in topology:
if h.distance == 0:
continue
if h.address in workers:
continue
else:
n = next(iter(topology.next_hops(h.address)), None)
p = workers[h.address] = Process(target=pinger, args=(h, q, n))
p.start()
except Exception as e:
log.exception(e)
try:
# Revert to "logical now" of whenever the last ping results came in.
now, res = q.get(timeout=0.1)
last = last_seen.get(res.address)
delta = now - last if last else None
sys.stderr.write("\r" + next(spinner) + " " + f"ICMPResponse({res.address}, {res._rtts}, {res._packets_sent})" + " " * 20)
sys.stderr.flush()
if res.address not in workers:
pass
elif res.is_alive:
last_seen[res.address] = now
if last and delta > recovered_duration:
state[res.address] = True
fp.write(
f"RECOVERED\t{res.address}\t{now.isoformat()}\t{delta.total_seconds()}\n"
)
elif not last:
state[res.address] = True
fp.write(f"UP\t{res.address}\t{now.isoformat()}\n")
elif not res.is_alive:
if last and delta > dead_duration:
workers[res.address].terminate()
del workers[res.address]
del topology[res.address]
del last_seen[res.address]
del state[res.address]
fp.write(
f"DEAD\t{res.address}\t{now.isoformat()}\t{delta.total_seconds()}\n"
)
elif last and delta > recovered_duration and state[res.address]:
fp.write(f"DOWN\t{res.address}\t{now.isoformat()}\n")
state[res.address] = False
except queue.Empty:
sys.stderr.write("\r" + next(spinner))
sys.stderr.flush()

View file

@ -0,0 +1,27 @@
import logging
import curses
from collections import deque as ringbuffer
from itertools import count
class CursesHandler(logging.Handler):
def __init__(self, screen):
logging.Handler.__init__(self)
self._screen = screen
self._buff = ringbuffer(maxlen=screen.getmaxyx()[0] - 2)
def emit(self, record):
try:
msg = self.format(record) + "\n"
self._buff.append(msg)
self._screen.clear()
for i, msg in zip(count(1), self._buff):
self._screen.addstr(i, 2, msg)
self._screen.border()
self._screen.refresh()
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)

View file

@ -0,0 +1,242 @@
"""Tools sitting between aloe and icmplib."""
from datetime import datetime, timedelta
from itertools import islice as take
import logging
import queue
from random import randint
from threading import Event, Lock
from time import sleep
from icmplib.exceptions import (
ICMPLibError,
ICMPSocketError,
TimeoutExceeded,
)
from icmplib.models import (
Hop,
Host,
ICMPReply,
ICMPRequest,
)
from icmplib.sockets import ICMPv4Socket
from icmplib.utils import is_hostname, resolve
log = logging.getLogger(__name__)
def better_repr(self):
elems = ", ".join(f"{slot}={getattr(self, slot)}" for slot in self.__slots__)
return f"<{type(self).__name__} ({elems})>"
ICMPRequest.__repr__ = better_repr
ICMPReply.__repr__ = better_repr
class ICMPRequestResponse(object):
"""A thread-safe request/response structure for pinging a host."""
PENDING = object()
TIMEOUT = object()
def __init__(self, address, request: ICMPRequest, timeout: timedelta):
_timeout = datetime.now() + timeout
self._address = address
self._request = request
self._timeout = _timeout
self._lock = Lock()
self._response = self.PENDING
def ready(self):
"""Determine if this request is still waiting."""
return self.get() is not self.PENDING
def get(self):
"""Get the response, unless the timeout has passed in which case return a timeout."""
with self._lock:
if self._response is not self.PENDING:
return self._response
elif self._timeout < datetime.now():
return self.TIMEOUT
else:
return self._response
def set(self, response):
"""Set the response, unless the timeout has passed in which case set a timeout."""
if isinstance(response, ICMPReply):
with self._lock:
if self._timeout < datetime.now():
self._response = self.TIMEOUT
else:
# rtt = (reply.time - request.time) * 1000
self._response = response
def icmp_worker(shutdown: Event, q: queue.Queue):
"""A worker thread which processes ICMP requests; sending packets and listening for matching responses."""
state = {}
with ICMPv4Socket(None, True) as sock:
while not shutdown.is_set():
# Send one
try:
item = q.get(block=False, timeout=0.1)
request = item._request
state[(request._id, request._sequence)] = item
# log.info(f"Sending request {item._request!r}")
sock.send(item._request)
except (ICMPLibError, ICMPSocketError, queue.Empty):
pass
# Recieve one
try:
if response := sock.receive(None, 0.1):
key = (response.id, response.sequence)
if key in state:
# log.info(f"Got response {response!r}")
state[key].set(response)
del state[key]
else:
# log.warning(f"Recieved non-matching response {response!r}")
pass
except (ICMPLibError, ICMPSocketError):
pass
# GC one
if key := next(iter(state.keys()), None):
if state[key].ready():
del state[key]
# Sleep one
sleep(0.1)
def traceroute(q: queue.Queue,
address: str,
first_hop: int = 1,
max_hops: int = 32,
count: int = 3,
id: int = None,
family: int = None):
if is_hostname(address):
address = resolve(address, family)[0]
mask = ((1<<16) - 1)
id = id or randint(1, mask) & 0xFFFF
ttl = first_hop
host_reached = False
hops = []
while not host_reached and ttl <= max_hops:
reply = None
packets_sent = 0
rtts = []
for sequence in range(count):
request = ICMPRequestResponse(
address,
ICMPRequest(
destination=address,
id=id,
sequence=sequence,
ttl=ttl
),
timedelta(seconds=1),
)
q.put(request)
while not request.ready():
sleep(0.1)
_reply = request.get()
if _reply is ICMPRequestResponse.TIMEOUT:
sleep(0.1)
continue
elif _reply:
reply = reply or _reply
try:
reply.raise_for_status()
host_reached = True
except ICMPLibError:
pass
rtt = (reply.time - request._request.time) * 1000
rtts.append(rtt)
if reply:
hops.append(
Hop(
address=reply.source,
packets_sent=packets_sent,
rtts=rtts,
distance=ttl
)
)
ttl += 1
return hops
def request_sequence(hostname: str,
timeout: timedelta,
id: int = None,
family: int = None):
"""Generate a sequence of requests monitoring a specific, usable as a request source for a ping."""
if is_hostname(hostname):
destination = resolve(hostname, family)[0]
else:
destination = hostname
mask = ((1<<16) - 1)
id = id or randint(1, mask) & 0xFFFF
sequence = 1
while True:
yield ICMPRequestResponse(
hostname,
ICMPRequest(
destination=destination,
id=id,
sequence=sequence & mask,
),
timeout
)
sequence += 1
def _ping(q: queue.Queue, request: ICMPRequestResponse):
q.put(request)
while not request.ready():
sleep(0.1)
_response = request.get()
if _response is not ICMPRequestResponse.TIMEOUT:
return _response
def ping(q: queue.Queue,
address: str,
count: int = 3,
id: int = None,
family: int = None) -> Host:
"""Ping a host N times."""
rtts = []
for request in take(request_sequence(address, timedelta(seconds=1)), count):
if reply := _ping(q, request):
rtt = (reply.time - request._request.time) * 1000
rtts.append(rtt)
return Host(
address=address,
packets_sent=count,
rtts=rtts,
)

View file

@ -0,0 +1,23 @@
"""An implementation of ring buffers which supports efficient reverse scans."""
class Ringbuffer(object):
def __init__(self, size: int, seed: list = None, fill: object = None):
if seed:
assert len(seed) <= size
self._state = ((seed or []) + [fill] * size)[:size]
self._size = size
self._start = 0
self._end = len(seed) if seed else 0
def append(self, obj):
self._state[self._end % self._size] = obj
self._end = (self._end + 1) % self._size
self._start = (self._start + 1) % self._size
def __iter__(self):
if self._start < self._end:
yield from iter(self._state[self._start:self._end])
else:
yield from iter(self._state[self._start:] + self._state[:self._end])
def __reversed__(self):

View file

@ -0,0 +1,68 @@
"""
A model of a (radial) network topology.
"""
from collections import defaultdict
import logging
from typing import List
import graphviz
from icmplib import Hop
import requests
log = logging.getLogger(__name__)
class Topology(object):
LOCALHOST = Hop("127.0.0.1", 1, [0.0], 0)
def __init__(self):
self._graph = defaultdict(set)
self._nodes = {self.LOCALHOST.address: self.LOCALHOST}
def next_hops(self, address: str) -> List[str]:
return list(self._graph.get(address))
def node(self, address: str) -> Hop:
return self._nodes.get(address)
def add_traceroute(self, trace):
for e in trace:
if e.address not in self._nodes:
self._nodes[e.address] = e
else:
e0 = self._nodes[e.address]
e0._packets_sent += e._packets_sent
e0._rtts.extend(e.rtts)
e0._distance = min(e.distance, e0.distance)
hosts = [(self.LOCALHOST.address, self.LOCALHOST.distance)] + [
(e.address, e.distance) for e in trace
]
i1 = iter(hosts)
i2 = iter(hosts)
next(i2)
for (parent, _), (child, _) in zip(i1, i2):
self._graph[parent].add(child)
def render(self):
g = graphviz.Digraph()
for n in sorted(self._nodes.values(), key=lambda n: n.distance):
g.node(n.address)
for next in self._graph[n.address]:
g.edge(n.address, next)
# Lol. Lmao.
return requests.post(
"https://dot-to-ascii.ggerganov.com/dot-to-ascii.php",
params={"boxart": 1, "src": g.source},
).text
def __iter__(self):
return iter(sorted(self._nodes.values(), key=lambda n: n.distance))
def __delitem__(self, key):
del self._graph[key]
del self._nodes[key]

View file

@ -1,163 +0,0 @@
#!/usr/bin/env python3
import logging
from time import sleep
import traceback
from textwrap import indent
from random import randint
import sys
from icmplib.exceptions import (
ICMPLibError,
TimeExceeded,
TimeoutExceeded,
)
from icmplib.models import Host, Hop, ICMPRequest, ICMPReply
from icmplib.sockets import (
ICMPv4Socket,
ICMPv6Socket,
)
from icmplib.utils import *
log = logging.getLogger(__name__)
def better_repr(self):
elems = ", ".join(f"{slot}={getattr(self, slot)}" for slot in self.__slots__)
return f"<{type(self).__name__} ({elems})>"
ICMPRequest.__repr__ = better_repr
ICMPReply.__repr__ = better_repr
def urping(address: str,
hops: int,
via: str,
family=None,
count=3,
fudge=4,
id=None,
interval=0.2,
timeout=2,
source=None,
**kwargs) -> Hop:
"""Ur-ping by (ab)using ICMP TTLs.
Craft an ICMP message which would go one (or more) hops FARTHER than the `address` host, routed towards `via`.
Send `count * fudge` packets, looking for responses from `address`.
Responses from `address` are considered; and a `Hop` is built from those results.
Other responses from other hosts are discarded.
"""
if is_hostname(via):
via = resolve(via, family)[0]
if is_hostname(address):
address = resolve(address, falmiy)[0]
if is_ipv6_address(via):
_Socket = ICMPv6Socket
else:
_Socket = ICMPv4Socket
ttl = hops
hop = Hop(address, 0, [], hops)
packets_sent = 0
with _Socket(source) as sock:
for _ in range(count * fudge):
request = ICMPRequest(
destination=via,
# Note that we act like this is a new stream with a new ID and sequence each time to try and fool multipathing.
id=id or unique_identifier(),
sequence=0,
ttl=ttl,
**kwargs)
try:
sock.send(request)
packets_sent += 1
reply = None
reply = sock.receive(request, timeout)
rtt = (reply.time - request.time) * 1000
reply.raise_for_status()
assert reply.id == request.id
assert reply.sequence == request.sequence
assert reply.source == address
hop._packets_sent += 1
hop._rtts.append(rtt)
if hop._packets_sent >= count:
break
except AssertionError:
log.warning("Got response from unexpected node %s (expected %s) %r for request %4", reply.source, address, reply, request)
except (TimeoutExceeded, TimeExceeded):
pass
except ICMPLibError as e:
log.exception(e)
break
sleep(interval)
return hop
def ping(address, count=4, interval=1, timeout=2, id=None, source=None,
family=None, privileged=True, sequence=0, **kwargs):
"""A simple, if paranoid, ping."""
if is_hostname(address):
address = resolve(address, family)[0]
if is_ipv6_address(address):
_Socket = ICMPv6Socket
else:
_Socket = ICMPv4Socket
id = id or randint(1, 1<<16 - 1) & 0xFFFF
packets_sent = 0
rtts = []
with _Socket(source, privileged) as sock:
for base in range(count):
sequence = (sequence + base) & 0xFFFF
if base > 0:
sleep(interval)
request = ICMPRequest(
destination=address,
id=id,
sequence=sequence,
**kwargs)
try:
sock.send(request)
packets_sent += 1
reply = sock.receive(request, timeout)
reply.raise_for_status()
assert reply.id == request.id
assert reply.sequence == request.sequence
assert reply.source == address
rtt = (reply.time - request.time) * 1000
rtts.append(rtt)
except AssertionError as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
log.warning("Got erroneous response:\n request: %r\n reply: %r\n err: |\n%s", request, reply, indent(traceback.format_exc(), " "))
except ICMPLibError:
pass
return Host(address, packets_sent, rtts)