import json import logging from urllib.parse import urlparse from Crypto.PublicKey import RSA class RelayDatabase(dict): def __init__(self, config): dict.__init__( self, { "relay-list": {}, "private-key": None, "follow-requests": {}, "version": 1, }, ) self.config = config self.PRIVKEY = None @property def PUBKEY(self): return self.PRIVKEY.publickey() @property def pubkey(self): return self.PUBKEY.exportKey("PEM").decode("utf-8") @property def privkey(self): return self["private-key"] @property def hostnames(self): return tuple(self["relay-list"].keys()) @property def inboxes(self): return tuple(data["inbox"] for data in self["relay-list"].values()) def generate_key(self): self.PRIVKEY = RSA.generate(4096) self["private-key"] = self.PRIVKEY.exportKey("PEM").decode("utf-8") def load(self): with self.config.db.open() as fd: data = json.load(fd) self["version"] = data.get("version", 1) self["private-key"] = data.get("private-key") self["relay-list"] = data.get("relay-list", {}) self["follow-requests"] = data.get("follow-requests") if not self.privkey: logging.info("No actor keys present, generating 4096-bit RSA keypair.") self.generate_key() self.save() else: self.PRIVKEY = RSA.importKey(self.privkey) def save(self): with self.config.db.open("w") as fd: json.dump(self, fd, indent=4) def get_inbox(self, domain, fail=False): if domain.startswith("http"): domain = urlparse(domain).hostname if domain not in self["relay-list"]: if fail: raise KeyError(domain) return return self["relay-list"][domain] def add_inbox(self, inbox, followid=None, fail=False): assert inbox.startswith("https"), "Inbox must be a url" domain = urlparse(inbox).hostname if self.get_inbox(domain): if fail: raise KeyError(domain) return False self["relay-list"][domain] = { "domain": domain, "inbox": inbox, "followid": followid, } logging.debug(f"Added inbox to database: {inbox}") return self["relay-list"][domain] def del_inbox(self, domain, followid=None, fail=False): data = self.get_inbox(domain, fail=False) if not data: if fail: raise KeyError(domain) return False if not data["followid"] or not followid or data["followid"] == followid: del self["relay-list"][data["domain"]] logging.debug(f'Removed inbox from database: {data["inbox"]}') return True if fail: raise ValueError("Follow IDs do not match") logging.debug( f'Follow ID does not match: db = {data["followid"]}, object = {followid}' ) return False def set_followid(self, domain, followid): if (data := self.get_inbox(domain, fail=True)): data["followid"] = followid def get_request(self, domain, fail=True): if domain.startswith("http"): domain = urlparse(domain).hostname try: return self["follow-requests"][domain] except KeyError as e: if fail: raise e def add_request(self, actor, inbox, followid): domain = urlparse(inbox).hostname try: if (request := self.get_request(domain)): request["followid"] = followid except KeyError: pass self["follow-requests"][domain] = { "actor": actor, "inbox": inbox, "followid": followid, } def del_request(self, domain): if domain.startswith("http"): domain = urlparse(domain).hostname del self["follow-requests"][domain] def get_requests(self): return list(self["follow-requests"].items())