From c0b8db46aafe27610dee661664d71cc44ed9eb23 Mon Sep 17 00:00:00 2001 From: Reid 'arrdem' McKenzie Date: Sat, 19 Nov 2022 23:45:47 -0700 Subject: [PATCH] Intern the relay --- projects/activitypub_relay/BUILD | 14 + projects/activitypub_relay/Dockerfile | 22 + projects/activitypub_relay/README.md | 80 +++ projects/activitypub_relay/relay.sh | 7 + projects/activitypub_relay/relay.yaml.example | 43 ++ .../src/python/relay/__init__.py | 3 + .../src/python/relay/__main__.py | 397 +++++++++++++ .../src/python/relay/application.py | 128 +++++ .../src/python/relay/config.py | 222 ++++++++ .../src/python/relay/database.py | 189 +++++++ .../src/python/relay/http_debug.py | 62 ++ .../src/python/relay/logger.py | 35 ++ .../src/python/relay/misc.py | 533 ++++++++++++++++++ .../src/python/relay/processors.py | 115 ++++ .../src/python/relay/views.py | 210 +++++++ tools/python/requirements.txt | 22 +- 16 files changed, 2081 insertions(+), 1 deletion(-) create mode 100644 projects/activitypub_relay/BUILD create mode 100644 projects/activitypub_relay/Dockerfile create mode 100644 projects/activitypub_relay/README.md create mode 100755 projects/activitypub_relay/relay.sh create mode 100644 projects/activitypub_relay/relay.yaml.example create mode 100644 projects/activitypub_relay/src/python/relay/__init__.py create mode 100644 projects/activitypub_relay/src/python/relay/__main__.py create mode 100644 projects/activitypub_relay/src/python/relay/application.py create mode 100644 projects/activitypub_relay/src/python/relay/config.py create mode 100644 projects/activitypub_relay/src/python/relay/database.py create mode 100644 projects/activitypub_relay/src/python/relay/http_debug.py create mode 100644 projects/activitypub_relay/src/python/relay/logger.py create mode 100644 projects/activitypub_relay/src/python/relay/misc.py create mode 100644 projects/activitypub_relay/src/python/relay/processors.py create mode 100644 projects/activitypub_relay/src/python/relay/views.py diff --git a/projects/activitypub_relay/BUILD b/projects/activitypub_relay/BUILD new file mode 100644 index 0000000..524411e --- /dev/null +++ b/projects/activitypub_relay/BUILD @@ -0,0 +1,14 @@ +py_project( + name = "pelorama_relay", + lib_deps = [ + py_requirement("aiohttp"), + py_requirement("async_lru"), + py_requirement("cachetools"), + py_requirement("click"), + py_requirement("pycryptodome"), + py_requirement("pyyaml"), + py_requirement("retry"), + ], + main = "src/python/relay/__main__.py", + shebang = "/usr/bin/env python3" +) diff --git a/projects/activitypub_relay/Dockerfile b/projects/activitypub_relay/Dockerfile new file mode 100644 index 0000000..aa4a80c --- /dev/null +++ b/projects/activitypub_relay/Dockerfile @@ -0,0 +1,22 @@ +FROM library/python:3.10 + MAINTAINER Reid McKenzie + +RUN pip install --upgrade pip + +RUN adduser app +RUN mkdir /app +RUN chown -R app:app /app +USER app +WORKDIR /app +ENV PATH="/app/.local/bin:${PATH}" +ENV PYTHONPATH="/app:${PYTHONPATH}" + +### App specific crap +# Deps vary least so do them first +RUN pip install --user install aiohttp async_lru cachetools click pycryptodome pyyaml retry + +COPY --chown=app:app src/python . +COPY --chown=app:app relay.yaml . +COPY --chown=app:app relay.jsonld . 
+ +CMD ["python3", "relay/__main__.py", "-c", "relay.yaml"] diff --git a/projects/activitypub_relay/README.md b/projects/activitypub_relay/README.md new file mode 100644 index 0000000..9612913 --- /dev/null +++ b/projects/activitypub_relay/README.md @@ -0,0 +1,80 @@ +# ActivityPub Relay + +A generic ActivityPub/LitePub compatible with Pleroma and Mastodon. + +### What is a relay? + +A relay is a webserver implementing ActivityPub/LitePub. +Normally when posting content on an ActivityPub server, that content is only listed publicly on the feed of the hosting server, and servers to which your server announces that content. + +Relays provide a way for ActivityPub servers to announce posts to and receive posts from a wider audience. +For instance there are [public lists](https://github.com/brodi1/activitypub-relays) of nodes offering relaying. + +### Nuts and bolts of ActivityPub + +[ActivityPub](https://www.w3.org/TR/activitypub/) is a protocol by which [Actors](https://www.w3.org/TR/activitypub/#actors) exchange messages. +An actor consists of two key things - an inbox and an outbox. +The inbox is a URL to which other actors can `POST` messages. +The outbox is a URL naming a paginated collection which other actors can `GET` to read messages from. + +Any user in an ActivityPub system, for instance `@arrdem@macaw.social` is an actor with such an inbox outbox pair. +ActivityPub messages for follows of users or messages mentioning users are implemented with messages directly to the user's outbox. + +In addition, Mastodon ActivityPub servers themselves have ["instance actors"](https://github.com/mastodon/mastodon/issues/10453). +These actors communicate using [server to server interactions](https://www.w3.org/TR/activitypub/#server-to-server-interactions). +Instance actors (f. ex. `https://macaw.social/actor`) emit and receive messages regarding system level activities. +New posts (`Create`) are the major thing, but plenty of other messages flow through too. + +The relay "protocol" is a Follow of the relay's actor by an ActivityPub server that wishes to participate in the relay. +The relay will acknowledge the follow request, and then itself follow the participating server back. +These two follow relationships in place, the participating server will announce any new posts to the relay since the relay follows it. +This is just the normal ActivityPub behavior. +Likewise, the relay will announce any new posts it gets from elsewhere to the participating server. +Same deal. + +That's it. + +### Why relays at all? + +In theory, two ActivityPub servers can talk to each other directly. +Why relays? + +Relays are 1 to N bridges. +Follow one relay, get activity from every other host on that bridge. +This makes it easier for new, small nodes to participate in a wider network since they only need to follow one host not follow every single other host. +Traditional relay/clearing house model. + +### What is this program? + +The relay itself is a webserver providing two major components. 
+- A hardcoded webfinger implementation for resolving the user `acct:relay@your.relay.hostname` +- A message relay, which will perform a _stateless_ relay of new activity to connected nodes + +The relay offers three moderation capabilities: +- An explicit allowlist mode restricting connected nodes +- A denylist mode restricting connected nodes by name +- A denylist mode restricting connected nodes by self-identified server software + +## Getting Started + +Normally, you would direct your LitePub instance software to follow the LitePub actor found on the relay. +In Pleroma this would be something like: + + $ MIX_ENV=prod mix relay_follow https://your.relay.hostname/actor + +On Mastodon the process is similar, in `Administration > Relays > Add New Relay` one would list the relay's URL + + https://your.relay.hostname/actor + +## Status + +- Works +- Poorly packaged +- Not yet threaded (nor is upstream) +- Not yet tested (nor is upstream) +- Missing web-oriented administrative functionality +- Missing web-oriented configuration/state management + +## Copyright + +This work is derived from https://git.pleroma.social/pleroma/relay, which is published under the terms of the AGPLv3. diff --git a/projects/activitypub_relay/relay.sh b/projects/activitypub_relay/relay.sh new file mode 100755 index 0000000..dec0fd2 --- /dev/null +++ b/projects/activitypub_relay/relay.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env sh + +cd "$(realpath $(dirname $0))" + +bazel build :pelorama_relay + +exec ../../bazel-bin/projects/pelorama_relay/pelorama_relay -c $(realpath ./relay.yaml) diff --git a/projects/activitypub_relay/relay.yaml.example b/projects/activitypub_relay/relay.yaml.example new file mode 100644 index 0000000..26749e4 --- /dev/null +++ b/projects/activitypub_relay/relay.yaml.example @@ -0,0 +1,43 @@ +# this is the path that the object graph will get dumped to (in JSON-LD format), +# you probably shouldn't change it, but you can if you want. +db: relay.jsonld + +# Listener +listen: 0.0.0.0 +port: 8080 + +# Note +note: "Make a note about your instance here." + +# maximum number of inbox posts to do at once +post_limit: 512 + +# this section is for ActivityPub +ap: + # this is used for generating activitypub messages, as well as instructions for + # linking AP identities. it should be an SSL-enabled domain reachable by https. + host: 'relay.example.com' + + blocked_instances: + - 'bad-instance.example.com' + - 'another-bad-instance.example.com' + + whitelist_enabled: false + + whitelist: + - 'good-instance.example.com' + - 'another.good-instance.example.com' + + # uncomment the lines below to prevent certain activitypub software from posting + # to the relay (all known relays by default). this uses the software name in nodeinfo + #blocked_software: + #- 'activityrelay' + #- 'aoderelay' + #- 'social.seattle.wa.us-relay' + #- 'unciarelay' + +# cache limits as number of items. only change this if you know what you're doing +cache: + objects: 1024 + actors: 1024 + digests: 1024 diff --git a/projects/activitypub_relay/src/python/relay/__init__.py b/projects/activitypub_relay/src/python/relay/__init__.py new file mode 100644 index 0000000..67e68f5 --- /dev/null +++ b/projects/activitypub_relay/src/python/relay/__init__.py @@ -0,0 +1,3 @@ +__version__ = "0.2.2" + +from . 
import logger diff --git a/projects/activitypub_relay/src/python/relay/__main__.py b/projects/activitypub_relay/src/python/relay/__main__.py new file mode 100644 index 0000000..370ddbe --- /dev/null +++ b/projects/activitypub_relay/src/python/relay/__main__.py @@ -0,0 +1,397 @@ +import Crypto +import asyncio +import click +import platform + +from urllib.parse import urlparse + +from relay import misc, __version__ +from relay.application import Application, request_id_middleware +from relay.config import relay_software_names + + +app = None + + +@click.group( + "cli", context_settings={"show_default": True}, invoke_without_command=True +) +@click.option("--config", "-c", default="relay.yaml", help="path to the relay's config") +@click.version_option(version=__version__, prog_name="ActivityRelay") +@click.pass_context +def cli(ctx, config): + global app + app = Application(config, middlewares=[request_id_middleware]) + + if not ctx.invoked_subcommand: + if app.config.host.endswith("example.com"): + relay_setup.callback() + + else: + relay_run.callback() + + +@cli.group("inbox") +@click.pass_context +def cli_inbox(ctx): + "Manage the inboxes in the database" + pass + + +@cli_inbox.command("list") +def cli_inbox_list(): + "List the connected instances or relays" + + click.echo("Connected to the following instances or relays:") + + for inbox in app.database.inboxes: + click.echo(f"- {inbox}") + + +@cli_inbox.command("follow") +@click.argument("actor") +def cli_inbox_follow(actor): + "Follow an actor (Relay must be running)" + + if app.config.is_banned(actor): + return click.echo(f"Error: Refusing to follow banned actor: {actor}") + + if not actor.startswith("http"): + domain = actor + actor = f"https://{actor}/actor" + + else: + domain = urlparse(actor).hostname + + try: + inbox_data = app.database["relay-list"][domain] + inbox = inbox_data["inbox"] + + except KeyError: + actor_data = asyncio.run(misc.request(actor)) + inbox = actor_data.shared_inbox + + message = misc.Message.new_follow(host=app.config.host, actor=actor.id) + + asyncio.run(misc.request(inbox, message)) + click.echo(f"Sent follow message to actor: {actor}") + + +@cli_inbox.command("unfollow") +@click.argument("actor") +def cli_inbox_unfollow(actor): + "Unfollow an actor (Relay must be running)" + + if not actor.startswith("http"): + domain = actor + actor = f"https://{actor}/actor" + + else: + domain = urlparse(actor).hostname + + try: + inbox_data = app.database["relay-list"][domain] + inbox = inbox_data["inbox"] + message = misc.Message.new_unfollow( + host=app.config.host, actor=actor, follow=inbox_data["followid"] + ) + + except KeyError: + actor_data = asyncio.run(misc.request(actor)) + inbox = actor_data.shared_inbox + message = misc.Message.new_unfollow( + host=app.config.host, + actor=actor, + follow={ + "type": "Follow", + "object": actor, + "actor": f"https://{app.config.host}/actor", + }, + ) + + asyncio.run(misc.request(inbox, message)) + click.echo(f"Sent unfollow message to: {actor}") + + +@cli_inbox.command("add") +@click.argument("inbox") +def cli_inbox_add(inbox): + "Add an inbox to the database" + + if not inbox.startswith("http"): + inbox = f"https://{inbox}/inbox" + + if app.config.is_banned(inbox): + return click.echo(f"Error: Refusing to add banned inbox: {inbox}") + + if app.database.add_inbox(inbox): + app.database.save() + return click.echo(f"Added inbox to the database: {inbox}") + + click.echo(f"Error: Inbox already in database: {inbox}") + + +@cli_inbox.command("remove") +@click.argument("inbox") 
+def cli_inbox_remove(inbox): + "Remove an inbox from the database" + + try: + dbinbox = app.database.get_inbox(inbox, fail=True) + + except KeyError: + click.echo(f"Error: Inbox does not exist: {inbox}") + return + + app.database.del_inbox(dbinbox["domain"]) + app.database.save() + + click.echo(f"Removed inbox from the database: {inbox}") + + +@cli.group("instance") +def cli_instance(): + "Manage instance bans" + pass + + +@cli_instance.command("list") +def cli_instance_list(): + "List all banned instances" + + click.echo("Banned instances or relays:") + + for domain in app.config.blocked_instances: + click.echo(f"- {domain}") + + +@cli_instance.command("ban") +@click.argument("target") +def cli_instance_ban(target): + "Ban an instance and remove the associated inbox if it exists" + + if target.startswith("http"): + target = urlparse(target).hostname + + if app.config.ban_instance(target): + app.config.save() + + if app.database.del_inbox(target): + app.database.save() + + click.echo(f"Banned instance: {target}") + return + + click.echo(f"Instance already banned: {target}") + + +@cli_instance.command("unban") +@click.argument("target") +def cli_instance_unban(target): + "Unban an instance" + + if app.config.unban_instance(target): + app.config.save() + + click.echo(f"Unbanned instance: {target}") + return + + click.echo(f"Instance wasn't banned: {target}") + + +@cli.group("software") +def cli_software(): + "Manage banned software" + pass + + +@cli_software.command("list") +def cli_software_list(): + "List all banned software" + + click.echo("Banned software:") + + for software in app.config.blocked_software: + click.echo(f"- {software}") + + +@cli_software.command("ban") +@click.option( + "--fetch-nodeinfo/--ignore-nodeinfo", + "-f", + "fetch_nodeinfo", + default=False, + help="Treat NAME like a domain and try to fet the software name from nodeinfo", +) +@click.argument("name") +def cli_software_ban(name, fetch_nodeinfo): + "Ban software. Use RELAYS for NAME to ban relays" + + if name == "RELAYS": + for name in relay_software_names: + app.config.ban_software(name) + + app.config.save() + return click.echo("Banned all relay software") + + if fetch_nodeinfo: + software = asyncio.run(misc.fetch_nodeinfo(name)) + + if not software: + click.echo(f"Failed to fetch software name from domain: {name}") + + name = software + + if config.ban_software(name): + app.config.save() + return click.echo(f"Banned software: {name}") + + click.echo(f"Software already banned: {name}") + + +@cli_software.command("unban") +@click.option( + "--fetch-nodeinfo/--ignore-nodeinfo", + "-f", + "fetch_nodeinfo", + default=False, + help="Treat NAME like a domain and try to fet the software name from nodeinfo", +) +@click.argument("name") +def cli_software_unban(name, fetch_nodeinfo): + "Ban software. 
Use RELAYS for NAME to unban relays" + + if name == "RELAYS": + for name in relay_software_names: + app.config.unban_software(name) + + config.save() + return click.echo("Unbanned all relay software") + + if fetch_nodeinfo: + software = asyncio.run(misc.fetch_nodeinfo(name)) + + if not software: + click.echo(f"Failed to fetch software name from domain: {name}") + + name = software + + if app.config.unban_software(name): + app.config.save() + return click.echo(f"Unbanned software: {name}") + + click.echo(f"Software wasn't banned: {name}") + + +@cli.group("whitelist") +def cli_whitelist(): + "Manage the instance whitelist" + pass + + +@cli_whitelist.command("list") +def cli_whitelist_list(): + click.echo("Current whitelisted domains") + + for domain in app.config.whitelist: + click.echo(f"- {domain}") + + +@cli_whitelist.command("add") +@click.argument("instance") +def cli_whitelist_add(instance): + "Add an instance to the whitelist" + + if not app.config.add_whitelist(instance): + return click.echo(f"Instance already in the whitelist: {instance}") + + app.config.save() + click.echo(f"Instance added to the whitelist: {instance}") + + +@cli_whitelist.command("remove") +@click.argument("instance") +def cli_whitelist_remove(instance): + "Remove an instance from the whitelist" + + if not app.config.del_whitelist(instance): + return click.echo(f"Instance not in the whitelist: {instance}") + + app.config.save() + + if app.config.whitelist_enabled: + if app.database.del_inbox(inbox): + app.database.save() + + click.echo(f"Removed instance from the whitelist: {instance}") + + +@cli.command("setup") +def relay_setup(): + "Generate a new config" + + while True: + app.config.host = click.prompt( + "What domain will the relay be hosted on?", default=app.config.host + ) + + if not app.config.host.endswith("example.com"): + break + + click.echo("The domain must not be example.com") + + app.config.listen = click.prompt( + "Which address should the relay listen on?", default=app.config.listen + ) + + while True: + app.config.port = click.prompt( + "What TCP port should the relay listen on?", + default=app.config.port, + type=int, + ) + break + + app.config.save() + + if not app["is_docker"] and click.confirm( + "Relay all setup! Would you like to run it now?" + ): + relay_run.callback() + + +@cli.command("run") +def relay_run(): + "Run the relay" + + if app.config.host.endswith("example.com"): + return click.echo( + 'Relay is not set up. Please edit your relay config or run "activityrelay setup".' + ) + + vers_split = platform.python_version().split(".") + pip_command = "pip3 uninstall pycrypto && pip3 install pycryptodome" + + if Crypto.__version__ == "2.6.1": + if int(vers_split[1]) > 7: + click.echo( + "Error: PyCrypto is broken on Python 3.8+. Please replace it with pycryptodome before running again. Exiting..." 
+ ) + return click.echo(pip_command) + + else: + click.echo( + "Warning: PyCrypto is old and should be replaced with pycryptodome" + ) + return click.echo(pip_command) + + if not misc.check_open_port(app.config.listen, app.config.port): + return click.echo( + f"Error: A server is already running on port {app.config.port}" + ) + + app.run() + + +if __name__ == "__main__": + cli(prog_name="relay") diff --git a/projects/activitypub_relay/src/python/relay/application.py b/projects/activitypub_relay/src/python/relay/application.py new file mode 100644 index 0000000..d8e99fc --- /dev/null +++ b/projects/activitypub_relay/src/python/relay/application.py @@ -0,0 +1,128 @@ +import asyncio +import logging +import os +import signal + +from aiohttp import web +from cachetools import LRUCache +from datetime import datetime, timedelta + +from relay.config import RelayConfig +from relay.database import RelayDatabase +from relay.misc import DotDict, check_open_port, set_app +from relay.views import routes + +from uuid import uuid4 + + +class Application(web.Application): + def __init__(self, cfgpath, middlewares=None): + web.Application.__init__(self, middlewares=middlewares) + + self["starttime"] = None + self["running"] = False + self["is_docker"] = bool(os.environ.get("DOCKER_RUNNING")) + self["config"] = RelayConfig(cfgpath, self["is_docker"]) + + if not self["config"].load(): + self["config"].save() + + self["database"] = RelayDatabase(self["config"]) + self["database"].load() + + self["cache"] = DotDict( + { + key: Cache(maxsize=self["config"][key]) + for key in self["config"].cachekeys + } + ) + self["semaphore"] = asyncio.Semaphore(self["config"].push_limit) + + self.set_signal_handler() + set_app(self) + + @property + def cache(self): + return self["cache"] + + @property + def config(self): + return self["config"] + + @property + def database(self): + return self["database"] + + @property + def is_docker(self): + return self["is_docker"] + + @property + def semaphore(self): + return self["semaphore"] + + @property + def uptime(self): + if not self["starttime"]: + return timedelta(seconds=0) + + uptime = datetime.now() - self["starttime"] + + return timedelta(seconds=uptime.seconds) + + def set_signal_handler(self): + signal.signal(signal.SIGHUP, self.stop) + signal.signal(signal.SIGINT, self.stop) + signal.signal(signal.SIGQUIT, self.stop) + signal.signal(signal.SIGTERM, self.stop) + + def run(self): + if not check_open_port(self.config.listen, self.config.port): + return logging.error( + f"A server is already running on port {self.config.port}" + ) + + for route in routes: + self.router.add_route(*route) + + logging.info( + f"Starting webserver at {self.config.host} ({self.config.listen}:{self.config.port})" + ) + asyncio.run(self.handle_run()) + + def stop(self, *_): + self["running"] = False + + async def handle_run(self): + self["running"] = True + + runner = web.AppRunner( + self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"' + ) + await runner.setup() + + site = web.TCPSite( + runner, host=self.config.listen, port=self.config.port, reuse_address=True + ) + + await site.start() + self["starttime"] = datetime.now() + + while self["running"]: + await asyncio.sleep(0.25) + + await site.stop() + + self["starttime"] = None + self["running"] = False + + +class Cache(LRUCache): + def set_maxsize(self, value): + self.__maxsize = int(value) + + +@web.middleware +async def request_id_middleware(request, handler): + request.id = uuid4() + return await handler(request) diff --git 
a/projects/activitypub_relay/src/python/relay/config.py b/projects/activitypub_relay/src/python/relay/config.py new file mode 100644 index 0000000..d810965 --- /dev/null +++ b/projects/activitypub_relay/src/python/relay/config.py @@ -0,0 +1,222 @@ +from pathlib import Path +from urllib.parse import urlparse + +from relay.misc import DotDict + +import yaml + + +relay_software_names = [ + "activityrelay", + "aoderelay", + "social.seattle.wa.us-relay", + "unciarelay", +] + + +class RelayConfig(DotDict): + apkeys = { + "host", + "whitelist_enabled", + "blocked_software", + "blocked_instances", + "whitelist", + } + + cachekeys = {"json", "objects", "digests"} + + def __init__(self, path, is_docker): + DotDict.__init__(self, {}) + + if is_docker: + path = "/data/relay.yaml" + + self._isdocker = is_docker + self._path = Path(path).expanduser() + + self.reset() + + def __setitem__(self, key, value): + if self._isdocker and key in ["db", "listen", "port"]: + return + + if key in ["blocked_instances", "blocked_software", "whitelist"]: + assert isinstance(value, (list, set, tuple)) + + elif key in ["port", "json", "objects", "digests"]: + assert isinstance(value, (int)) + + elif key == "whitelist_enabled": + assert isinstance(value, bool) + + super().__setitem__(key, value) + + @property + def db(self): + return Path(self["db"]).expanduser().resolve() + + @property + def path(self): + return self._path + + @property + def actor(self): + return f"https://{self.host}/actor" + + @property + def inbox(self): + return f"https://{self.host}/inbox" + + @property + def keyid(self): + return f"{self.actor}#main-key" + + def reset(self): + self.clear() + self.update( + { + "db": str(self._path.parent.joinpath(f"{self._path.stem}.jsonld")), + "listen": "0.0.0.0", + "port": 8080, + "note": "Make a note about your instance here.", + "push_limit": 512, + "host": "relay.example.com", + "blocked_software": [], + "blocked_instances": [], + "whitelist": [], + "whitelist_enabled": False, + "json": 1024, + "objects": 1024, + "digests": 1024, + } + ) + + def ban_instance(self, instance): + if instance.startswith("http"): + instance = urlparse(instance).hostname + + if self.is_banned(instance): + return False + + self.blocked_instances.append(instance) + return True + + def unban_instance(self, instance): + if instance.startswith("http"): + instance = urlparse(instance).hostname + + try: + self.blocked_instances.remove(instance) + return True + + except: + return False + + def ban_software(self, software): + if self.is_banned_software(software): + return False + + self.blocked_software.append(software) + return True + + def unban_software(self, software): + try: + self.blocked_software.remove(software) + return True + + except: + return False + + def add_whitelist(self, instance): + if instance.startswith("http"): + instance = urlparse(instance).hostname + + if self.is_whitelisted(instance): + return False + + self.whitelist.append(instance) + return True + + def del_whitelist(self, instance): + if instance.startswith("http"): + instance = urlparse(instance).hostname + + try: + self.whitelist.remove(instance) + return True + + except: + return False + + def is_banned(self, instance): + if instance.startswith("http"): + instance = urlparse(instance).hostname + + return instance in self.blocked_instances + + def is_banned_software(self, software): + if not software: + return False + + return software.lower() in self.blocked_software + + def is_whitelisted(self, instance): + if instance.startswith("http"): + instance = 
urlparse(instance).hostname + + return instance in self.whitelist + + def load(self): + self.reset() + + options = {} + + try: + options["Loader"] = yaml.FullLoader + + except AttributeError: + pass + + try: + with open(self.path) as fd: + config = yaml.load(fd, **options) + + except FileNotFoundError: + return False + + if not config: + return False + + for key, value in config.items(): + if key in ["ap", "cache"]: + for k, v in value.items(): + if k not in self: + continue + + self[k] = v + + elif key not in self: + continue + + self[key] = value + + if self.host.endswith("example.com"): + return False + + return True + + def save(self): + config = { + "db": self["db"], + "listen": self.listen, + "port": self.port, + "note": self.note, + "push_limit": self.push_limit, + "ap": {key: self[key] for key in self.apkeys}, + "cache": {key: self[key] for key in self.cachekeys}, + } + + with open(self._path, "w") as fd: + yaml.dump(config, fd, sort_keys=False) + + return config diff --git a/projects/activitypub_relay/src/python/relay/database.py b/projects/activitypub_relay/src/python/relay/database.py new file mode 100644 index 0000000..94d945f --- /dev/null +++ b/projects/activitypub_relay/src/python/relay/database.py @@ -0,0 +1,189 @@ +import json +import logging + +from Crypto.PublicKey import RSA +from urllib.parse import urlparse + + +class RelayDatabase(dict): + def __init__(self, config): + dict.__init__( + self, + { + "relay-list": {}, + "private-key": None, + "follow-requests": {}, + "version": 1, + }, + ) + + self.config = config + self.PRIVKEY = None + + @property + def PUBKEY(self): + return self.PRIVKEY.publickey() + + @property + def pubkey(self): + return self.PUBKEY.exportKey("PEM").decode("utf-8") + + @property + def privkey(self): + return self["private-key"] + + @property + def hostnames(self): + return tuple(self["relay-list"].keys()) + + @property + def inboxes(self): + return tuple(data["inbox"] for data in self["relay-list"].values()) + return self["relay-list"] + + def generate_key(self): + self.PRIVKEY = RSA.generate(4096) + self["private-key"] = self.PRIVKEY.exportKey("PEM").decode("utf-8") + + def load(self): + new_db = True + + try: + with self.config.db.open() as fd: + data = json.load(fd) + + self["version"] = data.get("version", None) + self["private-key"] = data.get("private-key") + + if self["version"] == None: + self["version"] = 1 + + if "actorKeys" in data: + self["private-key"] = data["actorKeys"]["privateKey"] + + for item in data.get("relay-list", []): + domain = urlparse(item).hostname + self["relay-list"][domain] = {"inbox": item, "followid": None} + + else: + self["relay-list"] = data.get("relay-list", {}) + + for domain in self["relay-list"].keys(): + if self.config.is_banned(domain) or ( + self.config.whitelist_enabled + and not self.config.is_whitelisted(domain) + ): + self.del_inbox(domain) + + new_db = False + + except FileNotFoundError: + pass + + except json.decoder.JSONDecodeError as e: + if self.config.db.stat().st_size > 0: + raise e from None + + if not self.privkey: + logging.info("No actor keys present, generating 4096-bit RSA keypair.") + self.generate_key() + + else: + self.PRIVKEY = RSA.importKey(self.privkey) + + self.save() + return not new_db + + def save(self): + with self.config.db.open("w") as fd: + json.dump(self, fd, indent=4) + + def get_inbox(self, domain, fail=False): + if domain.startswith("http"): + domain = urlparse(domain).hostname + + if domain not in self["relay-list"]: + if fail: + raise KeyError(domain) + + return + + 
return self["relay-list"][domain] + + def add_inbox(self, inbox, followid=None, fail=False): + assert inbox.startswith("https"), "Inbox must be a url" + domain = urlparse(inbox).hostname + + if self.get_inbox(domain): + if fail: + raise KeyError(domain) + + return False + + self["relay-list"][domain] = { + "domain": domain, + "inbox": inbox, + "followid": followid, + } + + logging.debug(f"Added inbox to database: {inbox}") + return self["relay-list"][domain] + + def del_inbox(self, domain, followid=None, fail=False): + data = self.get_inbox(domain, fail=False) + + if not data: + if fail: + raise KeyError(domain) + + return False + + if not data["followid"] or not followid or data["followid"] == followid: + del self["relay-list"][data["domain"]] + logging.debug(f'Removed inbox from database: {data["inbox"]}') + return True + + if fail: + raise ValueError("Follow IDs do not match") + + logging.debug( + f'Follow ID does not match: db = {data["followid"]}, object = {followid}' + ) + return False + + def set_followid(self, domain, followid): + data = self.get_inbox(domain, fail=True) + data["followid"] = followid + + def get_request(self, domain, fail=True): + if domain.startswith("http"): + domain = urlparse(domain).hostname + + try: + return self["follow-requests"][domain] + + except KeyError as e: + if fail: + raise e + + def add_request(self, actor, inbox, followid): + domain = urlparse(inbox).hostname + + try: + request = self.get_request(domain) + request["followid"] = followid + + except KeyError: + pass + + self["follow-requests"][domain] = { + "actor": actor, + "inbox": inbox, + "followid": followid, + } + + def del_request(self, domain): + if domain.startswith("http"): + domain = urlparse(inbox).hostname + + del self["follow-requests"][domain] diff --git a/projects/activitypub_relay/src/python/relay/http_debug.py b/projects/activitypub_relay/src/python/relay/http_debug.py new file mode 100644 index 0000000..17a71dd --- /dev/null +++ b/projects/activitypub_relay/src/python/relay/http_debug.py @@ -0,0 +1,62 @@ +import logging +import aiohttp + +from collections import defaultdict + + +STATS = { + "requests": defaultdict(int), + "response_codes": defaultdict(int), + "response_codes_per_domain": defaultdict(lambda: defaultdict(int)), + "delivery_codes": defaultdict(int), + "delivery_codes_per_domain": defaultdict(lambda: defaultdict(int)), + "exceptions": defaultdict(int), + "exceptions_per_domain": defaultdict(lambda: defaultdict(int)), + "delivery_exceptions": defaultdict(int), + "delivery_exceptions_per_domain": defaultdict(lambda: defaultdict(int)), +} + + +async def on_request_start(session, trace_config_ctx, params): + logging.debug("HTTP START [%r], [%r]", session, params) + + STATS["requests"][params.url.host] += 1 + + +async def on_request_end(session, trace_config_ctx, params): + logging.debug("HTTP END [%r], [%r]", session, params) + + host = params.url.host + status = params.response.status + + STATS["response_codes"][status] += 1 + STATS["response_codes_per_domain"][host][status] += 1 + + if params.method == "POST": + STATS["delivery_codes"][status] += 1 + STATS["delivery_codes_per_domain"][host][status] += 1 + + +async def on_request_exception(session, trace_config_ctx, params): + logging.debug("HTTP EXCEPTION [%r], [%r]", session, params) + + host = params.url.host + exception = repr(params.exception) + + STATS["exceptions"][exception] += 1 + STATS["exceptions_per_domain"][host][exception] += 1 + + if params.method == "POST": + STATS["delivery_exceptions"][exception] += 1 + 
STATS["delivery_exceptions_per_domain"][host][exception] += 1 + + +def http_debug(): + if logging.DEBUG >= logging.root.level: + return + + trace_config = aiohttp.TraceConfig() + trace_config.on_request_start.append(on_request_start) + trace_config.on_request_end.append(on_request_end) + trace_config.on_request_exception.append(on_request_exception) + return [trace_config] diff --git a/projects/activitypub_relay/src/python/relay/logger.py b/projects/activitypub_relay/src/python/relay/logger.py new file mode 100644 index 0000000..dfcc804 --- /dev/null +++ b/projects/activitypub_relay/src/python/relay/logger.py @@ -0,0 +1,35 @@ +import logging +import os + +from pathlib import Path + + +## Get log level and file from environment if possible +env_log_level = os.environ.get("LOG_LEVEL", "INFO").upper() + +try: + env_log_file = Path(os.environ.get("LOG_FILE")).expanduser().resolve() + +except TypeError: + env_log_file = None + + +## Make sure the level from the environment is valid +try: + log_level = getattr(logging, env_log_level) + +except AttributeError: + log_level = logging.INFO + + +## Set logging config +handlers = [logging.StreamHandler()] + +if env_log_file: + handlers.append(logging.FileHandler(env_log_file)) + +logging.basicConfig( + level=log_level, + format="[%(asctime)s] %(levelname)s: %(message)s", + handlers=handlers, +) diff --git a/projects/activitypub_relay/src/python/relay/misc.py b/projects/activitypub_relay/src/python/relay/misc.py new file mode 100644 index 0000000..b371c79 --- /dev/null +++ b/projects/activitypub_relay/src/python/relay/misc.py @@ -0,0 +1,533 @@ +import base64 +import json +import logging +import socket +import uuid +from random import randint + +from Crypto.Hash import SHA, SHA256, SHA512 +from Crypto.PublicKey import RSA +from Crypto.Signature import PKCS1_v1_5 +from aiohttp import ClientSession +from aiohttp.client_exceptions import ClientResponseError, ClientConnectorError +from aiohttp.hdrs import METH_ALL as METHODS +from aiohttp.web import Response as AiohttpResponse, View as AiohttpView +from datetime import datetime +from json.decoder import JSONDecodeError +from urllib.parse import urlparse +from async_lru import alru_cache + +from relay.http_debug import http_debug + +from retry import retry + + +app = None + +HASHES = {"sha1": SHA, "sha256": SHA256, "sha512": SHA512} + +MIMETYPES = { + "activity": "application/activity+json", + "html": "text/html", + "json": "application/json", + "text": "text/plain", +} + +NODEINFO_NS = { + "20": "http://nodeinfo.diaspora.software/ns/schema/2.0", + "21": "http://nodeinfo.diaspora.software/ns/schema/2.1", +} + + +def set_app(new_app): + global app + app = new_app + + +def build_signing_string(headers, used_headers): + return "\n".join(map(lambda x: ": ".join([x.lower(), headers[x]]), used_headers)) + + +def check_open_port(host, port): + if host == "0.0.0.0": + host = "127.0.0.1" + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + try: + return s.connect_ex((host, port)) != 0 + + except socket.error as e: + return False + + +def create_signature_header(headers): + headers = {k.lower(): v for k, v in headers.items()} + used_headers = headers.keys() + sigstring = build_signing_string(headers, used_headers) + + sig = { + "keyId": app.config.keyid, + "algorithm": "rsa-sha256", + "headers": " ".join(used_headers), + "signature": sign_signing_string(sigstring, app.database.PRIVKEY), + } + + chunks = ['{}="{}"'.format(k, v) for k, v in sig.items()] + return ",".join(chunks) + + +def 
distill_inboxes(object_id): + for inbox in app.database.inboxes: + if (urlparse(inbox).hostname != urlparse(object_id).hostname): + yield inbox + + +def generate_body_digest(body): + h = SHA256.new(body.encode("utf-8")) + bodyhash = base64.b64encode(h.digest()).decode("utf-8") + + return bodyhash + + +def sign_signing_string(sigstring, key): + pkcs = PKCS1_v1_5.new(key) + h = SHA256.new() + h.update(sigstring.encode("ascii")) + sigdata = pkcs.sign(h) + + return base64.b64encode(sigdata).decode("utf-8") + + +def split_signature(sig): + default = {"headers": "date"} + + sig = sig.strip().split(",") + + for chunk in sig: + k, _, v = chunk.partition("=") + v = v.strip('"') + default[k] = v + + default["headers"] = default["headers"].split() + return default + + +async def fetch_actor_key(actor): + actor_data = await request(actor) + + if not actor_data: + return None + + try: + return RSA.importKey(actor_data["publicKey"]["publicKeyPem"]) + + except Exception as e: + logging.debug(f"Exception occured while fetching actor key: {e}") + + +@alru_cache +async def fetch_nodeinfo(domain): + nodeinfo_url = None + wk_nodeinfo = await request( + f"https://{domain}/.well-known/nodeinfo", sign_headers=False, activity=False + ) + + if not wk_nodeinfo: + return + + wk_nodeinfo = WKNodeinfo(wk_nodeinfo) + + for version in ["20", "21"]: + try: + nodeinfo_url = wk_nodeinfo.get_url(version) + + except KeyError: + pass + + if not nodeinfo_url: + logging.debug(f"Failed to fetch nodeinfo url for domain: {domain}") + return False + + nodeinfo = await request(nodeinfo_url, sign_headers=False, activity=False) + + try: + return nodeinfo["software"]["name"] + + except KeyError: + return False + + +@retry(exceptions=(ClientConnectorError, ClientResponseError), backoff=1, tries=3) +async def request(uri, data=None, force=False, sign_headers=True, activity=True): + ## If a get request and not force, try to use the cache first + url = urlparse(uri) + method = "POST" if data else "GET" + action = data.get("type") if data else None + headers = { + "Accept": f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9', + "User-Agent": "ActivityRelay", + } + + if data: + headers["Content-Type"] = MIMETYPES["activity" if activity else "json"] + + if sign_headers: + signing_headers = { + "(request-target)": f"{method.lower()} {url.path}", + "Date": datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT"), + "Host": url.netloc, + } + + if data: + assert isinstance(data, dict) + + data = json.dumps(data) + signing_headers.update( + { + "Digest": f"SHA-256={generate_body_digest(data)}", + "Content-Length": str(len(data.encode("utf-8"))), + } + ) + + signing_headers["Signature"] = create_signature_header(signing_headers) + + del signing_headers["(request-target)"] + del signing_headers["Host"] + + headers.update(signing_headers) + + logging.info("%s %s", method, uri) + # logging.debug("%r %r %r %r", method, uri, headers, data) + + async with ClientSession(trace_configs=http_debug()) as session: + async with session.request(method, uri, headers=headers, data=data) as resp: + resp.raise_for_status() + + ## aiohttp has been known to leak if the response hasn't been read, + ## so we're just gonna read the request no matter what + resp_data = await resp.read() + + ## Not expecting a response, so just return + if resp.status == 202: + return + + elif resp.status != 200: + if not resp_data: + return logging.debug(f"Received error when requesting {uri}: {resp.status} {resp_data}") + + return logging.debug(f"Received error when sending {action} to 
{uri}: {resp.status} {resp_data}") + + if resp.content_type == MIMETYPES["activity"]: + resp_data = await resp.json(loads=Message.new_from_json) + + elif resp.content_type == MIMETYPES["json"]: + resp_data = await resp.json(loads=DotDict.new_from_json) + + else: + logging.debug(f'Invalid Content-Type for "{url}": {resp.content_type}') + return logging.debug(f"Response: {resp_data}") + + logging.debug(f"{uri} >> resp {resp_data}") + + return resp_data + + +async def validate_signature(actor, http_request): + pubkey = await fetch_actor_key(actor) + + if not pubkey: + return False + + logging.debug(f"actor key: {pubkey}") + + headers = {key.lower(): value for key, value in http_request.headers.items()} + headers["(request-target)"] = " ".join( + [ + http_request.method.lower(), + http_request.path + ] + ) + + sig = split_signature(headers["signature"]) + logging.debug(f"sigdata: {sig}") + + sigstring = build_signing_string(headers, sig["headers"]) + logging.debug(f"sigstring: {sigstring}") + + sign_alg, _, hash_alg = sig["algorithm"].partition("-") + logging.debug(f"sign alg: {sign_alg}, hash alg: {hash_alg}") + + sigdata = base64.b64decode(sig["signature"]) + + pkcs = PKCS1_v1_5.new(pubkey) + h = HASHES[hash_alg].new() + h.update(sigstring.encode("ascii")) + result = pkcs.verify(h, sigdata) + + http_request["validated"] = result + + logging.debug(f"validates? {result}") + return result + + +class DotDict(dict): + def __init__(self, _data, **kwargs): + dict.__init__(self) + + self.update(_data, **kwargs) + + def __hasattr__(self, k): + return k in self + + def __getattr__(self, k): + try: + return self[k] + + except KeyError: + raise AttributeError( + f"{self.__class__.__name__} object has no attribute {k}" + ) from None + + def __setattr__(self, k, v): + if k.startswith("_"): + super().__setattr__(k, v) + + else: + self[k] = v + + def __setitem__(self, k, v): + if type(v) == dict: + v = DotDict(v) + + super().__setitem__(k, v) + + def __delattr__(self, k): + try: + dict.__delitem__(self, k) + + except KeyError: + raise AttributeError( + f"{self.__class__.__name__} object has no attribute {k}" + ) from None + + @classmethod + def new_from_json(cls, data): + if not data: + raise JSONDecodeError("Empty body", data, 1) + + try: + return cls(json.loads(data)) + + except ValueError: + raise JSONDecodeError("Invalid body", data, 1) + + def to_json(self, indent=None): + return json.dumps(self, indent=indent) + + def update(self, _data, **kwargs): + if isinstance(_data, dict): + for key, value in _data.items(): + self[key] = value + + elif isinstance(_data, (list, tuple, set)): + for key, value in _data: + self[key] = value + + for key, value in kwargs.items(): + self[key] = value + + # misc properties + @property + def domain(self): + return urlparse(getattr(self, 'id', None) or getattr(self, 'actor')).hostname + + # actor properties + @property + def pubkey(self): + return self.publicKey.publicKeyPem + + @property + def shared_inbox(self): + return self.get("endpoints", {}).get("sharedInbox", self.inbox) + + # activity properties + @property + def actorid(self): + if isinstance(self.actor, dict): + return self.actor.id + + return self.actor + + @property + def objectid(self): + if isinstance(self.object, dict): + return self.object.id + + return self.object + + +class Message(DotDict): + @classmethod + def new_actor(cls, host, pubkey, description=None): + return cls( + { + "@context": "https://www.w3.org/ns/activitystreams", + "id": f"https://{host}/actor", + "type": "Application", + 
"preferredUsername": "relay", + "name": "ActivityRelay", + "summary": description or "ActivityRelay bot", + "followers": f"https://{host}/followers", + "following": f"https://{host}/following", + "inbox": f"https://{host}/inbox", + "url": f"https://{host}/inbox", + "endpoints": {"sharedInbox": f"https://{host}/inbox"}, + "publicKey": { + "id": f"https://{host}/actor#main-key", + "owner": f"https://{host}/actor", + "publicKeyPem": pubkey, + }, + } + ) + + @classmethod + def new_announce(cls, host, object): + return cls( + { + "@context": "https://www.w3.org/ns/activitystreams", + "id": f"https://{host}/activities/{uuid.uuid4()}", + "type": "Announce", + "to": [f"https://{host}/followers"], + "actor": f"https://{host}/actor", + "object": object, + } + ) + + @classmethod + def new_follow(cls, host, actor): + return cls( + { + "@context": "https://www.w3.org/ns/activitystreams", + "type": "Follow", + "to": [actor], + "object": actor, + "id": f"https://{host}/activities/{uuid.uuid4()}", + "actor": f"https://{host}/actor", + } + ) + + @classmethod + def new_unfollow(cls, host, actor, follow): + return cls( + { + "@context": "https://www.w3.org/ns/activitystreams", + "id": f"https://{host}/activities/{uuid.uuid4()}", + "type": "Undo", + "to": [actor], + "actor": f"https://{host}/actor", + "object": follow, + } + ) + + @classmethod + def new_response(cls, host, actor, followid, accept): + return cls( + { + "@context": "https://www.w3.org/ns/activitystreams", + "id": f"https://{host}/activities/{uuid.uuid4()}", + "type": "Accept" if accept else "Reject", + "to": [actor], + "actor": f"https://{host}/actor", + "object": { + "id": followid, + "type": "Follow", + "object": f"https://{host}/actor", + "actor": actor, + }, + } + ) + + +class Response(AiohttpResponse): + @classmethod + def new(cls, body="", status=200, headers=None, ctype="text"): + kwargs = { + "status": status, + "headers": headers, + "content_type": MIMETYPES[ctype], + } + + if isinstance(body, bytes): + kwargs["body"] = body + + elif isinstance(body, dict) and ctype in {"json", "activity"}: + kwargs["text"] = json.dumps(body) + + else: + kwargs["text"] = body + + return cls(**kwargs) + + @classmethod + def new_error(cls, status, body, ctype="text"): + if ctype == "json": + body = json.dumps({"status": status, "error": body}) + + return cls.new(body=body, status=status, ctype=ctype) + + @property + def location(self): + return self.headers.get("Location") + + @location.setter + def location(self, value): + self.headers["Location"] = value + + +class View(AiohttpView): + async def _iter(self): + if self.request.method not in METHODS: + self._raise_allowed_methods() + + method = getattr(self, self.request.method.lower(), None) + + if method is None: + self._raise_allowed_methods() + + return await method(**self.request.match_info) + + @property + def app(self): + return self._request.app + + @property + def cache(self): + return self.app.cache + + @property + def config(self): + return self.app.config + + @property + def database(self): + return self.app.database + + +class WKNodeinfo(DotDict): + @classmethod + def new(cls, v20, v21): + return cls( + { + "links": [ + {"rel": NODEINFO_NS["20"], "href": v20}, + {"rel": NODEINFO_NS["21"], "href": v21}, + ] + } + ) + + def get_url(self, version="20"): + for item in self.links: + if item["rel"] == NODEINFO_NS[version]: + return item["href"] + + raise KeyError(version) diff --git a/projects/activitypub_relay/src/python/relay/processors.py 
b/projects/activitypub_relay/src/python/relay/processors.py new file mode 100644 index 0000000..1191cb2 --- /dev/null +++ b/projects/activitypub_relay/src/python/relay/processors.py @@ -0,0 +1,115 @@ +import asyncio +import json +import logging + +from relay import misc + + +async def handle_relay(request, actor, data, software): + if data.objectid in request.app.cache.objects: + logging.info(f"already relayed {data.objectid}") + return + + logging.info(f"Relaying post from {data.actorid}") + + message = misc.Message.new_announce( + host=request.app.config.host, object=data.objectid + ) + + logging.debug(f">> relay: {message}") + + inboxes = misc.distill_inboxes(data.objectid) + futures = [misc.request(inbox, data=message) for inbox in inboxes] + + asyncio.ensure_future(asyncio.gather(*futures)) + request.app.cache.objects[data.objectid] = message.id + + +async def handle_forward(request, actor, data, software): + if data.id in request.app.cache.objects: + logging.info(f"already forwarded {data.id}") + return + + message = misc.Message.new_announce(host=request.app.config.host, object=data) + + logging.info(f"Forwarding post from {actor.id}") + logging.debug(f">> Relay {data}") + + inboxes = misc.distill_inboxes(data.id) + futures = [misc.request(inbox, data=message) for inbox in inboxes] + + asyncio.ensure_future(asyncio.gather(*futures)) + request.app.cache.objects[data.id] = message.id + + +async def handle_follow(request, actor, data, software): + if not request.app.database.add_inbox(actor.shared_inbox, data.id): + request.app.database.set_followid(actor.id, data.id) + + request.app.database.save() + + await misc.request( + actor.shared_inbox, + misc.Message.new_response( + host=request.app.config.host, + actor=actor.id, + followid=data.id, + accept=True + ), + ) + + await misc.request( + actor.shared_inbox, + misc.Message.new_follow( + host=request.app.config.host, + actor=actor.id + ), + ) + + +async def handle_undo(request, actor, data, software): + ## If the object is not a Follow, forward it + if data["object"]["type"] != "Follow": + return await handle_forward(request, actor, data, software) + + if not request.app.database.del_inbox(actor.domain, data.id): + return + + request.app.database.save() + + message = misc.Message.new_unfollow( + host=request.app.config.host, actor=actor.id, follow=data + ) + + await misc.request(actor.shared_inbox, message) + + +async def handle_dont(request, actor, data, software): + """Handle something by ... 
not handling it.""" + + logging.info(f"Disregarding {data!r}") + + +processors = { + "Announce": handle_relay, + "Create": handle_relay, + "Delete": handle_forward, + "Follow": handle_follow, + "Undo": handle_undo, + "Update": handle_forward, +} + + +async def run_processor(request, actor, data, software): + if data.type not in processors: + return + + logging.info(f'{request.id}: New "{data.type}" from actor: {actor.id}') + logging.debug(f'{request.id}: {data!r}') + + env = dict(data=data, actor=actor, software=software) + try: + return await processors.get(data.type, handle_dont)(request, actor, data, software) + + except: + logging.exception(f'{request.id}] {env!r}') diff --git a/projects/activitypub_relay/src/python/relay/views.py b/projects/activitypub_relay/src/python/relay/views.py new file mode 100644 index 0000000..cf602ad --- /dev/null +++ b/projects/activitypub_relay/src/python/relay/views.py @@ -0,0 +1,210 @@ +import json +import logging + +from relay import __version__, misc +from relay.http_debug import STATS +from relay.misc import DotDict, Message, Response, WKNodeinfo +from relay.processors import run_processor + +from aiohttp.web import HTTPUnauthorized + +routes = [] + + +def register_route(method, path): + def wrapper(func): + routes.append([method, path, func]) + return func + + return wrapper + + +@register_route("GET", "/") +async def home(request): + targets = "
".join(request.app.database.hostnames) + note = request.app.config.note + count = len(request.app.database.hostnames) + host = request.app.config.host + + text = f"""\ + +ActivityPub Relay at {host} + + + +
+<p>This is an Activity Relay for fediverse instances.</p>
+<p>{note}</p>
+<p>To host your own relay, you may download the code at this address: https://git.pleroma.social/pleroma/relay</p>
+<p>List of {count} registered instances:<br>{targets}</p>
+""" + + return Response.new(text, ctype="html") + + +@register_route("GET", "/inbox") +@register_route("GET", "/actor") +async def actor(request): + data = Message.new_actor( + host=request.app.config.host, pubkey=request.app.database.pubkey + ) + + return Response.new(data, ctype="activity") + + +@register_route("POST", "/inbox") +@register_route("POST", "/actor") +async def inbox(request): + config = request.app.config + database = request.app.database + + # reject if missing signature header + if "signature" not in request.headers: + logging.debug("Actor missing signature header") + raise HTTPUnauthorized(body="missing signature") + + # read message and get actor id and domain + try: + data = await request.json(loads=Message.new_from_json) + + if "actor" not in data: + raise KeyError("actor") + + # reject if there is no actor in the message + except KeyError: + logging.debug("actor not in data") + return Response.new_error(400, "no actor in message", "json") + + except: + logging.exception("Failed to parse inbox message") + return Response.new_error(400, "failed to parse message", "json") + + # FIXME: A lot of this code assumes that we're going to have access to the entire actor descriptor. + # This isn't something we actually generally need, and it's not clear how used it is. + # The original relay implementation mostly used to determine activity source domain for relaying. + # This has been refactored out, since there are other sources of this information. + # This PROBABLY means we can do without this data ever ... but here it is. + + # Trying to deal with actors/visibility + if isinstance(data.object, dict) and not data.object.get('discoverable', True): + actor = DotDict({"id": "dummy-for-undiscoverable-object"}) + + # Normal path of looking up the actor... 
+ else: + try: + # FIXME: Needs a cache + actor = await misc.request(data.actorid) + except: + logging.exception(f'{request.id}: {data!r}') + return + + logging.debug(f"Inbox >> {data!r}") + + # reject if actor is empty + if not actor: + logging.debug(f"Failed to fetch actor: {data.actorid}") + return Response.new_error(400, "failed to fetch actor", "json") + + # reject if the actor isn't whitelisted while the whiltelist is enabled + elif config.whitelist_enabled and not config.is_whitelisted(data.domain): + logging.debug( + f"Rejected actor for not being in the whitelist: {data.actorid}" + ) + return Response.new_error(403, "access denied", "json") + + # reject if actor is banned + if request.app["config"].is_banned(data.domain): + logging.debug(f"Ignored request from banned actor: {data.actorid}") + return Response.new_error(403, "access denied", "json") + + # FIXME: Needs a cache + software = await misc.fetch_nodeinfo(data.domain) + + # reject if software used by actor is banned + if config.blocked_software: + if config.is_banned_software(software): + logging.debug(f"Rejected actor for using specific software: {software}") + return Response.new_error(403, "access denied", "json") + + # reject if the signature is invalid + if not (await misc.validate_signature(data.actorid, request)): + logging.debug(f"signature validation failed for: {data.actorid}") + return Response.new_error(401, "signature check failed", "json") + + # reject if activity type isn't 'Follow' and the actor isn't following + if data["type"] != "Follow" and not database.get_inbox(data.domain): + logging.debug( + f"Rejected actor for trying to post while not following: {data.actorid}" + ) + return Response.new_error(401, "access denied", "json") + + logging.debug(f">> payload {data}") + + await run_processor(request, actor, data, software) + return Response.new(status=202) + + +@register_route("GET", "/.well-known/webfinger") +async def webfinger(request): + subject = request.query["resource"] + + if subject != f"acct:relay@{request.app.config.host}": + return Response.new_error(404, "user not found", "json") + + data = { + "subject": subject, + "aliases": [request.app.config.actor], + "links": [ + { + "href": request.app.config.actor, + "rel": "self", + "type": "application/activity+json", + }, + { + "href": request.app.config.actor, + "rel": "self", + "type": 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"', + }, + ], + } + + return Response.new(data, ctype="json") + + +@register_route("GET", "/nodeinfo/{version:\d.\d\.json}") +async def nodeinfo_2_0(request): + niversion = request.match_info["version"][:3] + data = { + "openRegistrations": not request.app.config.whitelist_enabled, + "protocols": ["activitypub"], + "services": {"inbound": [], "outbound": []}, + "software": {"name": "activityrelay", "version": __version__}, + "usage": {"localPosts": 0, "users": {"total": 1}}, + "metadata": {"peers": request.app.database.hostnames}, + "version": niversion, + } + + if niversion == "2.1": + data["software"]["repository"] = "https://git.pleroma.social/pleroma/relay" + + return Response.new(data, ctype="json") + + +@register_route("GET", "/.well-known/nodeinfo") +async def nodeinfo_wellknown(request): + data = WKNodeinfo.new( + v20=f"https://{request.app.config.host}/nodeinfo/2.0.json", + v21=f"https://{request.app.config.host}/nodeinfo/2.1.json", + ) + + return Response.new(data, ctype="json") + + +@register_route("GET", "/stats") +async def stats(*args, **kwargs): + return Response.new(STATS, 
ctype="json") diff --git a/tools/python/requirements.txt b/tools/python/requirements.txt index da5a540..4c4fa91 100644 --- a/tools/python/requirements.txt +++ b/tools/python/requirements.txt @@ -1,13 +1,19 @@ aiohttp==3.8.1 aiosignal==1.2.0 alabaster==0.7.12 +async-lru==1.0.3 async-timeout==4.0.2 attrs==21.4.0 autoflake==1.4 Babel==2.9.1 +bases==0.2.1 beautifulsoup4==4.10.0 black==21.8b0 +blake3==0.3.1 bleach==4.1.0 +borg==2012.4.1 +cachetools==5.2.0 +cbor2==5.4.3 certifi==2021.10.8 chardet==4.0.0 charset-normalizer==2.0.10 @@ -15,7 +21,9 @@ click==7.1.2 colored==1.4.3 commonmark==0.9.1 coverage==6.2 -dataclasses +Cython==0.29.30 +dataclasses==0.6 +decorator==5.1.1 Deprecated==1.2.13 docutils==0.17.1 ExifRead==2.3.2 @@ -45,8 +53,11 @@ mccabe==0.6.1 meraki==1.24.0 mirakuru==2.4.1 mistune==2.0.1 +mmh3==3.0.0 multidict==5.2.0 +multiformats==0.1.4.post3 mypy-extensions==0.4.3 +numpy==1.23.1 octorest==0.4 openapi-schema-validator==0.2.0 openapi-spec-validator==0.3.1 @@ -56,6 +67,7 @@ pathspec==0.9.0 pep517==0.12.0 pip==21.3.1 pip-tools==6.4.0 +plac==1.3.5 platformdirs==2.4.1 pluggy==1.0.0 port-for==0.6.1 @@ -66,10 +78,14 @@ pudb==2022.1 pur==5.4.2 py==1.11.0 pycodestyle==2.8.0 +pycryptodome==3.15.0 +pycryptodomex==3.15.0 pyflakes==2.4.0 Pygments==2.11.2 pyparsing==3.0.6 pyrsistent==0.18.1 +pysha3==1.0.2 +pyskein==1.0 pytest==6.2.5 pytest-cov==3.0.0 pytest-postgresql==4.1.0 @@ -84,6 +100,8 @@ regex==2021.11.10 requests==2.27.1 requests-toolbelt==0.9.1 requirements-parser==0.3.1 +retry==0.9.2 +scipy==1.8.1 setuptools==60.5.0 six==1.16.0 smbus2==0.4.1 @@ -107,12 +125,14 @@ toposort==1.7 tornado==6.1 typed-ast==1.5.1 types-setuptools==57.4.7 +typing-validation==0.0.1.post7 typing_extensions==4.0.1 unify==0.5 untokenize==0.1.1 urllib3==1.26.8 urwid==2.1.2 urwid-readline==0.13 +wasm==1.2 wcwidth==0.2.5 webencodings==0.5.1 websocket-client==1.2.3
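
A quick way to sanity-check a deployed relay is to hit the read-only routes registered in views.py (webfinger, nodeinfo, and the actor document). The following is a hypothetical smoke-test sketch, not part of the patch itself: it assumes aiohttp (already a dependency of the relay) and uses RELAY_HOST as a placeholder for wherever your relay is actually running.

    import asyncio

    import aiohttp

    RELAY_HOST = "relay.example.com"  # placeholder; substitute your deployment


    async def main():
        async with aiohttp.ClientSession() as session:
            # Webfinger for the hardcoded relay account described in the README.
            async with session.get(
                f"https://{RELAY_HOST}/.well-known/webfinger",
                params={"resource": f"acct:relay@{RELAY_HOST}"},
            ) as resp:
                print("webfinger:", resp.status, await resp.json(content_type=None))

            # Nodeinfo discovery document, then the 2.0 schema it points at.
            async with session.get(f"https://{RELAY_HOST}/.well-known/nodeinfo") as resp:
                print("nodeinfo index:", resp.status, await resp.json(content_type=None))

            async with session.get(f"https://{RELAY_HOST}/nodeinfo/2.0.json") as resp:
                print("nodeinfo 2.0:", resp.status, await resp.json(content_type=None))

            # The relay's ActivityPub actor, served as application/activity+json.
            async with session.get(f"https://{RELAY_HOST}/actor") as resp:
                print("actor:", resp.status, await resp.json(content_type=None))


    asyncio.run(main())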