diff --git a/projects/jobq/README.md b/projects/jobq/README.md index f97d230..c08f5ce 100644 --- a/projects/jobq/README.md +++ b/projects/jobq/README.md @@ -39,7 +39,7 @@ Benchmarks are extremely steady. Flushing a sqlite file to disk seems to be the limiting factor of I/O, and pipelining multiple message writes is undoubtably the way to go. However the purpose of the API is to use the sqlite file as the shared checkpoint between potentially many processes, so 'large' transactions are an antipattern. -Tests suggest that this library is rock steady at 100 writes per sec. and 100 polls per sec. and completely bounded by sqlite controlled I/O as evidenced by using `":memory:"` which doesn't have to `fsync()`. +The `naive_fsync` benchmark takes how long a simple `f.write(); f.flush(); os.fsync(f.fnum())` takes, and represents a lower bound for "transactional" I/O in a strictly appending system. That `insert` clocks in at about 10ms/op whereas `naive_fsync` clocks in at about 4.5ms suggests that the "overhead" imposed by SQLite is actually pretty reasonable, and only ~2x gains are possible without sacrificing durability. ``` shell $ bazel run :benchmark @@ -51,40 +51,54 @@ Target //projects/jobq:benchmark up-to-date: ... -Ran 'insert' 10000 times, total time 101.810816516 (s) - mean: 0.010148992843 (s) - median: 0.009474293 (s) - stddev: 0.006727934042954838 (s) - test overhead: 3.20888086e-05 (s) +Getting a baseline +Ran 'naive_serialize' 10000 times, total time 61.108668 ms + mean: 4.4800743 us + median: 3.876 us + stddev: 2.9536885497940415 us + test overhead: 1.6307925 us -Ran 'poll' 10000 times, total time 100.482262487 (s) - mean: 0.0100152467857 (s) - median: 0.0095528585 (s) - stddev: 0.00821730176268304 (s) - test overhead: 3.2979463000000004e-05 (s) +Ran 'naive_fsync' 10000 times, total time 46.169303029 s + mean: 4.5833899318 ms + median: 4.3935955 ms + stddev: 3.6405235897574997 ms + test overhead: 33.540371099999994 us -Ran 'append_event' 10000 times, total time 105.015296419 (s) - mean: 0.0104681294652 (s) - median: 0.009592544 (s) - stddev: 0.007321370576225584 (s) - test overhead: 3.34001767e-05 (s) +Testing with /tmp/jobq-bench.sqlite3 +Ran 'insert' 10000 times, total time 106.925228341 s + mean: 10.660283108 ms + median: 9.6410375 ms + stddev: 7.318317035793426 ms + test overhead: 32.2397261 us + +Ran 'poll' 10000 times, total time 102.727780818 s + mean: 10.2409366588 ms + median: 9.7355345 ms + stddev: 6.566850750848292 ms + test overhead: 31.841423 us + +Ran 'append_event' 10000 times, total time 105.075667417 s + mean: 10.4747147621 ms + median: 9.6445595 ms + stddev: 8.221955029149633 ms + test overhead: 32.8519796 us Testing with :memory: -Ran 'insert' 10000 times, total time 0.37031511 (s) - mean: 3.3595880100000005e-05 (s) - median: 2.96015e-05 (s) - stddev: 1.045088890675899e-05 (s) - test overhead: 3.4356309e-06 (s) +Ran 'insert' 10000 times, total time 527.735484 ms + mean: 49.054487699999996 us + median: 43.823 us + stddev: 13.212556522688379 us + test overhead: 3.7190607 us -Ran 'poll' 10000 times, total time 1.17148314 (s) - mean: 0.0001128911222 (s) - median: 9.7398e-05 (s) - stddev: 3.213524197973896e-05 (s) - test overhead: 4.2571917999999996e-06 (s) +Ran 'poll' 10000 times, total time 4.995857505 s + mean: 495.27983409999996 us + median: 161.8315 us + stddev: 443.6358523771585 us + test overhead: 4.3059164 us -Ran 'append_event' 10000 times, total time 0.415490332 (s) - mean: 3.78861989e-05 (s) - median: 3.3019e-05 (s) - stddev: 1.1752889674795285e-05 (s) - test overhead: 3.6628343e-06 (s) +Ran 'append_event' 10000 times, total time 579.750342 ms + mean: 54.1721441 us + median: 48.054 us + stddev: 15.205861822465426 us + test overhead: 3.8028901 us ``` diff --git a/projects/jobq/benchmark.py b/projects/jobq/benchmark.py index 9d4b854..63ae800 100644 --- a/projects/jobq/benchmark.py +++ b/projects/jobq/benchmark.py @@ -11,6 +11,7 @@ import string from statistics import mean, median, stdev import tempfile import logging +import json from jobq import JobQueue @@ -39,6 +40,20 @@ def timing(): obj.end = perf_counter_ns() +def timer(val: float) -> str: + """Given a time in NS, convert it to integral NS/MS/S such that the non-decimal part is integral.""" + + for factor, unit in [ + (1e9, 's'), + (1e6, 'ms'), + (1e3, 'us'), + (1, 'ns'), + ]: + scaled_val = val / factor + if 1e4 > scaled_val > 1.0: + return f"{scaled_val} {unit}" + + def bench(callable, reps): timings = [] with timing() as run_t: @@ -46,16 +61,52 @@ def bench(callable, reps): with timing() as t: callable() timings.append(t.duration) - print(f"""Ran {callable.__name__!r} {reps} times, total time {run_t.duration / 1e9} (s) - mean: {mean(timings) / 1e9} (s) - median: {median(timings) / 1e9} (s) - stddev: {stdev(timings) / 1e9} (s) - test overhead: {(run_t.duration - sum(timings)) / reps / 1e9} (s) + print(f"""Ran {callable.__name__!r} {reps} times, total time {timer(run_t.duration)} + mean: {timer(mean(timings))} + median: {timer(median(timings))} + stddev: {timer(stdev(timings))} + test overhead: {timer((run_t.duration - sum(timings)) / reps)} """) +def test_reference_json(reps): + """As a reference benchmark, test just appending to a file.""" + + jobs = [ + {"user_id": randint(0, 1<<32), "msg": randstr(256)} + for _ in range(reps) + ] + jobs_i = iter(jobs) + + def naive_serialize(): + json.dumps([next(jobs_i), ["CREATED"]]) + + bench(naive_serialize, reps) + + +def test_reference_fsync(reps): + """As a reference benchmark, test just appending to a file.""" + + jobs = [ + {"user_id": randint(0, 1<<32), "msg": randstr(256)} + for _ in range(reps) + ] + jobs_i = iter(jobs) + + handle, path = tempfile.mkstemp() + os.close(handle) + with open(path, "w") as fd: + def naive_fsync(): + fd.write(json.dumps([next(jobs_i), ["CREATED"]])) + fd.flush() + os.fsync(fd.fileno()) + + bench(naive_fsync, reps) + + def test_insert(q, reps): - # Measuring insertion time + """Benchmark insertion time to a given SQLite DB.""" + jobs = [ {"user_id": randint(0, 1<<32), "msg": randstr(256)} for _ in range(reps) @@ -69,14 +120,17 @@ def test_insert(q, reps): def test_poll(q, reps): + """Benchmark query/update time on a given SQLite DB.""" def poll(): - q.poll([["=", "json_extract(j.state, '$[0]')", "'CREATED'"]], ["POLLED"]) + q.poll("json_extract(j.state, '$[0]') = 'CREATED'", ["POLLED"]) bench(poll, reps) def test_append(q, reps): + """Benchmark adding an event on a given SQLite DB.""" + def append_event(): q.append_event(randint(1, reps), {"foo": "bar"}) @@ -95,6 +149,10 @@ if __name__ == "__main__": if os.path.exists(path): os.remove(path) + print("Getting a baseline") + test_reference_json(reps) + test_reference_fsync(reps) + # And the tests print(f"Testing with {path}") q = JobQueue(path) diff --git a/projects/jobq/src/python/jobq/__init__.py b/projects/jobq/src/python/jobq/__init__.py index f23fbb4..9da3116 100644 --- a/projects/jobq/src/python/jobq/__init__.py +++ b/projects/jobq/src/python/jobq/__init__.py @@ -258,7 +258,6 @@ class JobQueue(object): with self._db as db: cur = db.cursor() statement = _POLL_SQL.format(compile_query(query)) - print(statement) cur.execute(statement, {"state": json.dumps(new_state)}) results = cur.fetchall() if results: