path: root/tests/api/test_archive.py
author     Wolfgang Müller   2024-03-05 18:08:09 +0100
committer  Wolfgang Müller   2024-03-05 19:25:59 +0100
commit     d1d654ebac2d51e3841675faeb56480e440f622f (patch)
tree       56ef123c1a15a10dfd90836e4038e27efde950c6 /tests/api/test_archive.py
download   hircine-d1d654ebac2d51e3841675faeb56480e440f622f.tar.gz
Initial commit (tag: 0.1.0)
Diffstat (limited to 'tests/api/test_archive.py')
-rw-r--r--  tests/api/test_archive.py  388
1 file changed, 388 insertions, 0 deletions
diff --git a/tests/api/test_archive.py b/tests/api/test_archive.py
new file mode 100644
index 0000000..0ef3425
--- /dev/null
+++ b/tests/api/test_archive.py
@@ -0,0 +1,388 @@
+import os
+from datetime import datetime as dt
+from pathlib import Path
+
+import hircine.config
+import hircine.db as database
+import hircine.thumbnailer as thumb
+import pytest
+from conftest import DB, Response
+from hircine.db.models import Archive, Comic, Image, Page
+from sqlalchemy import select
+
+
+@pytest.fixture
+def query_archive(execute_id):
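+    # Query a single archive by ID, selecting every FullArchive field along
+    # with the Error and IDNotFoundError variants.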
+ query = """
+ query archive($id: Int!) {
+ archive(id: $id) {
+ __typename
+ ... on FullArchive {
+ id
+ name
+ createdAt
+ mtime
+ size
+ path
+ pageCount
+ organized
+ comics {
+ __typename
+ id
+ }
+ cover {
+ __typename
+ id
+ }
+ pages {
+ __typename
+ id
+ image {
+ __typename
+ id
+ }
+ }
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_id(query)
+
+
+@pytest.fixture
+def query_archives(execute):
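+    # Query the archive list, selecting the total count and the summary
+    # fields exposed on each edge.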
+ query = """
+ query archives {
+ archives {
+ __typename
+ count
+ edges {
+ id
+ name
+ size
+ pageCount
+ organized
+ cover {
+ __typename
+ id
+ }
+ }
+ }
+ }
+ """
+
+ return execute(query)
+
+
+@pytest.fixture
+def update_archives(execute_update):
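+    # Update one or more archives; the result union covers success, generic
+    # errors, and the PageRemoteError raised for covers from other archives.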
+ mutation = """
+ mutation updateArchives($ids: [Int!]!, $input: UpdateArchiveInput!) {
+ updateArchives(ids: $ids, input: $input) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on PageRemoteError {
+ id
+ archiveId
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_update(mutation)
+
+
+@pytest.fixture
+def delete_archives(execute_delete):
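+    # Delete one or more archives by ID.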
+ mutation = """
+ mutation deleteArchives($ids: [Int!]!) {
+ deleteArchives(ids: $ids) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_delete(mutation)
+
+
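+# Helpers asserting that a GraphQL object matches its ORM counterpart.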
+def assert_image_matches(obj, model):
+ assert obj["__typename"] == "Image"
+ assert obj["id"] == model.id
+
+
+def assert_page_matches(obj, model):
+ assert obj["__typename"] == "Page"
+ assert obj["id"] == model.id
+
+
+@pytest.mark.anyio
+async def test_query_archive(query_archive, gen_archive):
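+    # Persist a generated archive and verify each field returned by the API
+    # against the model, including the cover and per-page image references.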
+ archive = next(gen_archive)
+ pages = archive.pages
+
+ await DB.add(archive)
+
+ response = Response(await query_archive(archive.id))
+ response.assert_is("FullArchive")
+
+ assert response.id == archive.id
+ assert response.name == archive.name
+ assert dt.fromisoformat(response.createdAt) == archive.created_at
+ assert dt.fromisoformat(response.mtime) == archive.mtime
+ assert response.size == archive.size
+ assert response.path == archive.path
+ assert response.comics == []
+ assert response.pageCount == archive.page_count
+ assert response.organized == archive.organized
+ assert_image_matches(response.cover, pages[0].image)
+
+ assert len(response.pages) == len(pages)
+
+ page_iter = iter(sorted(pages, key=lambda page: page.index))
+ for page in response.pages:
+ matching_page = next(page_iter)
+ assert_page_matches(page, matching_page)
+ assert_image_matches(page["image"], matching_page.image)
+
+
+@pytest.mark.anyio
+async def test_query_archive_sorts_pages(query_archive, gen_jumbled_archive):
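+    # The fixture generates pages out of order; the API must return them
+    # sorted by page index.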
+ archive = await DB.add(next(gen_jumbled_archive))
+
+ response = Response(await query_archive(archive.id))
+ response.assert_is("FullArchive")
+
+ page_iter = iter(sorted(archive.pages, key=lambda page: page.index))
+ for page in response.pages:
+ matching_page = next(page_iter)
+ assert_page_matches(page, matching_page)
+ assert_image_matches(page["image"], matching_page.image)
+
+
+@pytest.mark.anyio
+async def test_query_archive_fails_not_found(query_archive):
+ response = Response(await query_archive(1))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Archive ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_query_archives(query_archives, gen_archive):
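+    # The archive list is returned sorted by name.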
+ archives = await DB.add_all(*gen_archive)
+
+ response = Response(await query_archives())
+ response.assert_is("ArchiveFilterResult")
+
+ assert response.count == len(archives)
+    assert isinstance(response.edges, list)
+ assert len(response.edges) == len(archives)
+
+ edges = iter(response.edges)
+ for archive in sorted(archives, key=lambda a: a.name):
+ edge = next(edges)
+ assert edge["id"] == archive.id
+ assert edge["name"] == archive.name
+ assert edge["size"] == archive.size
+ assert edge["pageCount"] == archive.page_count
+ assert_image_matches(edge["cover"], archive.cover)
+
+
+@pytest.fixture
+def gen_archive_with_files(tmpdir, monkeypatch, gen_archive):
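+    # Like gen_archive, but with real files on disk: a placeholder
+    # archive.zip in the content directory and touched image objects
+    # ("full" and "thumb") in the object store. hircine.config is patched
+    # to point at both directories.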
+ content_dir = os.path.join(tmpdir, "content/")
+ object_dir = os.path.join(tmpdir, "objects/")
+ os.mkdir(content_dir)
+ os.mkdir(object_dir)
+
+ dirs = hircine.config.DirectoryStructure(scan=content_dir, objects=object_dir)
+ monkeypatch.setattr(hircine.config, "dir_structure", dirs)
+
+ archive = next(gen_archive)
+
+ archive_path = Path(os.path.join(content_dir, "archive.zip"))
+ archive_path.touch()
+ archive.path = str(archive_path)
+
+ img_paths = []
+ for page in archive.pages:
+ for suffix in ["full", "thumb"]:
+ img_path = Path(thumb.object_path(object_dir, page.image.hash, suffix))
+ os.makedirs(os.path.dirname(img_path), exist_ok=True)
+ img_path.touch()
+
+            img_paths.append(img_path)
+
+ yield archive, content_dir, object_dir, img_paths
+
+
+@pytest.mark.anyio
+async def test_delete_archive(delete_archives, gen_archive_with_files):
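+    # Deleting an archive removes its pages and images from the database
+    # and its archive file and image objects from disk.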
+ archive, content_dir, object_dir, img_paths = gen_archive_with_files
+ archive_path = archive.path
+
+ archive = await DB.add(archive)
+ page_ids = [page.id for page in archive.pages]
+ image_ids = [page.image.id for page in archive.pages]
+
+ response = Response(await delete_archives(archive.id))
+ response.assert_is("DeleteSuccess")
+
+ archive = await DB.get(Archive, archive.id)
+ assert archive is None
+
+ async with database.session() as s:
+ db_pages = (await s.scalars(select(Page).where(Page.id.in_(page_ids)))).all()
+ db_images = (
+ await s.scalars(select(Image).where(Image.id.in_(image_ids)))
+ ).all()
+
+ assert db_pages == []
+ assert db_images == []
+
+ assert os.path.exists(archive_path) is False
+ for img_path in img_paths:
+ assert os.path.exists(img_path) is False
+
+
+@pytest.mark.anyio
+async def test_delete_archive_deletes_images_only_when_necessary(
+ delete_archives, gen_archive_with_files, gen_archive
+):
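+    # Images referenced by another archive must survive deletion, both in
+    # the database and on disk; the archive row, its pages, and the archive
+    # file are still removed.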
+ archive, content_dir, object_dir, img_paths = gen_archive_with_files
+ archive_path = archive.path
+
+ archive = await DB.add(archive)
+ page_ids = [page.id for page in archive.pages]
+ image_ids = [page.image.id for page in archive.pages]
+
+ another = next(gen_archive)
+ another.pages = [
+        Page(path="foo", index=1, image_id=image_id, archive=another)
+        for image_id in image_ids
+ ]
+ another.cover = archive.cover
+ await DB.add(another)
+
+ response = Response(await delete_archives(archive.id))
+ response.assert_is("DeleteSuccess")
+
+ archive = await DB.get(Archive, archive.id)
+ assert archive is None
+
+ async with database.session() as s:
+ db_pages = (await s.scalars(select(Page).where(Page.id.in_(page_ids)))).all()
+ db_images = (
+ await s.scalars(select(Image.id).where(Image.id.in_(image_ids)))
+ ).all()
+
+ assert db_pages == []
+    assert sorted(db_images) == sorted(image_ids)
+
+ assert os.path.exists(archive_path) is False
+ for img_path in img_paths:
+ assert os.path.exists(img_path) is True
+
+
+@pytest.mark.anyio
+async def test_delete_archive_cascades_on_comic(
+ delete_archives, gen_archive_with_files
+):
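+    # Deleting an archive also deletes any comic created from it.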
+ archive, *_ = gen_archive_with_files
+ comic = Comic(
+ id=1,
+ title="Hic Sunt Dracones",
+ archive=archive,
+ cover=archive.cover,
+ pages=archive.pages,
+ )
+
+ comic = await DB.add(comic)
+
+ response = Response(await delete_archives(comic.archive.id))
+ response.assert_is("DeleteSuccess")
+
+ archive = await DB.get(Archive, archive.id)
+ assert archive is None
+
+ comic = await DB.get(Comic, comic.id)
+ assert comic is None
+
+
+@pytest.mark.anyio
+async def test_update_archives(update_archives, gen_archive):
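+    # Setting the cover by page ID stores that page's image as the cover.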
+ old_archive = await DB.add(next(gen_archive))
+
+ response = Response(
+ await update_archives(
+ old_archive.id,
+ {"cover": {"id": old_archive.pages[1].id}, "organized": True},
+ )
+ )
+ response.assert_is("UpdateSuccess")
+
+ archive = await DB.get(Archive, old_archive.id)
+
+ assert archive.cover_id == old_archive.pages[1].image.id
+ assert archive.organized is True
+
+
+@pytest.mark.anyio
+async def test_update_archive_fails_archive_not_found(update_archives, gen_archive):
+ archive = await DB.add(next(gen_archive))
+
+ response = Response(
+ await update_archives(100, {"cover": {"id": archive.pages[1].id}})
+ )
+ response.assert_is("IDNotFoundError")
+ assert response.id == 100
+ assert response.message == "Archive ID not found: '100'"
+
+
+@pytest.mark.anyio
+async def test_update_archive_cover_fails_page_not_found(update_archives, gen_archive):
+ archive = await DB.add(next(gen_archive))
+
+ response = Response(await update_archives(archive.id, {"cover": {"id": 100}}))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 100
+ assert response.message == "Page ID not found: '100'"
+
+
+@pytest.mark.anyio
+async def test_update_archive_cover_fails_page_remote(update_archives, gen_archive):
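+    # A cover page must belong to the archive being updated; a page from
+    # another archive is rejected with PageRemoteError.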
+ archive = await DB.add(next(gen_archive))
+ another = await DB.add(next(gen_archive))
+ remote_id = another.pages[0].id
+
+ response = Response(await update_archives(archive.id, {"cover": {"id": remote_id}}))
+ response.assert_is("PageRemoteError")
+ assert response.id == remote_id
+ assert response.archiveId == another.id
+ assert (
+ response.message
+ == f"Page ID {remote_id} comes from remote archive ID {another.id}"
+ )