summaryrefslogblamecommitdiffstatshomepage
path: root/tests/api/test_archive.py
blob: 0ef3425880523209c84f3bdf48382ab50cc6a990 (plain) (tree)



































































































































































































































































































































































































                                                                                        
import os
from datetime import datetime as dt
from pathlib import Path

import hircine.config
import hircine.db as database
import hircine.thumbnailer as thumb
import pytest
from conftest import DB, Response
from hircine.db.models import Archive, Comic, Image, Page
from sqlalchemy import select


@pytest.fixture
def query_archive(execute_id):
    query = """
    query archive($id: Int!) {
        archive(id: $id) {
            __typename
            ... on FullArchive {
                id
                name
                createdAt
                mtime
                size
                path
                pageCount
                organized
                comics {
                    __typename
                    id
                }
                cover {
                    __typename
                    id
                }
                pages {
                    __typename
                    id
                    image {
                        __typename
                        id
                    }
                }
            }
            ... on Error {
                message
            }
            ... on IDNotFoundError {
                id
            }
        }
    }
    """

    return execute_id(query)


@pytest.fixture
def query_archives(execute):
    query = """
    query archives {
        archives {
            __typename
            count
            edges {
                id
                name
                size
                pageCount
                organized
                cover {
                    __typename
                    id
                }
            }
        }
    }
    """

    return execute(query)


@pytest.fixture
def update_archives(execute_update):
    mutation = """
    mutation updateArchives($ids: [Int!]!, $input: UpdateArchiveInput!) {
        updateArchives(ids: $ids, input: $input) {
            __typename
            ... on Success {
                message
            }
            ... on Error {
                message
            }
            ... on PageRemoteError {
                id
                archiveId
            }
            ... on IDNotFoundError {
                id
            }
        }
    }
    """

    return execute_update(mutation)


@pytest.fixture
def delete_archives(execute_delete):
    mutation = """
    mutation deleteArchives($ids: [Int!]!) {
        deleteArchives(ids: $ids) {
            __typename
            ... on Success {
                message
            }
            ... on Error {
                message
            }
            ... on IDNotFoundError {
                id
            }
        }
    }
    """

    return execute_delete(mutation)


def assert_image_matches(obj, model):
    assert obj["__typename"] == "Image"
    assert obj["id"] == model.id


def assert_page_matches(obj, model):
    assert obj["__typename"] == "Page"
    assert obj["id"] == model.id


@pytest.mark.anyio
async def test_query_archive(query_archive, gen_archive):
    archive = next(gen_archive)
    pages = archive.pages

    await DB.add(archive)

    response = Response(await query_archive(archive.id))
    response.assert_is("FullArchive")

    assert response.id == archive.id
    assert response.name == archive.name
    assert dt.fromisoformat(response.createdAt) == archive.created_at
    assert dt.fromisoformat(response.mtime) == archive.mtime
    assert response.size == archive.size
    assert response.path == archive.path
    assert response.comics == []
    assert response.pageCount == archive.page_count
    assert response.organized == archive.organized
    assert_image_matches(response.cover, pages[0].image)

    assert len(response.pages) == len(pages)

    page_iter = iter(sorted(pages, key=lambda page: page.index))
    for page in response.pages:
        matching_page = next(page_iter)
        assert_page_matches(page, matching_page)
        assert_image_matches(page["image"], matching_page.image)


@pytest.mark.anyio
async def test_query_archive_sorts_pages(query_archive, gen_jumbled_archive):
    archive = await DB.add(next(gen_jumbled_archive))

    response = Response(await query_archive(archive.id))
    response.assert_is("FullArchive")

    page_iter = iter(sorted(archive.pages, key=lambda page: page.index))
    for page in response.pages:
        matching_page = next(page_iter)
        assert_page_matches(page, matching_page)
        assert_image_matches(page["image"], matching_page.image)


@pytest.mark.anyio
async def test_query_archive_fails_not_found(query_archive):
    response = Response(await query_archive(1))
    response.assert_is("IDNotFoundError")
    assert response.id == 1
    assert response.message == "Archive ID not found: '1'"


@pytest.mark.anyio
async def test_query_archives(query_archives, gen_archive):
    archives = await DB.add_all(*gen_archive)

    response = Response(await query_archives())
    response.assert_is("ArchiveFilterResult")

    assert response.count == len(archives)
    assert isinstance((response.edges), list)
    assert len(response.edges) == len(archives)

    edges = iter(response.edges)
    for archive in sorted(archives, key=lambda a: a.name):
        edge = next(edges)
        assert edge["id"] == archive.id
        assert edge["name"] == archive.name
        assert edge["size"] == archive.size
        assert edge["pageCount"] == archive.page_count
        assert_image_matches(edge["cover"], archive.cover)


@pytest.fixture
def gen_archive_with_files(tmpdir, monkeypatch, gen_archive):
    content_dir = os.path.join(tmpdir, "content/")
    object_dir = os.path.join(tmpdir, "objects/")
    os.mkdir(content_dir)
    os.mkdir(object_dir)

    dirs = hircine.config.DirectoryStructure(scan=content_dir, objects=object_dir)
    monkeypatch.setattr(hircine.config, "dir_structure", dirs)

    archive = next(gen_archive)

    archive_path = Path(os.path.join(content_dir, "archive.zip"))
    archive_path.touch()
    archive.path = str(archive_path)

    img_paths = []
    for page in archive.pages:
        for suffix in ["full", "thumb"]:
            img_path = Path(thumb.object_path(object_dir, page.image.hash, suffix))
            os.makedirs(os.path.dirname(img_path), exist_ok=True)
            img_path.touch()

            img_paths.append(img_path)

    yield archive, content_dir, object_dir, img_paths


@pytest.mark.anyio
async def test_delete_archive(delete_archives, gen_archive_with_files):
    archive, content_dir, object_dir, img_paths = gen_archive_with_files
    archive_path = archive.path

    archive = await DB.add(archive)
    page_ids = [page.id for page in archive.pages]
    image_ids = [page.image.id for page in archive.pages]

    response = Response(await delete_archives(archive.id))
    response.assert_is("DeleteSuccess")

    archive = await DB.get(Archive, archive.id)
    assert archive is None

    async with database.session() as s:
        db_pages = (await s.scalars(select(Page).where(Page.id.in_(page_ids)))).all()
        db_images = (
            await s.scalars(select(Image).where(Image.id.in_(image_ids)))
        ).all()

    assert db_pages == []
    assert db_images == []

    assert os.path.exists(archive_path) is False
    for img_path in img_paths:
        assert os.path.exists(img_path) is False


@pytest.mark.anyio
async def test_delete_archive_deletes_images_only_when_necessary(
    delete_archives, gen_archive_with_files, gen_archive
):
    archive, content_dir, object_dir, img_paths = gen_archive_with_files
    archive_path = archive.path

    archive = await DB.add(archive)
    page_ids = [page.id for page in archive.pages]
    image_ids = [page.image.id for page in archive.pages]

    another = next(gen_archive)
    another.pages = [
        Page(path="foo", index=1, image_id=id, archive=another) for id in image_ids
    ]
    another.cover = archive.cover
    await DB.add(another)

    response = Response(await delete_archives(archive.id))
    response.assert_is("DeleteSuccess")

    archive = await DB.get(Archive, archive.id)
    assert archive is None

    async with database.session() as s:
        db_pages = (await s.scalars(select(Page).where(Page.id.in_(page_ids)))).all()
        db_images = (
            await s.scalars(select(Image.id).where(Image.id.in_(image_ids)))
        ).all()

    assert db_pages == []
    assert db_images == image_ids

    assert os.path.exists(archive_path) is False
    for img_path in img_paths:
        assert os.path.exists(img_path) is True


@pytest.mark.anyio
async def test_delete_archive_cascades_on_comic(
    delete_archives, gen_archive_with_files
):
    archive, *_ = gen_archive_with_files
    comic = Comic(
        id=1,
        title="Hic Sunt Dracones",
        archive=archive,
        cover=archive.cover,
        pages=archive.pages,
    )

    comic = await DB.add(comic)

    response = Response(await delete_archives(comic.archive.id))
    response.assert_is("DeleteSuccess")

    archive = await DB.get(Archive, archive.id)
    assert archive is None

    comic = await DB.get(Comic, comic.id)
    assert comic is None


@pytest.mark.anyio
async def test_update_archives(update_archives, gen_archive):
    old_archive = await DB.add(next(gen_archive))

    response = Response(
        await update_archives(
            old_archive.id,
            {"cover": {"id": old_archive.pages[1].id}, "organized": True},
        )
    )
    response.assert_is("UpdateSuccess")

    archive = await DB.get(Archive, old_archive.id)

    assert archive.cover_id == old_archive.pages[1].image.id
    assert archive.organized is True


@pytest.mark.anyio
async def test_update_archive_fails_archive_not_found(update_archives, gen_archive):
    archive = await DB.add(next(gen_archive))

    response = Response(
        await update_archives(100, {"cover": {"id": archive.pages[1].id}})
    )
    response.assert_is("IDNotFoundError")
    assert response.id == 100
    assert response.message == "Archive ID not found: '100'"


@pytest.mark.anyio
async def test_update_archive_cover_fails_page_not_found(update_archives, gen_archive):
    archive = await DB.add(next(gen_archive))

    response = Response(await update_archives(archive.id, {"cover": {"id": 100}}))
    response.assert_is("IDNotFoundError")
    assert response.id == 100
    assert response.message == "Page ID not found: '100'"


@pytest.mark.anyio
async def test_update_archive_cover_fails_page_remote(update_archives, gen_archive):
    archive = await DB.add(next(gen_archive))
    another = await DB.add(next(gen_archive))
    remote_id = another.pages[0].id

    response = Response(await update_archives(archive.id, {"cover": {"id": remote_id}}))
    response.assert_is("PageRemoteError")
    assert response.id == remote_id
    assert response.archiveId == another.id
    assert (
        response.message
        == f"Page ID {remote_id} comes from remote archive ID {another.id}"
    )