"""HTTP-signature, outbound-request and message helpers for the relay."""

import base64
from datetime import datetime
from email.utils import formatdate
import json
from json.decoder import JSONDecodeError
import logging
import socket
from urllib.parse import urlparse
import uuid
from typing import Union

from Crypto.Hash import SHA, SHA256, SHA512
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from aiohttp import ClientSession
from aiohttp.client_exceptions import (
    ClientConnectorError,
    ClientResponseError,
)
from aiohttp.hdrs import METH_ALL as METHODS
from aiohttp.web import (
    Response as AiohttpResponse,
    View as AiohttpView,
)
from async_lru import alru_cache
from relay.http_debug import http_debug
from retry import retry

# Application object injected through set_app(); the signing helpers read
# ``app.config.keyid``, ``app.database.PRIVKEY`` and ``app.database.inboxes``.
app = None

# Hash constructors keyed by the hash part of a signature "algorithm" value
# (e.g. the "sha256" in "rsa-sha256").
HASHES = {"sha1": SHA, "sha256": SHA256, "sha512": SHA512}

MIMETYPES = {
    "activity": "application/activity+json",
    "html": "text/html",
    "json": "application/json",
    "text": "text/plain",
}

NODEINFO_NS = {
    "20": "http://nodeinfo.diaspora.software/ns/schema/2.0",
    "21": "http://nodeinfo.diaspora.software/ns/schema/2.1",
}


def set_app(new_app):
    """Store *new_app* in the module-level ``app`` used by the helpers here."""
    global app
    app = new_app


def build_signing_string(headers, used_headers):
    """Build the newline-separated ``name: value`` string that gets signed.

    Each name in *used_headers* is looked up in *headers* exactly as given and
    emitted in lower case. Raises ``KeyError`` when a name is missing.
    """
    return "\n".join(
        ": ".join((name.lower(), headers[name])) for name in used_headers
    )


def check_open_port(host, port):
    """Return True when a TCP connection to ``host:port`` can NOT be made.

    ``connect_ex`` returns 0 on a successful connect, so a non-zero result is
    reported as True. ``0.0.0.0`` is probed through ``127.0.0.1``. A socket
    error during the probe yields False.
    """
    if host == "0.0.0.0":
        host = "127.0.0.1"

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            return s.connect_ex((host, port)) != 0
        except socket.error:
            return False


def create_signature_header(headers):
    """Return the value of a ``Signature`` header covering all of *headers*.

    Header names are lower-cased, every header is included in the signing
    string, and the result is signed with ``app.database.PRIVKEY``.
    """
    headers = {k.lower(): v for k, v in headers.items()}
    used_headers = headers.keys()
    sigstring = build_signing_string(headers, used_headers)

    sig = {
        "keyId": app.config.keyid,
        "algorithm": "rsa-sha256",
        "headers": " ".join(used_headers),
        "signature": sign_signing_string(sigstring, app.database.PRIVKEY),
    }

    return ",".join(f'{k}="{v}"' for k, v in sig.items())


def distill_inboxes(object_id):
    """Yield every known inbox whose hostname differs from *object_id*'s."""
    # The object's hostname does not change between iterations; parse it once.
    origin_host = urlparse(object_id).hostname

    for inbox in app.database.inboxes:
        if urlparse(inbox).hostname != origin_host:
            yield inbox


def generate_body_digest(body):
    """Return the base64-encoded SHA-256 digest of the text *body*."""
    h = SHA256.new(body.encode("utf-8"))
    return base64.b64encode(h.digest()).decode("utf-8")


def sign_signing_string(sigstring, key):
    """Sign *sigstring* with *key* (PKCS#1 v1.5 over SHA-256), base64-encoded."""
    pkcs = PKCS1_v1_5.new(key)
    h = SHA256.new()
    h.update(sigstring.encode("ascii"))
    sigdata = pkcs.sign(h)

    return base64.b64encode(sigdata).decode("utf-8")


def split_signature(sig):
    """Parse a ``Signature`` header value into a dict.

    The value is split on commas into ``key="value"`` parameters. The
    ``headers`` parameter (default ``"date"``) is returned as a list of names.
    """
    default = {"headers": "date"}

    for chunk in sig.strip().split(","):
        k, _, v = chunk.partition("=")
        # Tolerate ", "-separated parameters: without stripping, the key would
        # keep its leading space and later lookups such as "algorithm" miss.
        k = k.strip()
        if not k:
            continue
        default[k] = v.strip().strip('"')

    default["headers"] = default["headers"].split()
    return default


async def fetch_actor_key(actor):
    """Fetch *actor* and return its public key, or None when unavailable.

    Any failure while extracting/importing the key is logged at debug level
    and reported as None (deliberate best-effort).
    """
    actor_data = await request(actor)

    if not actor_data:
        return None

    try:
        return RSA.importKey(actor_data["publicKey"]["publicKeyPem"])
    except Exception as e:
        logging.debug(f"Exception occured while fetching actor key: {e}")
        return None


@alru_cache
async def fetch_nodeinfo(domain):
    """Return the software name advertised by *domain*'s nodeinfo document.

    Returns None when the well-known document cannot be fetched, and False
    when no nodeinfo URL is advertised or the software name is unavailable.
    """
    nodeinfo_url = None

    wk_nodeinfo = await request(
        f"https://{domain}/.well-known/nodeinfo", sign_headers=False, activity=False
    )

    if not wk_nodeinfo:
        return None

    wk_nodeinfo = WKNodeinfo(wk_nodeinfo)

    # Later versions overwrite earlier ones, so 2.1 wins when both are listed.
    for version in ["20", "21"]:
        try:
            nodeinfo_url = wk_nodeinfo.get_url(version)
        except KeyError:
            pass

    if not nodeinfo_url:
        logging.debug(f"Failed to fetch nodeinfo url for domain: {domain}")
        return False

    nodeinfo = await request(nodeinfo_url, sign_headers=False, activity=False)

    try:
        return nodeinfo["software"]["name"]
    except (KeyError, TypeError):
        # TypeError covers request() returning None (non-200 / bad type).
        return False


# NOTE(review): ``retry`` wraps the call itself; for a coroutine function the
# call only creates the coroutine, so errors raised while awaiting are
# presumably never retried - confirm against the ``retry`` package.
@retry(exceptions=(ClientConnectorError, ClientResponseError), backoff=1, tries=3)
async def request(uri, data=None, force=False, sign_headers=True, activity=True):
    """Send a GET (no *data*) or POST (*data* given) to *uri*.

    Args:
        uri: Target URL.
        data: Optional dict sent as a JSON body; a truthy value makes this a
            POST.
        force: Unused by this function; kept so existing callers still work.
        sign_headers: When true, add Date/Digest/Content-Length/Signature
            headers built with ``create_signature_header``.
        activity: Selects the ``Content-Type`` for a body: the activity
            mimetype when true, plain JSON otherwise.

    Returns:
        The parsed response (``Message`` for the activity mimetype,
        ``DotDict`` for plain JSON), or None for a 202, any other non-200
        status, or an unexpected content type.

    Raises:
        TypeError: *data* is truthy but not a dict.
        aiohttp.ClientResponseError: from ``raise_for_status``.
    """
    url = urlparse(uri)
    method = "POST" if data else "GET"
    action = None
    body = None

    headers = {
        "Accept": f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9',
        "User-Agent": "ActivityRelay",
    }

    if data:
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if not isinstance(data, dict):
            raise TypeError(f"data must be a dict, not {type(data).__name__}")

        action = data.get("type")
        # Serialise regardless of signing so the body always matches the
        # JSON Content-Type announced below.
        body = json.dumps(data)
        headers["Content-Type"] = MIMETYPES["activity" if activity else "json"]

    if sign_headers:
        # A URL without a path is requested as "/", so sign that.
        target_path = url.path or "/"

        signing_headers = {
            "(request-target)": f"{method.lower()} {target_path}",
            # formatdate() emits the same "%a, %d %b %Y %H:%M:%S GMT" layout
            # without depending on the process locale.
            "Date": formatdate(usegmt=True),
            "Host": url.netloc,
        }

        if body is not None:
            signing_headers.update(
                {
                    "Digest": f"SHA-256={generate_body_digest(body)}",
                    "Content-Length": str(len(body.encode("utf-8"))),
                }
            )

        signing_headers["Signature"] = create_signature_header(signing_headers)

        # These two are only inputs to the signature, not headers to send.
        del signing_headers["(request-target)"]
        del signing_headers["Host"]

        headers.update(signing_headers)

    logging.info("%s %s", method, uri)

    async with ClientSession(trace_configs=http_debug()) as session:
        async with session.request(method, uri, headers=headers, data=body) as resp:
            resp.raise_for_status()

            ## aiohttp has been known to leak if the response hasn't been read,
            ## so we're just gonna read the request no matter what
            resp_data = await resp.read()

            ## Not expecting a response, so just return
            if resp.status == 202:
                return None

            if resp.status != 200:
                if not resp_data:
                    logging.debug(
                        f"Received error when requesting {uri}: {resp.status} {resp_data}"
                    )
                else:
                    logging.debug(
                        f"Received error when sending {action} to {uri}: {resp.status} {resp_data}"
                    )
                return None

            if resp.content_type == MIMETYPES["activity"]:
                resp_data = await resp.json(loads=Message.new_from_json)
            elif resp.content_type == MIMETYPES["json"]:
                resp_data = await resp.json(loads=DotDict.new_from_json)
            else:
                logging.debug(f'Invalid Content-Type for "{uri}": {resp.content_type}')
                logging.debug(f"Response: {resp_data}")
                return None

            logging.debug(f"{uri} >> resp {resp_data}")
            return resp_data


