[NO TESTS] WIP

This commit is contained in:
Reid 'arrdem' McKenzie 2023-08-16 19:07:11 -06:00
parent d21b1b2ddc
commit 9043cbe92b

View file

@ -6,62 +6,97 @@ import tomllib
from typing import Optional from typing import Optional
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from time import sleep from time import sleep
from pprint import pformat
import click import click
import requests import requests
import pytimeparse import pytimeparse
class Client(object): class Client(object):
BASE = "https://api.github.com" BASE = "https://api.github.com"
HEADERS = {"Accept": "application/vnd.github+json"} HEADERS = {"Accept": "application/vnd.github+json"}
def __init__(self, token): def __init__(self, token):
self._token = token self._token = token
self._session = requests.Session() self._headers = {"Authorization": f"Bearer {token}", **self.HEADERS}
self._session.headers["Authorization"] = f"Bearer {token}"
def get_notifications(self, page=1): def get_notifications(self, page=1, since: Optional[datetime] = None):
resp = self._session.get(f"{self.BASE}/notifications", headers=self.HEADERS, params={"page": page, "all": "false"}) resp = requests.get(
f"{self.BASE}/notifications",
headers=self._headers,
params={
"page": page,
"all": "false",
"since": since.isoformat() if since else None,
},
)
resp.raise_for_status() resp.raise_for_status()
return resp.json() return resp.json()
def get_all_notifications(self): def get_all_notifications(self, since: Optional[datetime] = None):
page = 1 page = 1
while True: while True:
results = self.get_notifications(page=page) results = self.get_notifications(page=page, since=since)
if not results: if not results:
return return
yield from results yield from results
page += 1 page += 1
def read(self, notif): def read(self, notif):
return self._session.patch(notif["url"]).raise_for_status() return requests.patch(notif["url"], headers=self._headers).raise_for_status()
def unsubscribe(self, notif): def unsubscribe(self, notif):
return self._session.delete(notif["subscription_url"]).raise_for_status() return requests.delete(
notif["subscription_url"], headers=self._headers
).raise_for_status()
def get_pr(self, def get_pr(
self,
url: Optional[str] = None, url: Optional[str] = None,
repo: Optional[str] = None, repo: Optional[str] = None,
id: Optional[int] = None): id: Optional[int] = None,
):
url = url or f"{self.BASE}/{repo}/pulls/{id}" url = url or f"{self.BASE}/{repo}/pulls/{id}"
resp = self._session.get(url, headers=self.HEADERS) resp = requests.get(url, headers=self._headers)
resp.raise_for_status() resp.raise_for_status()
return resp.json() return resp.json()
def get_pr_reviewers(self, pr): def get_pr_reviewers(self, pr):
url = pr["url"] + "/requested_reviewers" url = pr["url"] + "/requested_reviewers"
resp = self._session.get(url, headers=self.HEADERS) resp = requests.get(url, headers=self._headers)
resp.raise_for_status() resp.raise_for_status()
return resp.json() return resp.json()
def get_user(self): def get_user(self):
resp = self._session.get(f"{self.BASE}/user") resp = requests.get(f"{self.BASE}/user", headers=self._headers)
resp.raise_for_status() resp.raise_for_status()
return resp.json() return resp.json()
def get_user_teams(self): def get_user_teams(self):
resp = self._session.get(f"{self.BASE}/user/teams") resp = requests.get(f"{self.BASE}/user/teams", headers=self._headers)
resp.raise_for_status()
return resp.json()
def get_issue(
self,
url: Optional[str] = None,
repo: Optional[str] = None,
id: Optional[int] = None,
):
url = url or f"{self.BASE}/{repo}/issues/{id}"
resp = requests.get(url, headers=self._headers)
resp.raise_for_status()
return resp.json()
def get_comments(
self,
url: Optional[str] = None,
repo: Optional[str] = None,
id: Optional[int] = None,
):
url = url or f"{self.BASE}/{repo}/issues/{id}/comments"
resp = requests.get(url, headers=self._headers)
resp.raise_for_status() resp.raise_for_status()
return resp.json() return resp.json()
@ -72,7 +107,13 @@ def cli():
@cli.command() @cli.command()
@click.option("--config", "config_path", type=Path, default=lambda: Path(os.getenv("BUILD_WORKSPACE_DIRECTORY", "")) / "projects/gh-unnotifier/config.toml") @click.option(
"--config",
"config_path",
type=Path,
default=lambda: Path(os.getenv("BUILD_WORKSPACE_DIRECTORY", ""))
/ "projects/gh-unnotifier/config.toml",
)
def oneshot(config_path: Path): def oneshot(config_path: Path):
with open(config_path, "rb") as fp: with open(config_path, "rb") as fp:
config = tomllib.load(fp) config = tomllib.load(fp)
@ -90,7 +131,13 @@ def oneshot(config_path: Path):
print("Resolved", notif["id"]) print("Resolved", notif["id"])
continue continue
print(notif["id"], notif["subscription_url"], notif["reason"], notif["subject"], pr) print(
notif["id"],
notif["subscription_url"],
notif["reason"],
notif["subject"],
pr,
)
def parse_seconds(text: str) -> timedelta: def parse_seconds(text: str) -> timedelta:
@ -98,22 +145,29 @@ def parse_seconds(text: str) -> timedelta:
@cli.command() @cli.command()
@click.option("--config", "config_path", type=Path, default=lambda: Path(os.getenv("BUILD_WORKSPACE_DIRECTORY", "")) / "projects/gh-unnotifier/config.toml") @click.option(
@click.option("--schedule", "schedule", default="1 minute", type=parse_seconds) "--config",
"config_path",
type=Path,
default=lambda: Path(os.getenv("BUILD_WORKSPACE_DIRECTORY", ""))
/ "projects/gh-unnotifier/config.toml",
)
@click.option("--schedule", "schedule", default="30 seconds", type=parse_seconds)
def maintain(config_path: Path, schedule: timedelta): def maintain(config_path: Path, schedule: timedelta):
with open(config_path, "rb") as fp: with open(config_path, "rb") as fp:
config = tomllib.load(fp) config = tomllib.load(fp)
client = Client(config["gh-unnotifier"]["api_key"]) client = Client(config["gh-unnotifier"]["api_key"])
org_shitlist = config["gh-unnotifier"].get("org_shitlist", []) org_shitlist = config["gh-unnotifier"].get("org_shitlist", [])
team_shitlist = config["gh-unnotifier"].get("team_shitlist", [])
user = client.get_user() user = client.get_user()
user_teams = {it["slug"] for it in client.get_user_teams()} user_teams = {it["slug"] for it in client.get_user_teams()}
mark = None mark = None
def _resolve(notif): def _resolve(notif, reason):
client.read(notif) client.read(notif)
client.unsubscribe(notif) client.unsubscribe(notif)
click.echo(f"Resolved {notif['id']}") click.echo(f"Resolved {notif['id']} {reason} ({notif['subject']})")
while True: while True:
try: try:
@ -121,27 +175,58 @@ def maintain(config_path: Path, schedule: timedelta):
mark = datetime.now(timezone.utc) mark = datetime.now(timezone.utc)
tick = mark + schedule tick = mark + schedule
assert tick - schedule == mark assert tick - schedule == mark
for notif in client.get_all_notifications(): for notif in client.get_all_notifications(since=prev):
subject = notif["subject"] subject = notif["subject"]
# Don't waste time on notifications which haven't changed since the last scrub
updated_at = datetime.fromisoformat(notif["updated_at"])
if prev and updated_at < prev:
continue
pr = None pr = None
if "/pulls/" in subject["url"]: if subject["type"] == "PullRequest":
if notif["reason"] == "review_requested":
pr = client.get_pr(url=subject["url"]) pr = client.get_pr(url=subject["url"])
if pr["state"] == "closed": reviewers = client.get_pr_reviewers(pr)
_resolve(notif) if (
any(org in subject["url"] for org in org_shitlist)
and not any(
it["login"] == user["login"]
for it in reviewers.get("users", [])
)
and not any(
it["slug"] in user_teams
and it["slug"] not in team_shitlist
for it in reviewers.get("teams", [])
)
):
_resolve(notif, "Reviewed")
continue continue
elif notif["reason"] == "team_mention":
pr = client.get_pr(url=subject["url"])
reviewers = client.get_pr_reviewers(pr) reviewers = client.get_pr_reviewers(pr)
if any(org in subject["url"] for org in org_shitlist) and not any(it["login"] == user["login"] for it in reviewers.get("users", [])) and not any(it["slug"] in user_teams for it in reviewers.get("teams", [])): if (
_resolve(notif) any(org in subject["url"] for org in org_shitlist)
and not any(
it["login"] == user["login"]
for it in reviewers.get("users", [])
)
and not any(
it["slug"] in user_teams
and it["slug"] not in team_shitlist
for it in reviewers.get("teams", [])
)
):
_resolve(notif, "Ignoring team mention")
continue continue
elif subject["type"] == "Issue":
issue = client.get_issue(url=subject["url"])
if issue["state"] == "closed":
comments = client.get_comments(url=issue["comments_url"])
if not any(
it["user"]["login"] == user["login"] for it in comments
):
_resolve(notif, "Unengaged issue closed")
duration = (tick - datetime.now(timezone.utc)).total_seconds() duration = (tick - datetime.now(timezone.utc)).total_seconds()
if duration > 0: if duration > 0:
sleep(duration) sleep(duration)