diff options
Diffstat (limited to '')
-rw-r--r-- | tests/api/test_archive.py | 388 |
1 files changed, 388 insertions, 0 deletions
diff --git a/tests/api/test_archive.py b/tests/api/test_archive.py new file mode 100644 index 0000000..0ef3425 --- /dev/null +++ b/tests/api/test_archive.py @@ -0,0 +1,388 @@ +import os +from datetime import datetime as dt +from pathlib import Path + +import hircine.config +import hircine.db as database +import hircine.thumbnailer as thumb +import pytest +from conftest import DB, Response +from hircine.db.models import Archive, Comic, Image, Page +from sqlalchemy import select + + +@pytest.fixture +def query_archive(execute_id): + query = """ + query archive($id: Int!) { + archive(id: $id) { + __typename + ... on FullArchive { + id + name + createdAt + mtime + size + path + pageCount + organized + comics { + __typename + id + } + cover { + __typename + id + } + pages { + __typename + id + image { + __typename + id + } + } + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(query) + + +@pytest.fixture +def query_archives(execute): + query = """ + query archives { + archives { + __typename + count + edges { + id + name + size + pageCount + organized + cover { + __typename + id + } + } + } + } + """ + + return execute(query) + + +@pytest.fixture +def update_archives(execute_update): + mutation = """ + mutation updateArchives($ids: [Int!]!, $input: UpdateArchiveInput!) { + updateArchives(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on PageRemoteError { + id + archiveId + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_update(mutation) + + +@pytest.fixture +def delete_archives(execute_delete): + mutation = """ + mutation deleteArchives($ids: [Int!]!) { + deleteArchives(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +def assert_image_matches(obj, model): + assert obj["__typename"] == "Image" + assert obj["id"] == model.id + + +def assert_page_matches(obj, model): + assert obj["__typename"] == "Page" + assert obj["id"] == model.id + + +@pytest.mark.anyio +async def test_query_archive(query_archive, gen_archive): + archive = next(gen_archive) + pages = archive.pages + + await DB.add(archive) + + response = Response(await query_archive(archive.id)) + response.assert_is("FullArchive") + + assert response.id == archive.id + assert response.name == archive.name + assert dt.fromisoformat(response.createdAt) == archive.created_at + assert dt.fromisoformat(response.mtime) == archive.mtime + assert response.size == archive.size + assert response.path == archive.path + assert response.comics == [] + assert response.pageCount == archive.page_count + assert response.organized == archive.organized + assert_image_matches(response.cover, pages[0].image) + + assert len(response.pages) == len(pages) + + page_iter = iter(sorted(pages, key=lambda page: page.index)) + for page in response.pages: + matching_page = next(page_iter) + assert_page_matches(page, matching_page) + assert_image_matches(page["image"], matching_page.image) + + +@pytest.mark.anyio +async def test_query_archive_sorts_pages(query_archive, gen_jumbled_archive): + archive = await DB.add(next(gen_jumbled_archive)) + + response = Response(await query_archive(archive.id)) + response.assert_is("FullArchive") + + page_iter = iter(sorted(archive.pages, key=lambda page: page.index)) + for page in response.pages: + matching_page = next(page_iter) + assert_page_matches(page, matching_page) + assert_image_matches(page["image"], matching_page.image) + + +@pytest.mark.anyio +async def test_query_archive_fails_not_found(query_archive): + response = Response(await query_archive(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Archive ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_archives(query_archives, gen_archive): + archives = await DB.add_all(*gen_archive) + + response = Response(await query_archives()) + response.assert_is("ArchiveFilterResult") + + assert response.count == len(archives) + assert isinstance((response.edges), list) + assert len(response.edges) == len(archives) + + edges = iter(response.edges) + for archive in sorted(archives, key=lambda a: a.name): + edge = next(edges) + assert edge["id"] == archive.id + assert edge["name"] == archive.name + assert edge["size"] == archive.size + assert edge["pageCount"] == archive.page_count + assert_image_matches(edge["cover"], archive.cover) + + +@pytest.fixture +def gen_archive_with_files(tmpdir, monkeypatch, gen_archive): + content_dir = os.path.join(tmpdir, "content/") + object_dir = os.path.join(tmpdir, "objects/") + os.mkdir(content_dir) + os.mkdir(object_dir) + + dirs = hircine.config.DirectoryStructure(scan=content_dir, objects=object_dir) + monkeypatch.setattr(hircine.config, "dir_structure", dirs) + + archive = next(gen_archive) + + archive_path = Path(os.path.join(content_dir, "archive.zip")) + archive_path.touch() + archive.path = str(archive_path) + + img_paths = [] + for page in archive.pages: + for suffix in ["full", "thumb"]: + img_path = Path(thumb.object_path(object_dir, page.image.hash, suffix)) + os.makedirs(os.path.dirname(img_path), exist_ok=True) + img_path.touch() + + img_paths.append(img_path) + + yield archive, content_dir, object_dir, img_paths + + +@pytest.mark.anyio +async def test_delete_archive(delete_archives, gen_archive_with_files): + archive, content_dir, object_dir, img_paths = gen_archive_with_files + archive_path = archive.path + + archive = await DB.add(archive) + page_ids = [page.id for page in archive.pages] + image_ids = [page.image.id for page in archive.pages] + + response = Response(await delete_archives(archive.id)) + response.assert_is("DeleteSuccess") + + archive = await DB.get(Archive, archive.id) + assert archive is None + + async with database.session() as s: + db_pages = (await s.scalars(select(Page).where(Page.id.in_(page_ids)))).all() + db_images = ( + await s.scalars(select(Image).where(Image.id.in_(image_ids))) + ).all() + + assert db_pages == [] + assert db_images == [] + + assert os.path.exists(archive_path) is False + for img_path in img_paths: + assert os.path.exists(img_path) is False + + +@pytest.mark.anyio +async def test_delete_archive_deletes_images_only_when_necessary( + delete_archives, gen_archive_with_files, gen_archive +): + archive, content_dir, object_dir, img_paths = gen_archive_with_files + archive_path = archive.path + + archive = await DB.add(archive) + page_ids = [page.id for page in archive.pages] + image_ids = [page.image.id for page in archive.pages] + + another = next(gen_archive) + another.pages = [ + Page(path="foo", index=1, image_id=id, archive=another) for id in image_ids + ] + another.cover = archive.cover + await DB.add(another) + + response = Response(await delete_archives(archive.id)) + response.assert_is("DeleteSuccess") + + archive = await DB.get(Archive, archive.id) + assert archive is None + + async with database.session() as s: + db_pages = (await s.scalars(select(Page).where(Page.id.in_(page_ids)))).all() + db_images = ( + await s.scalars(select(Image.id).where(Image.id.in_(image_ids))) + ).all() + + assert db_pages == [] + assert db_images == image_ids + + assert os.path.exists(archive_path) is False + for img_path in img_paths: + assert os.path.exists(img_path) is True + + +@pytest.mark.anyio +async def test_delete_archive_cascades_on_comic( + delete_archives, gen_archive_with_files +): + archive, *_ = gen_archive_with_files + comic = Comic( + id=1, + title="Hic Sunt Dracones", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + + comic = await DB.add(comic) + + response = Response(await delete_archives(comic.archive.id)) + response.assert_is("DeleteSuccess") + + archive = await DB.get(Archive, archive.id) + assert archive is None + + comic = await DB.get(Comic, comic.id) + assert comic is None + + +@pytest.mark.anyio +async def test_update_archives(update_archives, gen_archive): + old_archive = await DB.add(next(gen_archive)) + + response = Response( + await update_archives( + old_archive.id, + {"cover": {"id": old_archive.pages[1].id}, "organized": True}, + ) + ) + response.assert_is("UpdateSuccess") + + archive = await DB.get(Archive, old_archive.id) + + assert archive.cover_id == old_archive.pages[1].image.id + assert archive.organized is True + + +@pytest.mark.anyio +async def test_update_archive_fails_archive_not_found(update_archives, gen_archive): + archive = await DB.add(next(gen_archive)) + + response = Response( + await update_archives(100, {"cover": {"id": archive.pages[1].id}}) + ) + response.assert_is("IDNotFoundError") + assert response.id == 100 + assert response.message == "Archive ID not found: '100'" + + +@pytest.mark.anyio +async def test_update_archive_cover_fails_page_not_found(update_archives, gen_archive): + archive = await DB.add(next(gen_archive)) + + response = Response(await update_archives(archive.id, {"cover": {"id": 100}})) + response.assert_is("IDNotFoundError") + assert response.id == 100 + assert response.message == "Page ID not found: '100'" + + +@pytest.mark.anyio +async def test_update_archive_cover_fails_page_remote(update_archives, gen_archive): + archive = await DB.add(next(gen_archive)) + another = await DB.add(next(gen_archive)) + remote_id = another.pages[0].id + + response = Response(await update_archives(archive.id, {"cover": {"id": remote_id}})) + response.assert_is("PageRemoteError") + assert response.id == remote_id + assert response.archiveId == another.id + assert ( + response.message + == f"Page ID {remote_id} comes from remote archive ID {another.id}" + ) |