async def validate_signature(actor, http_request):
    """Check *http_request*'s ``Signature`` header against *actor*'s key.

    Returns False when the key cannot be fetched or when the signature header
    is missing or malformed. Otherwise the verification result is returned
    and also stored in ``http_request["validated"]``.
    """
    pubkey = await fetch_actor_key(actor)

    if not pubkey:
        return False

    logging.debug(f"actor key: {pubkey}")

    headers = {key.lower(): value for key, value in http_request.headers.items()}
    headers["(request-target)"] = f"{http_request.method.lower()} {http_request.path}"

    # Everything in this block is derived from the incoming request, so a
    # missing header/parameter, unknown hash or bad base64 is a failed
    # validation rather than an unhandled exception.
    try:
        sig = split_signature(headers["signature"])
        logging.debug(f"sigdata: {sig}")

        sigstring = build_signing_string(headers, sig["headers"])
        logging.debug(f"sigstring: {sigstring}")

        sign_alg, _, hash_alg = sig["algorithm"].partition("-")
        logging.debug(f"sign alg: {sign_alg}, hash alg: {hash_alg}")

        sigdata = base64.b64decode(sig["signature"])

        h = HASHES[hash_alg].new()
        h.update(sigstring.encode("ascii"))
    except (KeyError, ValueError) as e:
        logging.debug(f"Malformed signature: {e!r}")
        return False

    pkcs = PKCS1_v1_5.new(pubkey)
    result = pkcs.verify(h, sigdata)

    http_request["validated"] = result

    logging.debug(f"validates? {result}")
    return result


class DotDict(dict):
    """A dict whose keys can also be read, set and deleted as attributes.

    Plain ``dict`` values are wrapped in ``DotDict`` on assignment so nested
    access such as ``obj.publicKey.publicKeyPem`` works.
    """

    def __init__(self, _data=None, **kwargs):
        dict.__init__(self)
        self.update(_data, **kwargs)

    def __hasattr__(self, k):
        # NOTE(review): not a Python protocol hook - ``hasattr()`` goes through
        # ``__getattr__``. Kept for any caller that invokes it directly.
        return k in self

    def __getattr__(self, k):
        try:
            return self[k]
        except KeyError:
            raise AttributeError(
                f"{self.__class__.__name__} object has no attribute {k}"
            ) from None

    def __setattr__(self, k, v):
        # Underscore-prefixed names are real instance attributes, not keys.
        if k.startswith("_"):
            super().__setattr__(k, v)
        else:
            self[k] = v

    def __setitem__(self, k, v):
        # Exact-type check on purpose: instances of dict subclasses (DotDict,
        # Message, ...) are stored as they are instead of being re-wrapped.
        if type(v) is dict:
            v = DotDict(v)

        super().__setitem__(k, v)

    def __delattr__(self, k):
        try:
            dict.__delitem__(self, k)
        except KeyError:
            raise AttributeError(
                f"{self.__class__.__name__} object has no attribute {k}"
            ) from None

    @classmethod
    def new_from_json(cls, data):
        """Build an instance from the JSON document *data*.

        Raises ``JSONDecodeError`` for an empty or invalid body.
        """
        # JSONDecodeError derives line/column from the document using str
        # operations, so always hand it a str.
        if isinstance(data, str):
            doc = data
        elif isinstance(data, (bytes, bytearray)):
            doc = data.decode("utf-8", "replace")
        else:
            doc = ""

        if not data:
            raise JSONDecodeError("Empty body", doc, 1)

        try:
            return cls(json.loads(data))
        except ValueError as err:
            raise JSONDecodeError("Invalid body", doc, 1) from err

    def to_json(self, indent=None):
        """Serialise this mapping to a JSON string."""
        return json.dumps(self, indent=indent)

    def update(self, _data=None, **kwargs):
        """Merge a dict or an iterable of pairs, then *kwargs*, into self.

        Every value goes through ``__setitem__`` so nested dicts are wrapped.
        Any other type of *_data* is ignored.
        """
        if isinstance(_data, dict):
            for key, value in _data.items():
                self[key] = value
        elif isinstance(_data, (list, tuple, set)):
            for key, value in _data:
                self[key] = value

        for key, value in kwargs.items():
            self[key] = value

    # misc properties
    @property
    def domain(self):
        """Hostname of ``id`` or, when that is missing/empty, of ``actor``."""
        return urlparse(getattr(self, "id", None) or getattr(self, "actor")).hostname

    # actor properties
    @property
    def pubkey(self):
        """The ``publicKey.publicKeyPem`` value."""
        return self.publicKey.publicKeyPem

    @property
    def shared_inbox(self):
        """``endpoints.sharedInbox`` when present, otherwise ``inbox``."""
        endpoints = self.get("endpoints") or {}

        # Only fall back to (and thus require) ``inbox`` when it is needed.
        if "sharedInbox" in endpoints:
            return endpoints["sharedInbox"]

        return self.inbox

    # activity properties
    @property
    def actorid(self):
        """The actor's id, whether ``actor`` is embedded or a plain reference."""
        if isinstance(self.actor, dict):
            return self.actor.id

        return self.actor

    @property
    def objectid(self):
        """The object's id, whether ``object`` is embedded or a plain reference."""
        if isinstance(self.object, dict):
            return self.object.id

        return self.object


class Message(DotDict):
    """Constructors for the ActivityStreams documents the relay emits."""

    @classmethod
    def new_actor(cls, host, pubkey, description=None):
        """Return the relay's ``Application`` actor document for *host*."""
        return cls(
            {
                "@context": "https://www.w3.org/ns/activitystreams",
                "id": f"https://{host}/actor",
                "type": "Application",
                "preferredUsername": "relay",
                "name": "ActivityRelay",
                "summary": description or "ActivityRelay bot",
                "followers": f"https://{host}/followers",
                "following": f"https://{host}/following",
                "inbox": f"https://{host}/inbox",
                "url": f"https://{host}/inbox",
                "endpoints": {"sharedInbox": f"https://{host}/inbox"},
                "publicKey": {
                    "id": f"https://{host}/actor#main-key",
                    "owner": f"https://{host}/actor",
                    "publicKeyPem": pubkey,
                },
            }
        )

    @classmethod
    def new_announce(cls, host, object):
        """Return an ``Announce`` of *object* addressed to the followers."""
        return cls(
            {
                "@context": "https://www.w3.org/ns/activitystreams",
                "id": f"https://{host}/activities/{uuid.uuid4()}",
                "type": "Announce",
                "to": [f"https://{host}/followers"],
                "actor": f"https://{host}/actor",
                "object": object,
            }
        )

    @classmethod
    def new_follow(cls, host, actor):
        """Return a ``Follow`` of *actor* issued by the relay actor."""
        return cls(
            {
                "@context": "https://www.w3.org/ns/activitystreams",
                "type": "Follow",
                "to": [actor],
                "object": actor,
                "id": f"https://{host}/activities/{uuid.uuid4()}",
                "actor": f"https://{host}/actor",
            }
        )

    @classmethod
    def new_unfollow(cls, host, actor, follow):
        """Return an ``Undo`` wrapping the earlier *follow*, sent to *actor*."""
        return cls(
            {
                "@context": "https://www.w3.org/ns/activitystreams",
                "id": f"https://{host}/activities/{uuid.uuid4()}",
                "type": "Undo",
                "to": [actor],
                "actor": f"https://{host}/actor",
                "object": follow,
            }
        )

    @classmethod
    def new_response(cls, host, actor, followid, accept):
        """Return an ``Accept`` (or ``Reject``) for *actor*'s follow *followid*."""
        return cls(
            {
                "@context": "https://www.w3.org/ns/activitystreams",
                "id": f"https://{host}/activities/{uuid.uuid4()}",
                "type": "Accept" if accept else "Reject",
                "to": [actor],
                "actor": f"https://{host}/actor",
                "object": {
                    "id": followid,
                    "type": "Follow",
                    "object": f"https://{host}/actor",
                    "actor": actor,
                },
            }
        )


class Response(AiohttpResponse):
    """aiohttp response with constructors keyed by the ``MIMETYPES`` names."""

    @classmethod
    def new(cls, body: Union[dict, str, bytes] = "", status=200, headers=None, ctype="text"):
        """Build a response with the content type ``MIMETYPES[ctype]``.

        Bytes are sent as the raw body; a dict is JSON-encoded when *ctype* is
        ``"json"`` or ``"activity"``; anything else is passed as text.
        """
        kwargs = {
            "status": status,
            "headers": headers,
            "content_type": MIMETYPES[ctype],
        }

        if isinstance(body, bytes):
            kwargs["body"] = body
        elif isinstance(body, dict) and ctype in {"json", "activity"}:
            kwargs["text"] = json.dumps(body)
        else:
            kwargs["text"] = body

        return cls(**kwargs)

    @classmethod
    def new_error(cls, status, body, ctype="text"):
        """Build an error response; for ``"json"`` wrap *body* with the status."""
        if ctype == "json":
            body = json.dumps({"status": status, "error": body})

        return cls.new(body=body, status=status, ctype=ctype)

    @property
    def location(self):
        """The ``Location`` header, or None when it is not set."""
        return self.headers.get("Location")

    @location.setter
    def location(self, value):
        self.headers["Location"] = value


class WKNodeinfo(DotDict):
    """The ``/.well-known/nodeinfo`` discovery document."""

    @classmethod
    def new(cls, v20, v21):
        """Return a document linking the 2.0 and 2.1 nodeinfo URLs."""
        return cls(
            {
                "links": [
                    {"rel": NODEINFO_NS["20"], "href": v20},
                    {"rel": NODEINFO_NS["21"], "href": v21},
                ]
            }
        )

    def get_url(self, version="20"):
        """Return the ``href`` of the link for *version* (``"20"``/``"21"``).

        Raises ``KeyError`` when no such link exists, including when the
        document has no ``links`` entry at all.
        """
        for item in self.get("links") or []:
            if item["rel"] == NODEINFO_NS[version]:
                return item["href"]

        raise KeyError(version)