source/projects/activitypub_relay/src/python/relay/config.py

213 lines
5.1 KiB
Python

import os
from pathlib import Path
from urllib.parse import urlparse
from relay.misc import DotDict
import yaml
class RelayConfig(DotDict):
apkeys = {
"host",
"whitelist_enabled",
"blocked_software",
"blocked_instances",
"whitelist",
}
cachekeys = {"json", "objects", "digests"}
def __init__(self, path):
DotDict.__init__(self, {})
self._path = Path(path).expanduser()
self.reset()
def __setitem__(self, key, value):
if key in ["blocked_instances", "blocked_software", "whitelist"]:
assert isinstance(value, (list, set, tuple))
elif key in ["port", "json", "objects", "digests"]:
assert isinstance(value, (int))
elif key == "whitelist_enabled":
assert isinstance(value, bool)
super().__setitem__(key, value)
@property
def is_docker(self):
return bool(os.getenv("DOCKER_RUNNING"))
@property
def db(self):
return Path(self["db"]).expanduser().resolve()
@property
def path(self):
return self._path
@property
def actor(self):
return f"https://{self.host}/actor"
@property
def inbox(self):
return f"https://{self.host}/inbox"
@property
def keyid(self):
return f"{self.actor}#main-key"
def reset(self):
self.clear()
self.update(
{
"db": str(self._path.parent.joinpath(f"{self._path.stem}.jsonld")),
"listen": "0.0.0.0",
"port": 8080,
"note": "Make a note about your instance here.",
"push_limit": 512,
"host": "relay.example.com",
"blocked_software": [],
"blocked_instances": [],
"whitelist": [],
"whitelist_enabled": False,
"json": 1024,
"objects": 1024,
"digests": 1024,
"admin_token": None,
}
)
def ban_instance(self, instance):
if instance.startswith("http"):
instance = urlparse(instance).hostname
if self.is_banned(instance):
return False
self.blocked_instances.append(instance)
return True
def unban_instance(self, instance):
if instance.startswith("http"):
instance = urlparse(instance).hostname
try:
self.blocked_instances.remove(instance)
return True
except:
return False
def ban_software(self, software):
if self.is_banned_software(software):
return False
self.blocked_software.append(software)
return True
def unban_software(self, software):
try:
self.blocked_software.remove(software)
return True
except:
return False
def add_whitelist(self, instance):
if instance.startswith("http"):
instance = urlparse(instance).hostname
if self.is_whitelisted(instance):
return False
self.whitelist.append(instance)
return True
def del_whitelist(self, instance):
if instance.startswith("http"):
instance = urlparse(instance).hostname
try:
self.whitelist.remove(instance)
return True
except:
return False
def is_banned(self, instance):
if instance.startswith("http"):
instance = urlparse(instance).hostname
return instance in self.blocked_instances
def is_banned_software(self, software):
if not software:
return False
return software.lower() in self.blocked_software
def is_whitelisted(self, instance):
if instance.startswith("http"):
instance = urlparse(instance).hostname
return instance in self.whitelist
def load(self):
self.reset()
options = {}
try:
options["Loader"] = yaml.FullLoader
except AttributeError:
pass
try:
with open(self.path) as fd:
config = yaml.load(fd, **options)
except FileNotFoundError:
return False
if not config:
return False
for key, value in config.items():
if key in ["ap", "cache"]:
for k, v in value.items():
if k not in self:
continue
self[k] = v
elif key not in self:
continue
self[key] = value
if self.host.endswith("example.com"):
return False
return True
def save(self):
config = {
"db": self["db"],
"listen": self.listen,
"port": self.port,
"note": self.note,
"push_limit": self.push_limit,
"ap": {key: self[key] for key in self.apkeys},
"cache": {key: self[key] for key in self.cachekeys},
}
with open(self._path, "w") as fd:
yaml.dump(config, fd, sort_keys=False)
return config