Knock out a migrations system for AnoSQL

This commit is contained in:
Reid 'arrdem' McKenzie 2021-08-13 16:45:53 -06:00
parent 1d028bbfbf
commit 9991eb4466
3 changed files with 286 additions and 0 deletions

View file

@ -0,0 +1,6 @@
py_project(
name = "anosql-migrations",
lib_deps = [
py_requirement("anosql"),
],
)

View file

@ -0,0 +1,183 @@
"""Quick and dirty migrations for AnoSQL."""
import logging
from datetime import datetime
from hashlib import sha256
import re
import typing as t
import anosql
from anosql.core import Queries, from_str
log = logging.getLogger(__name__)
class MigrationDescriptor(t.NamedTuple):
name: str
sha256sum: str
committed_at: t.Optional[datetime] = None
def __repr__(self):
return f"MigrationDescriptor(name={self.name!r}, sha256sum='{self.sha256sum[:7]}...')"
def __hash__(self):
return hash((self.name, self.sha256sum))
def __eq__(self, other):
return (self.name, self.sha256sum) == (other.name, other.sha256sum)
_SQL = """
-- name: anosql_migrations_create_table#
-- Create the migrations table for the anosql_migrations plugin.
CREATE TABLE IF NOT EXISTS `anosql_migration` (
`name` TEXT PRIMARY KEY NOT NULL
, `committed_at` INT
, `sha256sum` TEXT NOT NULL
, CONSTRAINT `am_sha256sum_unique` UNIQUE (`sha256sum`)
);
-- name: anosql_migrations_list
-- List committed migrations
SELECT
`name`
, `committed_at`
, `sha256sum`
FROM `anosql_migration`
WHERE
`committed_at` > 0
ORDER BY
`name` ASC
;
-- name: anosql_migrations_get
-- Get a given migration by name
SELECT
`name`
, `committed_at`,
, `sha256sum`
FROM `anosql_migration`
WHERE
`name` = :name
ORDER BY
`rowid` ASC
;
-- name: anosql_migrations_create<!
-- Insert a migration, marking it as committed
INSERT OR REPLACE INTO `anosql_migration` (
`name`
, `committed_at`
, `sha256sum`
) VALUES (
:name
, :date
, :sha256sum
);
"""
def with_migrations(driver_adapter, queries: Queries, conn) -> Queries:
"""Initialize the migrations plugin."""
# Compile SQL as needed from the _SQL constant
_q = from_str(_SQL, driver_adapter)
# Merge. Sigh.
for _qname in _q.available_queries:
queries.add_query(_qname, getattr(_q, _qname))
# Create the migrations table
create_tables(queries, conn)
return queries
def create_tables(queries: Queries, conn) -> None:
"""Create the migrations table (if it doesn't exist)."""
if queries.anosql_migrations_create_table(conn):
log.info("Created migrations table")
# Insert the bootstrap 'fixup' record
execute_migration(queries, conn,
MigrationDescriptor(
name='anosql_migrations_create_table',
sha256sum=sha256(queries.anosql_migrations_create_table.sql.encode("utf-8")).hexdigest()))
def committed_migrations(queries: Queries, conn) -> t.Iterable[MigrationDescriptor]:
"""Enumerate migrations committed to the database."""
for name, committed_at, sha256sum in queries.anosql_migrations_list(conn):
yield MigrationDescriptor(
name=name,
committed_at=datetime.fromtimestamp(committed_at),
sha256sum=sha256sum,
)
def available_migrations(queries: Queries, conn) -> t.Iterable[MigrationDescriptor]:
"""Enumerate all available migrations, executed or no."""
for query_name in sorted(queries.available_queries):
if not re.match("^migration", query_name):
continue
if query_name.endswith("_cursor"):
continue
# type: query_name: str
query_fn = getattr(queries, query_name)
# type: query_fn: t.Callable + {.__name__, .__doc__, .sql}
yield MigrationDescriptor(
name = query_name,
committed_at = None,
sha256sum = sha256(query_fn.sql.encode("utf-8")).hexdigest())
def execute_migration(queries: Queries, conn, migration: MigrationDescriptor):
"""Execute a given migration singularly."""
with conn:
# Mark the migration as in flight
queries.anosql_migrations_create(
conn,
# Args
name=migration.name,
date=-1,
sha256sum=migration.sha256sum,
)
# Run the migration function
getattr(queries, migration.name)(conn)
# Mark the migration as committed
queries.anosql_migrations_create(
conn,
# Args
name=migration.name,
date=int(datetime.utcnow().timestamp()),
sha256sum=migration.sha256sum,
)
def run_migrations(queries, conn):
"""Run all remaining migrations."""
avail = set(available_migrations(queries, conn))
committed = set(committed_migrations(queries, conn))
for migration in sorted(avail, key=lambda m: m.name):
if migration in committed:
log.info(f"Skipping committed migration {migration.name}")
else:
log.info(f"Beginning migration {migration.name}")
try:
execute_migration(queries, conn, migration)
except Exception as e:
log.exception(f"Migration {migration.name} failed!", e)
raise e

View file

@ -0,0 +1,97 @@
"""Tests covering the migrations framework."""
import sqlite3
import anosql
from anosql.core import Queries
import anosql_migrations
import pytest
_SQL = """\
-- name: migration_0000_create_kv
CREATE TABLE kv (`id` INT, `key` TEXT, `value` TEXT);
"""
def table_exists(conn, table_name):
return list(conn.execute(f"""\
SELECT (
`name`
)
FROM `sqlite_master`
WHERE
`type` = 'table'
AND `name` = '{table_name}'
;"""))
@pytest.fixture
def conn() -> sqlite3.Connection:
"""Return an (empty) SQLite instance."""
return sqlite3.connect(":memory:")
def test_connect(conn: sqlite3.Connection):
"""Assert that the connection works and we can execute against it."""
assert list(conn.execute("SELECT 1;")) == [(1,),]
@pytest.fixture
def queries(conn) -> Queries:
"""A fixture for building a (migrations capable) anosql queries object."""
q = anosql.from_str(_SQL, "sqlite3")
return anosql_migrations.with_migrations("sqlite3", q, conn)
def test_queries(queries):
"""Assert that we can construct a queries instance with migrations features."""
assert isinstance(queries, Queries)
assert hasattr(queries, 'anosql_migrations_create_table')
assert hasattr(queries, 'anosql_migrations_list')
assert hasattr(queries, 'anosql_migrations_create')
def test_migrations_create_table(conn, queries):
"""Assert that the migrations system will (automagically) create the table."""
assert table_exists(conn, "anosql_migration"), "Migrations table did not create"
def test_migrations_list(conn, queries):
"""Test that we can list out available migrations."""
ms = list(anosql_migrations.available_migrations(queries, conn))
assert any(m.name == "migration_0000_create_kv" for m in ms), f"Didn't find in {ms!r}"
def test_committed_migrations(conn, queries):
"""Assert that only the bootstrap migration is committed to the empty connection."""
ms = list(anosql_migrations.committed_migrations(queries, conn))
assert len(ms) == 1
assert ms[0].name == "anosql_migrations_create_table"
def test_apply_migrations(conn, queries):
"""Assert that if we apply migrations, the requisite table is created."""
anosql_migrations.run_migrations(queries, conn)
assert table_exists(conn, "kv")
@pytest.fixture
def migrated_conn(conn, queries):
"""Generate a connection whithin which the `kv` migration has already been run."""
anosql_migrations.run_migrations(queries, conn)
return conn
def test_post_committed_migrations(migrated_conn, queries):
"""Assert that the create_kv migration has been committed."""
ms = list(anosql_migrations.committed_migrations(queries, migrated_conn))
assert any(m.name == "migration_0000_create_kv" for m in ms), "\n".join(migrated_conn.iterdump())