source/projects/jobq/benchmark.py

167 lines
3.7 KiB
Python
Raw Normal View History

"""
Benchmarking the jobq.
"""
from contextlib import contextmanager
2021-08-30 04:18:57 +00:00
import json
import logging
import os
2021-08-30 04:18:57 +00:00
from random import choice, randint
from statistics import mean, median, stdev
2021-08-30 04:18:57 +00:00
import string
import tempfile
2021-08-30 04:18:57 +00:00
from time import perf_counter_ns
from jobq import JobQueue
def randstr(len):
2021-08-30 04:18:57 +00:00
return "".join(choice(string.ascii_uppercase + string.digits) for _ in range(len))
class Timing(object):
def __init__(self, start):
self.start = start
self.end = None
@property
def duration(self):
if self.end:
return self.end - self.start
@contextmanager
def timing():
"""A context manager that produces a semi-mutable timing record."""
obj = Timing(perf_counter_ns())
yield obj
obj.end = perf_counter_ns()
2021-08-20 16:10:04 +00:00
def timer(val: float) -> str:
"""Given a time in NS, convert it to integral NS/MS/S such that the non-decimal part is integral."""
for factor, unit in [
2021-08-30 04:18:57 +00:00
(1e9, "s"),
(1e6, "ms"),
(1e3, "us"),
(1, "ns"),
2021-08-20 16:10:04 +00:00
]:
scaled_val = val / factor
if 1e4 > scaled_val > 1.0:
return f"{scaled_val} {unit}"
def bench(callable, reps):
timings = []
with timing() as run_t:
for _ in range(reps):
with timing() as t:
callable()
timings.append(t.duration)
2021-08-20 16:10:04 +00:00
print(f"""Ran {callable.__name__!r} {reps} times, total time {timer(run_t.duration)}
mean: {timer(mean(timings))}
median: {timer(median(timings))}
stddev: {timer(stdev(timings))}
test overhead: {timer((run_t.duration - sum(timings)) / reps)}
""")
2021-08-20 16:10:04 +00:00
def test_reference_json(reps):
"""As a reference benchmark, test just appending to a file."""
jobs = [
{"user_id": randint(0, 1<<32), "msg": randstr(256)}
for _ in range(reps)
]
jobs_i = iter(jobs)
def naive_serialize():
json.dumps([next(jobs_i), ["CREATED"]])
bench(naive_serialize, reps)
def test_reference_fsync(reps):
"""As a reference benchmark, test just appending to a file."""
jobs = [
{"user_id": randint(0, 1<<32), "msg": randstr(256)}
for _ in range(reps)
]
jobs_i = iter(jobs)
handle, path = tempfile.mkstemp()
os.close(handle)
with open(path, "w") as fd:
def naive_fsync():
fd.write(json.dumps([next(jobs_i), ["CREATED"]]))
fd.flush()
os.fsync(fd.fileno())
bench(naive_fsync, reps)
def test_insert(q, reps):
2021-08-20 16:10:04 +00:00
"""Benchmark insertion time to a given SQLite DB."""
jobs = [
{"user_id": randint(0, 1<<32), "msg": randstr(256)}
for _ in range(reps)
]
jobs_i = iter(jobs)
def insert():
q.create(next(jobs_i), new_state=["CREATED"])
bench(insert, reps)
def test_poll(q, reps):
2021-08-20 16:10:04 +00:00
"""Benchmark query/update time on a given SQLite DB."""
def poll():
2021-08-20 16:10:04 +00:00
q.poll("json_extract(j.state, '$[0]') = 'CREATED'", ["POLLED"])
bench(poll, reps)
2021-08-20 05:45:15 +00:00
def test_append(q, reps):
2021-08-20 16:10:04 +00:00
"""Benchmark adding an event on a given SQLite DB."""
2021-08-20 05:45:15 +00:00
def append_event():
q.append_event(randint(1, reps), {"foo": "bar"})
bench(append_event, reps)
if __name__ == "__main__":
2021-08-20 05:45:15 +00:00
# No logs
logging.getLogger().setLevel(logging.WARN)
# Test params
reps = 10000
path = "/tmp/jobq-bench.sqlite3"
# Ensuring a clean-ish run env.
if os.path.exists(path):
os.remove(path)
2021-08-20 16:10:04 +00:00
print("Getting a baseline")
test_reference_json(reps)
test_reference_fsync(reps)
# And the tests
print(f"Testing with {path}")
q = JobQueue(path)
test_insert(q, reps)
test_poll(q, reps)
2021-08-20 05:45:15 +00:00
test_append(q, reps)
print(f"Testing with :memory:")
q = JobQueue(":memory:")
test_insert(q, reps)
test_poll(q, reps)
2021-08-20 05:45:15 +00:00
test_append(q, reps)