summaryrefslogtreecommitdiffstatshomepage
path: root/src/hircine/db/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/hircine/db/__init__.py99
1 files changed, 99 insertions, 0 deletions
diff --git a/src/hircine/db/__init__.py b/src/hircine/db/__init__.py
new file mode 100644
index 0000000..493bd91
--- /dev/null
+++ b/src/hircine/db/__init__.py
@@ -0,0 +1,99 @@
+import os
+import sys
+from pathlib import Path
+
+from alembic import command as alembic_command
+from alembic import script as alembic_script
+from alembic.config import Config as AlembicConfig
+from alembic.runtime import migration
+from sqlalchemy import event, text
+from sqlalchemy.engine import Engine
+from sqlalchemy.ext.asyncio import (
+ async_sessionmaker,
+ create_async_engine,
+)
+
+from . import models
+
+alembic_ini = f"{Path(__file__).parent.parent}/migrations/alembic.ini"
+session = async_sessionmaker(expire_on_commit=False, autoflush=False)
+
+
+def ensuredb(dirs): # pragma: no cover
+ if not os.path.exists(dirs.database):
+ sys.exit("No database found.")
+
+
+def sqlite_url(path):
+ return f"sqlite+aiosqlite:///{path}"
+
+
+def model(model):
+ def decorator(cls):
+ cls._model = getattr(models, model)
+ return cls
+
+ return decorator
+
+
+def stamp_alembic(connection):
+ cfg = AlembicConfig(alembic_ini)
+ cfg.attributes["connection"] = connection
+ cfg.attributes["silent"] = True
+
+ alembic_command.stamp(cfg, "head")
+
+
+def check_current_head(connection): # pragma: no cover
+ directory = alembic_script.ScriptDirectory.from_config(AlembicConfig(alembic_ini))
+
+ context = migration.MigrationContext.configure(connection)
+ return set(context.get_current_heads()) == set(directory.get_heads())
+
+
+async def ensure_current_revision(engine): # pragma: no cover
+ async with engine.begin() as conn:
+ if not await conn.run_sync(check_current_head):
+ sys.exit("Database is not up to date, please run 'hircine migrate'.")
+
+
+async def initialize(engine):
+ async with engine.begin() as conn:
+ await conn.run_sync(models.Base.metadata.drop_all)
+ await conn.run_sync(models.Base.metadata.create_all)
+ await conn.run_sync(stamp_alembic)
+
+
+async def backup(engine, path): # pragma: no cover
+ async with engine.connect() as conn:
+ await conn.execute(text("VACUUM INTO :path"), {"path": path})
+
+
+async def vacuum(engine): # pragma: no cover
+ async with engine.connect() as conn:
+ await conn.execute(text("VACUUM"))
+
+
+@event.listens_for(Engine, "connect")
+def set_sqlite_pragma(dbapi_connection, connection_record):
+ cursor = dbapi_connection.cursor()
+ cursor.execute("PRAGMA foreign_keys=ON")
+ cursor.execute("PRAGMA journal_mode=WAL")
+ cursor.close()
+
+
+def create_engine(path, echo=False):
+ return create_async_engine(
+ sqlite_url(path),
+ connect_args={"check_same_thread": False},
+ echo=echo,
+ )
+
+
+def configure(dirs): # pragma: no cover
+ echo = "HIRCINE_DEV" in os.environ
+
+ engine = create_engine(dirs.database, echo=echo)
+ session.configure(bind=engine)
+
+ return engine