diff options
Diffstat (limited to '')
-rw-r--r-- | src/hircine/db/__init__.py | 99 |
1 files changed, 99 insertions, 0 deletions
diff --git a/src/hircine/db/__init__.py b/src/hircine/db/__init__.py new file mode 100644 index 0000000..493bd91 --- /dev/null +++ b/src/hircine/db/__init__.py @@ -0,0 +1,99 @@ +import os +import sys +from pathlib import Path + +from alembic import command as alembic_command +from alembic import script as alembic_script +from alembic.config import Config as AlembicConfig +from alembic.runtime import migration +from sqlalchemy import event, text +from sqlalchemy.engine import Engine +from sqlalchemy.ext.asyncio import ( + async_sessionmaker, + create_async_engine, +) + +from . import models + +alembic_ini = f"{Path(__file__).parent.parent}/migrations/alembic.ini" +session = async_sessionmaker(expire_on_commit=False, autoflush=False) + + +def ensuredb(dirs): # pragma: no cover + if not os.path.exists(dirs.database): + sys.exit("No database found.") + + +def sqlite_url(path): + return f"sqlite+aiosqlite:///{path}" + + +def model(model): + def decorator(cls): + cls._model = getattr(models, model) + return cls + + return decorator + + +def stamp_alembic(connection): + cfg = AlembicConfig(alembic_ini) + cfg.attributes["connection"] = connection + cfg.attributes["silent"] = True + + alembic_command.stamp(cfg, "head") + + +def check_current_head(connection): # pragma: no cover + directory = alembic_script.ScriptDirectory.from_config(AlembicConfig(alembic_ini)) + + context = migration.MigrationContext.configure(connection) + return set(context.get_current_heads()) == set(directory.get_heads()) + + +async def ensure_current_revision(engine): # pragma: no cover + async with engine.begin() as conn: + if not await conn.run_sync(check_current_head): + sys.exit("Database is not up to date, please run 'hircine migrate'.") + + +async def initialize(engine): + async with engine.begin() as conn: + await conn.run_sync(models.Base.metadata.drop_all) + await conn.run_sync(models.Base.metadata.create_all) + await conn.run_sync(stamp_alembic) + + +async def backup(engine, path): # pragma: no cover + async with engine.connect() as conn: + await conn.execute(text("VACUUM INTO :path"), {"path": path}) + + +async def vacuum(engine): # pragma: no cover + async with engine.connect() as conn: + await conn.execute(text("VACUUM")) + + +@event.listens_for(Engine, "connect") +def set_sqlite_pragma(dbapi_connection, connection_record): + cursor = dbapi_connection.cursor() + cursor.execute("PRAGMA foreign_keys=ON") + cursor.execute("PRAGMA journal_mode=WAL") + cursor.close() + + +def create_engine(path, echo=False): + return create_async_engine( + sqlite_url(path), + connect_args={"check_same_thread": False}, + echo=echo, + ) + + +def configure(dirs): # pragma: no cover + echo = "HIRCINE_DEV" in os.environ + + engine = create_engine(dirs.database, echo=echo) + session.configure(bind=engine) + + return engine |