Update benchmarks

This commit is contained in:
Reid 'arrdem' McKenzie 2021-08-20 10:10:04 -06:00
parent 01a5a1b67d
commit 2b101bf02c
3 changed files with 110 additions and 39 deletions

View file

@ -39,7 +39,7 @@ Benchmarks are extremely steady.
Flushing a sqlite file to disk seems to be the limiting factor of I/O, and pipelining multiple message writes is undoubtably the way to go.
However the purpose of the API is to use the sqlite file as the shared checkpoint between potentially many processes, so 'large' transactions are an antipattern.
Tests suggest that this library is rock steady at 100 writes per sec. and 100 polls per sec. and completely bounded by sqlite controlled I/O as evidenced by using `":memory:"` which doesn't have to `fsync()`.
The `naive_fsync` benchmark takes how long a simple `f.write(); f.flush(); os.fsync(f.fnum())` takes, and represents a lower bound for "transactional" I/O in a strictly appending system. That `insert` clocks in at about 10ms/op whereas `naive_fsync` clocks in at about 4.5ms suggests that the "overhead" imposed by SQLite is actually pretty reasonable, and only ~2x gains are possible without sacrificing durability.
``` shell
$ bazel run :benchmark
@ -51,40 +51,54 @@ Target //projects/jobq:benchmark up-to-date:
...
Ran 'insert' 10000 times, total time 101.810816516 (s)
mean: 0.010148992843 (s)
median: 0.009474293 (s)
stddev: 0.006727934042954838 (s)
test overhead: 3.20888086e-05 (s)
Getting a baseline
Ran 'naive_serialize' 10000 times, total time 61.108668 ms
mean: 4.4800743 us
median: 3.876 us
stddev: 2.9536885497940415 us
test overhead: 1.6307925 us
Ran 'poll' 10000 times, total time 100.482262487 (s)
mean: 0.0100152467857 (s)
median: 0.0095528585 (s)
stddev: 0.00821730176268304 (s)
test overhead: 3.2979463000000004e-05 (s)
Ran 'naive_fsync' 10000 times, total time 46.169303029 s
mean: 4.5833899318 ms
median: 4.3935955 ms
stddev: 3.6405235897574997 ms
test overhead: 33.540371099999994 us
Ran 'append_event' 10000 times, total time 105.015296419 (s)
mean: 0.0104681294652 (s)
median: 0.009592544 (s)
stddev: 0.007321370576225584 (s)
test overhead: 3.34001767e-05 (s)
Testing with /tmp/jobq-bench.sqlite3
Ran 'insert' 10000 times, total time 106.925228341 s
mean: 10.660283108 ms
median: 9.6410375 ms
stddev: 7.318317035793426 ms
test overhead: 32.2397261 us
Ran 'poll' 10000 times, total time 102.727780818 s
mean: 10.2409366588 ms
median: 9.7355345 ms
stddev: 6.566850750848292 ms
test overhead: 31.841423 us
Ran 'append_event' 10000 times, total time 105.075667417 s
mean: 10.4747147621 ms
median: 9.6445595 ms
stddev: 8.221955029149633 ms
test overhead: 32.8519796 us
Testing with :memory:
Ran 'insert' 10000 times, total time 0.37031511 (s)
mean: 3.3595880100000005e-05 (s)
median: 2.96015e-05 (s)
stddev: 1.045088890675899e-05 (s)
test overhead: 3.4356309e-06 (s)
Ran 'insert' 10000 times, total time 527.735484 ms
mean: 49.054487699999996 us
median: 43.823 us
stddev: 13.212556522688379 us
test overhead: 3.7190607 us
Ran 'poll' 10000 times, total time 1.17148314 (s)
mean: 0.0001128911222 (s)
median: 9.7398e-05 (s)
stddev: 3.213524197973896e-05 (s)
test overhead: 4.2571917999999996e-06 (s)
Ran 'poll' 10000 times, total time 4.995857505 s
mean: 495.27983409999996 us
median: 161.8315 us
stddev: 443.6358523771585 us
test overhead: 4.3059164 us
Ran 'append_event' 10000 times, total time 0.415490332 (s)
mean: 3.78861989e-05 (s)
median: 3.3019e-05 (s)
stddev: 1.1752889674795285e-05 (s)
test overhead: 3.6628343e-06 (s)
Ran 'append_event' 10000 times, total time 579.750342 ms
mean: 54.1721441 us
median: 48.054 us
stddev: 15.205861822465426 us
test overhead: 3.8028901 us
```

View file

@ -11,6 +11,7 @@ import string
from statistics import mean, median, stdev
import tempfile
import logging
import json
from jobq import JobQueue
@ -39,6 +40,20 @@ def timing():
obj.end = perf_counter_ns()
def timer(val: float) -> str:
"""Given a time in NS, convert it to integral NS/MS/S such that the non-decimal part is integral."""
for factor, unit in [
(1e9, 's'),
(1e6, 'ms'),
(1e3, 'us'),
(1, 'ns'),
]:
scaled_val = val / factor
if 1e4 > scaled_val > 1.0:
return f"{scaled_val} {unit}"
def bench(callable, reps):
timings = []
with timing() as run_t:
@ -46,16 +61,52 @@ def bench(callable, reps):
with timing() as t:
callable()
timings.append(t.duration)
print(f"""Ran {callable.__name__!r} {reps} times, total time {run_t.duration / 1e9} (s)
mean: {mean(timings) / 1e9} (s)
median: {median(timings) / 1e9} (s)
stddev: {stdev(timings) / 1e9} (s)
test overhead: {(run_t.duration - sum(timings)) / reps / 1e9} (s)
print(f"""Ran {callable.__name__!r} {reps} times, total time {timer(run_t.duration)}
mean: {timer(mean(timings))}
median: {timer(median(timings))}
stddev: {timer(stdev(timings))}
test overhead: {timer((run_t.duration - sum(timings)) / reps)}
""")
def test_reference_json(reps):
"""As a reference benchmark, test just appending to a file."""
jobs = [
{"user_id": randint(0, 1<<32), "msg": randstr(256)}
for _ in range(reps)
]
jobs_i = iter(jobs)
def naive_serialize():
json.dumps([next(jobs_i), ["CREATED"]])
bench(naive_serialize, reps)
def test_reference_fsync(reps):
"""As a reference benchmark, test just appending to a file."""
jobs = [
{"user_id": randint(0, 1<<32), "msg": randstr(256)}
for _ in range(reps)
]
jobs_i = iter(jobs)
handle, path = tempfile.mkstemp()
os.close(handle)
with open(path, "w") as fd:
def naive_fsync():
fd.write(json.dumps([next(jobs_i), ["CREATED"]]))
fd.flush()
os.fsync(fd.fileno())
bench(naive_fsync, reps)
def test_insert(q, reps):
# Measuring insertion time
"""Benchmark insertion time to a given SQLite DB."""
jobs = [
{"user_id": randint(0, 1<<32), "msg": randstr(256)}
for _ in range(reps)
@ -69,14 +120,17 @@ def test_insert(q, reps):
def test_poll(q, reps):
"""Benchmark query/update time on a given SQLite DB."""
def poll():
q.poll([["=", "json_extract(j.state, '$[0]')", "'CREATED'"]], ["POLLED"])
q.poll("json_extract(j.state, '$[0]') = 'CREATED'", ["POLLED"])
bench(poll, reps)
def test_append(q, reps):
"""Benchmark adding an event on a given SQLite DB."""
def append_event():
q.append_event(randint(1, reps), {"foo": "bar"})
@ -95,6 +149,10 @@ if __name__ == "__main__":
if os.path.exists(path):
os.remove(path)
print("Getting a baseline")
test_reference_json(reps)
test_reference_fsync(reps)
# And the tests
print(f"Testing with {path}")
q = JobQueue(path)

View file

@ -258,7 +258,6 @@ class JobQueue(object):
with self._db as db:
cur = db.cursor()
statement = _POLL_SQL.format(compile_query(query))
print(statement)
cur.execute(statement, {"state": json.dumps(new_state)})
results = cur.fetchall()
if results: