diff options
Diffstat (limited to '')
28 files changed, 6576 insertions, 0 deletions
diff --git a/tests/api/test_archive.py b/tests/api/test_archive.py new file mode 100644 index 0000000..0ef3425 --- /dev/null +++ b/tests/api/test_archive.py @@ -0,0 +1,388 @@ +import os +from datetime import datetime as dt +from pathlib import Path + +import hircine.config +import hircine.db as database +import hircine.thumbnailer as thumb +import pytest +from conftest import DB, Response +from hircine.db.models import Archive, Comic, Image, Page +from sqlalchemy import select + + +@pytest.fixture +def query_archive(execute_id): + query = """ + query archive($id: Int!) { + archive(id: $id) { + __typename + ... on FullArchive { + id + name + createdAt + mtime + size + path + pageCount + organized + comics { + __typename + id + } + cover { + __typename + id + } + pages { + __typename + id + image { + __typename + id + } + } + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(query) + + +@pytest.fixture +def query_archives(execute): + query = """ + query archives { + archives { + __typename + count + edges { + id + name + size + pageCount + organized + cover { + __typename + id + } + } + } + } + """ + + return execute(query) + + +@pytest.fixture +def update_archives(execute_update): + mutation = """ + mutation updateArchives($ids: [Int!]!, $input: UpdateArchiveInput!) { + updateArchives(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on PageRemoteError { + id + archiveId + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_update(mutation) + + +@pytest.fixture +def delete_archives(execute_delete): + mutation = """ + mutation deleteArchives($ids: [Int!]!) { + deleteArchives(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +def assert_image_matches(obj, model): + assert obj["__typename"] == "Image" + assert obj["id"] == model.id + + +def assert_page_matches(obj, model): + assert obj["__typename"] == "Page" + assert obj["id"] == model.id + + +@pytest.mark.anyio +async def test_query_archive(query_archive, gen_archive): + archive = next(gen_archive) + pages = archive.pages + + await DB.add(archive) + + response = Response(await query_archive(archive.id)) + response.assert_is("FullArchive") + + assert response.id == archive.id + assert response.name == archive.name + assert dt.fromisoformat(response.createdAt) == archive.created_at + assert dt.fromisoformat(response.mtime) == archive.mtime + assert response.size == archive.size + assert response.path == archive.path + assert response.comics == [] + assert response.pageCount == archive.page_count + assert response.organized == archive.organized + assert_image_matches(response.cover, pages[0].image) + + assert len(response.pages) == len(pages) + + page_iter = iter(sorted(pages, key=lambda page: page.index)) + for page in response.pages: + matching_page = next(page_iter) + assert_page_matches(page, matching_page) + assert_image_matches(page["image"], matching_page.image) + + +@pytest.mark.anyio +async def test_query_archive_sorts_pages(query_archive, gen_jumbled_archive): + archive = await DB.add(next(gen_jumbled_archive)) + + response = Response(await query_archive(archive.id)) + response.assert_is("FullArchive") + + page_iter = iter(sorted(archive.pages, key=lambda page: page.index)) + for page in response.pages: + matching_page = next(page_iter) + assert_page_matches(page, matching_page) + assert_image_matches(page["image"], matching_page.image) + + +@pytest.mark.anyio +async def test_query_archive_fails_not_found(query_archive): + response = Response(await query_archive(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Archive ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_archives(query_archives, gen_archive): + archives = await DB.add_all(*gen_archive) + + response = Response(await query_archives()) + response.assert_is("ArchiveFilterResult") + + assert response.count == len(archives) + assert isinstance((response.edges), list) + assert len(response.edges) == len(archives) + + edges = iter(response.edges) + for archive in sorted(archives, key=lambda a: a.name): + edge = next(edges) + assert edge["id"] == archive.id + assert edge["name"] == archive.name + assert edge["size"] == archive.size + assert edge["pageCount"] == archive.page_count + assert_image_matches(edge["cover"], archive.cover) + + +@pytest.fixture +def gen_archive_with_files(tmpdir, monkeypatch, gen_archive): + content_dir = os.path.join(tmpdir, "content/") + object_dir = os.path.join(tmpdir, "objects/") + os.mkdir(content_dir) + os.mkdir(object_dir) + + dirs = hircine.config.DirectoryStructure(scan=content_dir, objects=object_dir) + monkeypatch.setattr(hircine.config, "dir_structure", dirs) + + archive = next(gen_archive) + + archive_path = Path(os.path.join(content_dir, "archive.zip")) + archive_path.touch() + archive.path = str(archive_path) + + img_paths = [] + for page in archive.pages: + for suffix in ["full", "thumb"]: + img_path = Path(thumb.object_path(object_dir, page.image.hash, suffix)) + os.makedirs(os.path.dirname(img_path), exist_ok=True) + img_path.touch() + + img_paths.append(img_path) + + yield archive, content_dir, object_dir, img_paths + + +@pytest.mark.anyio +async def test_delete_archive(delete_archives, gen_archive_with_files): + archive, content_dir, object_dir, img_paths = gen_archive_with_files + archive_path = archive.path + + archive = await DB.add(archive) + page_ids = [page.id for page in archive.pages] + image_ids = [page.image.id for page in archive.pages] + + response = Response(await delete_archives(archive.id)) + response.assert_is("DeleteSuccess") + + archive = await DB.get(Archive, archive.id) + assert archive is None + + async with database.session() as s: + db_pages = (await s.scalars(select(Page).where(Page.id.in_(page_ids)))).all() + db_images = ( + await s.scalars(select(Image).where(Image.id.in_(image_ids))) + ).all() + + assert db_pages == [] + assert db_images == [] + + assert os.path.exists(archive_path) is False + for img_path in img_paths: + assert os.path.exists(img_path) is False + + +@pytest.mark.anyio +async def test_delete_archive_deletes_images_only_when_necessary( + delete_archives, gen_archive_with_files, gen_archive +): + archive, content_dir, object_dir, img_paths = gen_archive_with_files + archive_path = archive.path + + archive = await DB.add(archive) + page_ids = [page.id for page in archive.pages] + image_ids = [page.image.id for page in archive.pages] + + another = next(gen_archive) + another.pages = [ + Page(path="foo", index=1, image_id=id, archive=another) for id in image_ids + ] + another.cover = archive.cover + await DB.add(another) + + response = Response(await delete_archives(archive.id)) + response.assert_is("DeleteSuccess") + + archive = await DB.get(Archive, archive.id) + assert archive is None + + async with database.session() as s: + db_pages = (await s.scalars(select(Page).where(Page.id.in_(page_ids)))).all() + db_images = ( + await s.scalars(select(Image.id).where(Image.id.in_(image_ids))) + ).all() + + assert db_pages == [] + assert db_images == image_ids + + assert os.path.exists(archive_path) is False + for img_path in img_paths: + assert os.path.exists(img_path) is True + + +@pytest.mark.anyio +async def test_delete_archive_cascades_on_comic( + delete_archives, gen_archive_with_files +): + archive, *_ = gen_archive_with_files + comic = Comic( + id=1, + title="Hic Sunt Dracones", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + + comic = await DB.add(comic) + + response = Response(await delete_archives(comic.archive.id)) + response.assert_is("DeleteSuccess") + + archive = await DB.get(Archive, archive.id) + assert archive is None + + comic = await DB.get(Comic, comic.id) + assert comic is None + + +@pytest.mark.anyio +async def test_update_archives(update_archives, gen_archive): + old_archive = await DB.add(next(gen_archive)) + + response = Response( + await update_archives( + old_archive.id, + {"cover": {"id": old_archive.pages[1].id}, "organized": True}, + ) + ) + response.assert_is("UpdateSuccess") + + archive = await DB.get(Archive, old_archive.id) + + assert archive.cover_id == old_archive.pages[1].image.id + assert archive.organized is True + + +@pytest.mark.anyio +async def test_update_archive_fails_archive_not_found(update_archives, gen_archive): + archive = await DB.add(next(gen_archive)) + + response = Response( + await update_archives(100, {"cover": {"id": archive.pages[1].id}}) + ) + response.assert_is("IDNotFoundError") + assert response.id == 100 + assert response.message == "Archive ID not found: '100'" + + +@pytest.mark.anyio +async def test_update_archive_cover_fails_page_not_found(update_archives, gen_archive): + archive = await DB.add(next(gen_archive)) + + response = Response(await update_archives(archive.id, {"cover": {"id": 100}})) + response.assert_is("IDNotFoundError") + assert response.id == 100 + assert response.message == "Page ID not found: '100'" + + +@pytest.mark.anyio +async def test_update_archive_cover_fails_page_remote(update_archives, gen_archive): + archive = await DB.add(next(gen_archive)) + another = await DB.add(next(gen_archive)) + remote_id = another.pages[0].id + + response = Response(await update_archives(archive.id, {"cover": {"id": remote_id}})) + response.assert_is("PageRemoteError") + assert response.id == remote_id + assert response.archiveId == another.id + assert ( + response.message + == f"Page ID {remote_id} comes from remote archive ID {another.id}" + ) diff --git a/tests/api/test_artist.py b/tests/api/test_artist.py new file mode 100644 index 0000000..8cb2f1a --- /dev/null +++ b/tests/api/test_artist.py @@ -0,0 +1,278 @@ +from datetime import datetime as dt +from datetime import timezone + +import pytest +from conftest import DB, Response +from hircine.db.models import Artist + + +@pytest.fixture +def query_artist(execute_id): + query = """ + query artist($id: Int!) { + artist(id: $id) { + __typename + ... on Artist { + id + name + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(query) + + +@pytest.fixture +def query_artists(execute): + query = """ + query artists { + artists { + __typename + count + edges { + id + name + } + } + } + """ + + return execute(query) + + +@pytest.fixture +def add_artist(execute_add): + mutation = """ + mutation addArtist($input: AddArtistInput!) { + addArtist(input: $input) { + __typename + ... on AddSuccess { + id + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + } + } + """ + + return execute_add(mutation) + + +@pytest.fixture +def update_artists(execute_update): + mutation = """ + mutation updateArtists($ids: [Int!]!, $input: UpdateArtistInput!) { + updateArtists(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +@pytest.fixture +def delete_artists(execute_delete): + mutation = """ + mutation deleteArtists($ids: [Int!]!) { + deleteArtists(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +@pytest.mark.anyio +async def test_query_artist(query_artist, gen_artist): + artist = await DB.add(next(gen_artist)) + + response = Response(await query_artist(artist.id)) + response.assert_is("Artist") + + assert response.id == artist.id + assert response.name == artist.name + + +@pytest.mark.anyio +async def test_query_artist_fails_not_found(query_artist): + response = Response(await query_artist(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Artist ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_artists(query_artists, gen_artist): + artists = await DB.add_all(*gen_artist) + + response = Response(await query_artists()) + response.assert_is("ArtistFilterResult") + + assert response.count == len(artists) + assert isinstance((response.edges), list) + assert len(response.edges) == len(artists) + + edges = iter(response.edges) + for artist in sorted(artists, key=lambda a: a.name): + edge = next(edges) + assert edge["id"] == artist.id + assert edge["name"] == artist.name + + +@pytest.mark.anyio +async def test_add_artist(add_artist): + response = Response(await add_artist({"name": "added artist"})) + response.assert_is("AddSuccess") + + artist = await DB.get(Artist, response.id) + assert artist is not None + assert artist.name == "added artist" + + +@pytest.mark.anyio +async def test_add_artist_fails_empty_parameter(add_artist): + response = Response(await add_artist({"name": ""})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_add_artist_fails_exists(add_artist, gen_artist): + artist = await DB.add(next(gen_artist)) + + response = Response(await add_artist({"name": artist.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Artist with this name exists" + + +@pytest.mark.anyio +async def test_delete_artist(delete_artists, gen_artist): + artist = await DB.add(next(gen_artist)) + id = artist.id + + response = Response(await delete_artists(id)) + response.assert_is("DeleteSuccess") + + artist = await DB.get(Artist, id) + assert artist is None + + +@pytest.mark.anyio +async def test_delete_artist_not_found(delete_artists): + response = Response(await delete_artists(1)) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Artist ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_artist(update_artists, gen_artist): + artist = await DB.add(next(gen_artist)) + + input = {"name": "updated artist"} + response = Response(await update_artists(artist.id, input)) + response.assert_is("UpdateSuccess") + + artist = await DB.get(Artist, artist.id) + assert artist is not None + assert artist.name == "updated artist" + + +@pytest.mark.anyio +async def test_update_artist_fails_exists(update_artists, gen_artist): + first = await DB.add(next(gen_artist)) + second = await DB.add(next(gen_artist)) + + response = Response(await update_artists(second.id, {"name": first.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Artist with this name exists" + + +@pytest.mark.anyio +async def test_update_artist_fails_not_found(update_artists): + response = Response(await update_artists(1, {"name": "updated artist"})) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Artist ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_artists_cannot_bulk_edit_name(update_artists, gen_artist): + first = await DB.add(next(gen_artist)) + second = await DB.add(next(gen_artist)) + + response = Response(await update_artists([first.id, second.id], {"name": "unique"})) + response.assert_is("InvalidParameterError") + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "none", + "empty string", + ], +) +@pytest.mark.anyio +async def test_update_artist_fails_empty_parameter(update_artists, gen_artist, empty): + artist = await DB.add(next(gen_artist)) + + response = Response(await update_artists(artist.id, {"name": empty})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_update_artist_changes_updated_at(update_artists): + original_artist = Artist(name="artist") + original_artist.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_artist = await DB.add(original_artist) + + response = Response(await update_artists(original_artist.id, {"name": "updated"})) + response.assert_is("UpdateSuccess") + + artist = await DB.get(Artist, original_artist.id) + assert artist.updated_at > original_artist.updated_at diff --git a/tests/api/test_character.py b/tests/api/test_character.py new file mode 100644 index 0000000..567d2a4 --- /dev/null +++ b/tests/api/test_character.py @@ -0,0 +1,285 @@ +from datetime import datetime as dt +from datetime import timezone + +import pytest +from conftest import DB, Response +from hircine.db.models import Character + + +@pytest.fixture +def query_character(execute_id): + query = """ + query character($id: Int!) { + character(id: $id) { + __typename + ... on Character { + id + name + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(query) + + +@pytest.fixture +def query_characters(execute): + query = """ + query characters { + characters { + __typename + count + edges { + id + name + } + } + } + """ + + return execute(query) + + +@pytest.fixture +def add_character(execute_add): + mutation = """ + mutation addCharacter($input: AddCharacterInput!) { + addCharacter(input: $input) { + __typename + ... on AddSuccess { + id + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + } + } + """ + + return execute_add(mutation) + + +@pytest.fixture +def update_characters(execute_update): + mutation = """ + mutation updateCharacters($ids: [Int!]!, $input: UpdateCharacterInput!) { + updateCharacters(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +@pytest.fixture +def delete_characters(execute_delete): + mutation = """ + mutation deleteCharacters($ids: [Int!]!) { + deleteCharacters(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +@pytest.mark.anyio +async def test_query_character(query_character, gen_character): + character = await DB.add(next(gen_character)) + + response = Response(await query_character(character.id)) + response.assert_is("Character") + + assert response.id == character.id + assert response.name == character.name + + +@pytest.mark.anyio +async def test_query_character_fails_not_found(query_character): + response = Response(await query_character(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Character ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_characters(query_characters, gen_character): + characters = await DB.add_all(*gen_character) + + response = Response(await query_characters()) + response.assert_is("CharacterFilterResult") + + assert response.count == len(characters) + assert isinstance((response.edges), list) + assert len(response.edges) == len(characters) + + edges = iter(response.edges) + for character in sorted(characters, key=lambda a: a.name): + edge = next(edges) + assert edge["id"] == character.id + assert edge["name"] == character.name + + +@pytest.mark.anyio +async def test_add_character(add_character): + response = Response(await add_character({"name": "added character"})) + response.assert_is("AddSuccess") + + character = await DB.get(Character, response.id) + assert character is not None + assert character.name == "added character" + + +@pytest.mark.anyio +async def test_add_character_fails_empty_parameter(add_character): + response = Response(await add_character({"name": ""})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_add_character_fails_exists(add_character, gen_character): + character = await DB.add(next(gen_character)) + + response = Response(await add_character({"name": character.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Character with this name exists" + + +@pytest.mark.anyio +async def test_delete_character(delete_characters, gen_character): + character = await DB.add(next(gen_character)) + id = character.id + + response = Response(await delete_characters(id)) + response.assert_is("DeleteSuccess") + + character = await DB.get(Character, id) + assert character is None + + +@pytest.mark.anyio +async def test_delete_character_not_found(delete_characters): + response = Response(await delete_characters(1)) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Character ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_character(update_characters, gen_character): + character = await DB.add(next(gen_character)) + + input = {"name": "updated character"} + response = Response(await update_characters(character.id, input)) + response.assert_is("UpdateSuccess") + + character = await DB.get(Character, character.id) + assert character is not None + assert character.name == "updated character" + + +@pytest.mark.anyio +async def test_update_character_fails_exists(update_characters, gen_character): + first = await DB.add(next(gen_character)) + second = await DB.add(next(gen_character)) + + response = Response(await update_characters(second.id, {"name": first.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Character with this name exists" + + +@pytest.mark.anyio +async def test_update_character_fails_not_found(update_characters): + response = Response(await update_characters(1, {"name": "updated_character"})) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Character ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_characters_cannot_bulk_edit_name( + update_characters, gen_character +): + first = await DB.add(next(gen_character)) + second = await DB.add(next(gen_character)) + + response = Response( + await update_characters([first.id, second.id], {"name": "unique"}) + ) + response.assert_is("InvalidParameterError") + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "none", + "empty string", + ], +) +@pytest.mark.anyio +async def test_update_character_fails_empty_parameter( + update_characters, gen_character, empty +): + character = await DB.add(next(gen_character)) + response = Response(await update_characters(character.id, {"name": empty})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_update_character_changes_updated_at(update_characters): + original_character = Character(name="character") + original_character.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_character = await DB.add(original_character) + + response = Response( + await update_characters(original_character.id, {"name": "updated"}) + ) + response.assert_is("UpdateSuccess") + + character = await DB.get(Character, original_character.id) + assert character.updated_at > original_character.updated_at diff --git a/tests/api/test_circle.py b/tests/api/test_circle.py new file mode 100644 index 0000000..a03ba89 --- /dev/null +++ b/tests/api/test_circle.py @@ -0,0 +1,278 @@ +from datetime import datetime as dt +from datetime import timezone + +import pytest +from conftest import DB, Response +from hircine.db.models import Circle + + +@pytest.fixture +def query_circle(execute_id): + query = """ + query circle($id: Int!) { + circle(id: $id) { + __typename + ... on Circle { + id + name + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(query) + + +@pytest.fixture +def query_circles(execute): + query = """ + query circles { + circles { + __typename + count + edges { + id + name + } + } + } + """ + + return execute(query) + + +@pytest.fixture +def add_circle(execute_add): + mutation = """ + mutation addCircle($input: AddCircleInput!) { + addCircle(input: $input) { + __typename + ... on AddSuccess { + id + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + } + } + """ + + return execute_add(mutation) + + +@pytest.fixture +def update_circles(execute_update): + mutation = """ + mutation updateCircles($ids: [Int!]!, $input: UpdateCircleInput!) { + updateCircles(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on IDNotFoundError { + id + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +@pytest.fixture +def delete_circles(execute_delete): + mutation = """ + mutation deleteCircles($ids: [Int!]!) { + deleteCircles(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +@pytest.mark.anyio +async def test_query_circle(query_circle, gen_circle): + circle = await DB.add(next(gen_circle)) + + response = Response(await query_circle(circle.id)) + response.assert_is("Circle") + + assert response.id == circle.id + assert response.name == circle.name + + +@pytest.mark.anyio +async def test_query_circle_fails_not_found(query_circle): + response = Response(await query_circle(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Circle ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_circles(query_circles, gen_circle): + circles = await DB.add_all(*gen_circle) + + response = Response(await query_circles()) + response.assert_is("CircleFilterResult") + + assert response.count == len(circles) + assert isinstance((response.edges), list) + assert len(response.edges) == len(circles) + + edges = iter(response.edges) + for circle in sorted(circles, key=lambda a: a.name): + edge = next(edges) + assert edge["id"] == circle.id + assert edge["name"] == circle.name + + +@pytest.mark.anyio +async def test_add_circle(add_circle): + response = Response(await add_circle({"name": "added circle"})) + response.assert_is("AddSuccess") + + circle = await DB.get(Circle, response.id) + assert circle is not None + assert circle.name == "added circle" + + +@pytest.mark.anyio +async def test_add_circle_fails_empty_parameter(add_circle): + response = Response(await add_circle({"name": ""})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_add_circle_fails_exists(add_circle, gen_circle): + circle = await DB.add(next(gen_circle)) + + response = Response(await add_circle({"name": circle.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Circle with this name exists" + + +@pytest.mark.anyio +async def test_delete_circle(delete_circles, gen_circle): + circle = await DB.add(next(gen_circle)) + id = circle.id + + response = Response(await delete_circles(id)) + response.assert_is("DeleteSuccess") + + circle = await DB.get(Circle, id) + assert circle is None + + +@pytest.mark.anyio +async def test_delete_circle_not_found(delete_circles): + response = Response(await delete_circles(1)) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Circle ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_circle(update_circles, gen_circle): + circle = await DB.add(next(gen_circle)) + + input = {"name": "updated circle"} + response = Response(await update_circles(circle.id, input)) + response.assert_is("UpdateSuccess") + + circle = await DB.get(Circle, circle.id) + assert circle is not None + assert circle.name == "updated circle" + + +@pytest.mark.anyio +async def test_update_circle_fails_exists(update_circles, gen_circle): + first = await DB.add(next(gen_circle)) + second = await DB.add(next(gen_circle)) + + response = Response(await update_circles(second.id, {"name": first.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Circle with this name exists" + + +@pytest.mark.anyio +async def test_update_circle_fails_not_found(update_circles): + response = Response(await update_circles(1, {"name": "updated circle"})) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Circle ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_circles_cannot_bulk_edit_name(update_circles, gen_circle): + first = await DB.add(next(gen_circle)) + second = await DB.add(next(gen_circle)) + + response = Response(await update_circles([first.id, second.id], {"name": "unique"})) + response.assert_is("InvalidParameterError") + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "none", + "empty string", + ], +) +@pytest.mark.anyio +async def test_update_circle_fails_empty_parameter(update_circles, gen_circle, empty): + circle = await DB.add(next(gen_circle)) + + response = Response(await update_circles(circle.id, {"name": empty})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_update_circle_changes_updated_at(update_circles): + original_circle = Circle(name="circle") + original_circle.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_circle = await DB.add(original_circle) + + response = Response(await update_circles(original_circle.id, {"name": "updated"})) + response.assert_is("UpdateSuccess") + + circle = await DB.get(Circle, original_circle.id) + assert circle.updated_at > original_circle.updated_at diff --git a/tests/api/test_collection.py b/tests/api/test_collection.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/api/test_collection.py diff --git a/tests/api/test_comic.py b/tests/api/test_comic.py new file mode 100644 index 0000000..d3fa51e --- /dev/null +++ b/tests/api/test_comic.py @@ -0,0 +1,1505 @@ +from datetime import date, timezone +from datetime import datetime as dt + +import pytest +from conftest import DB, Response +from hircine.db.models import ( + Artist, + Circle, + Comic, + ComicArtist, + ComicTag, + Namespace, + Tag, + World, +) +from hircine.enums import Category, Censorship, Direction, Language, Layout, Rating + +full_comic_fragment = """ + fragment FullComic on FullComic { + id + title + category + censorship + createdAt + date + direction + language + layout + originalTitle + url + rating + pageCount + updatedAt + organized + bookmarked + archive { + __typename + id + } + artists { + __typename + id + } + characters { + __typename + id + } + circles { + __typename + id + } + cover { + __typename + id + } + pages { + __typename + id + } + tags { + __typename + id + name + } + worlds { + __typename + id + } + } +""" + +comic_fragment = """ + fragment Comic on Comic { + id + title + category + censorship + date + language + originalTitle + rating + pageCount + organized + bookmarked + artists { + __typename + id + } + characters { + __typename + id + } + circles { + __typename + id + } + cover { + __typename + id + } + tags { + __typename + id + name + } + worlds { + __typename + id + } + } +""" + + +@pytest.fixture +def query_comic(execute_id): + query = """ + query comic($id: Int!) { + comic(id: $id) { + __typename + ... on FullComic { + ...FullComic + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(full_comic_fragment + query) + + +@pytest.fixture +def query_comics(execute): + query = """ + query comics { + comics { + __typename + count + edges { + ...Comic + } + } + } + """ + + return execute(comic_fragment + query) + + +@pytest.fixture +def add_comic(execute_add): + mutation = """ + mutation addComic($input: AddComicInput!) { + addComic(input: $input) { + __typename + ... on AddComicSuccess { + id + archivePagesRemaining + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + ... on PageClaimedError { + comicId + id + } + ... on PageRemoteError { + archiveId + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ + + return execute_add(mutation) + + +@pytest.fixture +def delete_comics(execute_delete): + mutation = """ + mutation deleteComics($ids: [Int!]!) { + deleteComics(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +@pytest.fixture +def update_comics(execute_update): + mutation = """ + mutation updateComics($ids: [Int!]!, $input: UpdateComicInput!) { + updateComics(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on PageRemoteError { + id + archiveId + } + ... on PageClaimedError { + id + comicId + } + ... on IDNotFoundError { + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +@pytest.fixture +def upsert_comics(execute_update): + mutation = """ + mutation upsertComics($ids: [Int!]!, $input: UpsertComicInput!) { + upsertComics(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +def assert_association_matches(obj, model, typename): + assert obj["__typename"] == typename + assert obj["id"] == model.id + + +def assert_associations_match(objlist, modellist, typename, sortkey): + assert isinstance((objlist), list) + assert len(objlist) == len(modellist) + model = iter(sorted(modellist, key=sortkey)) + for obj in objlist: + assert_association_matches(obj, next(model), typename) + + +def assert_comic_item_matches(data, comic): + assert data["id"] == comic.id + assert data["title"] == comic.title + assert data["originalTitle"] == comic.original_title + assert date.fromisoformat(data["date"]) == comic.date + assert Rating[data["rating"]] == comic.rating + assert Language[data["language"]] == comic.language + assert data["pageCount"] == comic.page_count + assert data["organized"] == comic.organized + assert data["bookmarked"] == comic.bookmarked + + if data["category"]: + assert Category[data["category"]] == comic.category + else: + assert comic.category is None + + if data["censorship"]: + assert Censorship[data["censorship"]] == comic.censorship + else: + assert comic.censorship is None + + assert_association_matches(data["cover"], comic.cover, "Image") + assert_associations_match( + data["artists"], comic.artists, "Artist", lambda a: a.name + ) + assert_associations_match( + data["characters"], comic.characters, "Character", lambda c: c.name + ) + assert_associations_match( + data["circles"], comic.circles, "Circle", lambda c: c.name + ) + assert_associations_match(data["tags"], comic.tags, "ComicTag", lambda t: t.name) + assert_associations_match(data["worlds"], comic.worlds, "World", lambda w: w.name) + + +def assert_comic_matches(data, comic): + assert_comic_item_matches(data, comic) + assert dt.fromisoformat(data["createdAt"]) == comic.created_at + assert dt.fromisoformat(data["updatedAt"]) == comic.updated_at + assert Direction[data["direction"]] == comic.direction + assert Layout[data["layout"]] == comic.layout + assert data["url"] == comic.url + + assert_association_matches(data["archive"], comic.archive, "Archive") + assert_associations_match(data["pages"], comic.pages, "Page", lambda p: p.index) + + +@pytest.mark.anyio +async def test_query_comic(query_comic, gen_comic): + comic = await DB.add(next(gen_comic)) + + response = Response(await query_comic(comic.id)) + response.assert_is("FullComic") + + assert_comic_matches(response.data, comic) + + +@pytest.mark.anyio +async def test_query_comic_sorts_pages(query_comic, gen_jumbled_archive): + archive = next(gen_jumbled_archive) + + comic = await DB.add( + Comic( + id=1, + title="A Jumbled Mess", + archive=archive, + pages=archive.pages, + cover=archive.cover, + ) + ) + + response = Response(await query_comic(comic.id)) + response.assert_is("FullComic") + + assert_associations_match(response.pages, comic.pages, "Page", lambda p: p.index) + + +@pytest.mark.anyio +async def test_query_comic_fails_not_found(query_comic): + response = Response(await query_comic(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Comic ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_comics(query_comics, gen_comic): + comics = await DB.add_all(*gen_comic) + + response = Response(await query_comics()) + response.assert_is("ComicFilterResult") + + assert response.count == len(comics) + assert isinstance((response.edges), list) + assert len(response.edges) == len(comics) + + edge = iter(response.edges) + for comic in sorted(comics, key=lambda c: c.title): + assert_comic_item_matches(next(edge), comic) + + +@pytest.mark.anyio +async def test_add_comic(add_comic, gen_archive): + archive = next(gen_archive) + await DB.add(archive) + + before = dt.now(timezone.utc).replace(microsecond=0) + + response = Response( + await add_comic( + { + "title": "The Comically Bad Comic", + "archive": {"id": archive.id}, + "pages": {"ids": [p.id for p in archive.pages]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + response.assert_is("AddComicSuccess") + assert response.archivePagesRemaining is False + + after = dt.now(timezone.utc).replace(microsecond=0) + + comic = await DB.get(Comic, response.id, full=True) + assert comic is not None + assert comic.title == "The Comically Bad Comic" + + assert comic.archive.id == archive.id + assert comic.archive.organized is True + + assert set([page.id for page in comic.pages]) == set( + [page.id for page in archive.pages] + ) + + assert comic.cover.id == archive.cover.id + + assert comic.category is None + assert comic.censorship is None + assert comic.created_at >= before + assert comic.created_at <= after + assert comic.date is None + assert comic.language is None + assert comic.layout == Layout.SINGLE + assert comic.original_title is None + assert comic.url is None + assert comic.rating is None + + assert comic.artists == [] + assert comic.characters == [] + assert comic.circles == [] + assert comic.tags == [] + assert comic.worlds == [] + + +@pytest.mark.anyio +async def test_add_comic_pages_remaining(add_comic, gen_archive): + archive = next(gen_archive) + await DB.add(archive) + + response = Response( + await add_comic( + { + "title": "The Unfinished Comic", + "archive": {"id": archive.id}, + "pages": {"ids": [p.id for p in archive.pages][:2]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + response.assert_is("AddComicSuccess") + assert response.archivePagesRemaining is True + + comic = await DB.get(Comic, response.id, full=True) + assert comic.archive.organized is False + + +@pytest.mark.anyio +async def test_add_comic_fails_archive_not_found(add_comic, gen_archive): + archive = next(gen_archive) + await DB.add(archive) + + response = Response( + await add_comic( + { + "title": "Voidful Comic", + "archive": {"id": 10}, + "pages": {"ids": [p.id for p in archive.pages]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + response.assert_is("IDNotFoundError") + assert response.id == 10 + assert response.message == "Archive ID not found: '10'" + + +@pytest.mark.anyio +async def test_add_comic_fails_page_not_found(add_comic, gen_archive): + archive = next(gen_archive) + await DB.add(archive) + + response = Response( + await add_comic( + { + "title": "Pageless Comic", + "archive": {"id": archive.id}, + "pages": {"ids": [10]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + response.assert_is("IDNotFoundError") + assert response.id == 10 + assert response.message == "Page ID not found: '10'" + + +@pytest.mark.anyio +async def test_add_comic_fails_page_claimed(add_comic, gen_archive): + other_archive = next(gen_archive) + other_comic = await DB.add( + Comic( + title="Lawful Comic", + archive=other_archive, + cover=other_archive.cover, + pages=other_archive.pages, + ) + ) + + claimed_page = other_comic.pages[0] + + archive = next(gen_archive) + await DB.add(archive) + + response = Response( + await add_comic( + { + "title": "Comic of Attempted Burglary", + "archive": {"id": archive.id}, + "pages": {"ids": [claimed_page.id]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + + response.assert_is("PageClaimedError") + assert response.id == claimed_page.id + assert response.comicId == other_comic.id + assert ( + response.message + == f"Page ID {claimed_page.id} is already claimed by comic ID {other_comic.id}" + ) + + +@pytest.mark.anyio +async def test_add_comic_fails_empty_parameter(add_comic, gen_archive): + archive = next(gen_archive) + await DB.add(archive) + + response = Response( + await add_comic( + { + "title": "", + "archive": {"id": archive.id}, + "pages": {"ids": [p.id for p in archive.pages]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + response.assert_is("InvalidParameterError") + assert response.parameter == "title" + + +@pytest.mark.anyio +async def test_add_comic_fails_page_remote(add_comic, gen_archive): + other_archive = await DB.add(next(gen_archive)) + other_page = other_archive.pages[0] + + archive = await DB.add(next(gen_archive)) + + response = Response( + await add_comic( + { + "title": "Comic of Multiple Archives", + "archive": {"id": archive.id}, + "pages": {"ids": [other_page.id]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + + response.assert_is("PageRemoteError") + assert response.id == other_page.id + assert response.archiveId == other_archive.id + assert ( + response.message + == f"Page ID {other_page.id} comes from remote archive ID {other_archive.id}" + ) + + +@pytest.mark.anyio +async def test_add_comic_fails_cover_remote(add_comic, gen_archive): + other_archive = await DB.add(next(gen_archive)) + other_page = other_archive.pages[0] + + archive = await DB.add(next(gen_archive)) + + response = Response( + await add_comic( + { + "title": "Comic of Multiple Archives", + "archive": {"id": archive.id}, + "pages": {"ids": [p.id for p in archive.pages]}, + "cover": {"id": other_page.id}, + } + ) + ) + + response.assert_is("PageRemoteError") + assert response.id == other_page.id + assert response.archiveId == other_archive.id + assert ( + response.message + == f"Page ID {other_page.id} comes from remote archive ID {other_archive.id}" + ) + + +@pytest.mark.anyio +async def test_delete_comic(delete_comics, gen_comic): + comic = await DB.add(next(gen_comic)) + + response = Response(await delete_comics(comic.id)) + response.assert_is("DeleteSuccess") + + comic = await DB.get(Comic, comic.id) + assert comic is None + + +@pytest.mark.anyio +async def test_delete_comic_not_found(delete_comics): + response = Response(await delete_comics(1)) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Comic ID not found: '1'" + + +def assert_assocs_match(assocs, collection, name_only=False): + assert set([o.name for o in assocs]) == set([o.name for o in collection]) + assert set([o.id for o in assocs]) == set([o.id for o in collection]) + + +@pytest.mark.anyio +async def test_update_comic(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + + artists = await DB.add_all(Artist(name="arty"), Artist(name="farty")) + circles = await DB.add_all(Circle(name="round"), Circle(name="oval")) + worlds = await DB.add_all(World(name="animal world"), World(name="no spiders")) + + namespace = await DB.add(Namespace(name="emus")) + tag = await DB.add(Tag(name="creepy")) + ct = ComicTag(namespace=namespace, tag=tag) + + new_tags = [ct] + original_comic.tags + new_pages = [p.id for p in original_comic.pages[:2]] + + input = { + "title": "Saucy Savannah Adventures (now in Italian)", + "url": "file:///home/savannah/avventura", + "originalTitle": original_comic.title, + "cover": {"id": original_comic.pages[1].id}, + "pages": {"ids": new_pages}, + "favourite": False, + "organized": True, + "bookmarked": True, + "artists": {"ids": [a.id for a in artists]}, + "circles": {"ids": [c.id for c in circles]}, + "worlds": {"ids": [w.id for w in worlds]}, + "tags": {"ids": [ct.id for ct in new_tags]}, + "date": "2010-07-06", + "direction": "LEFT_TO_RIGHT", + "language": "IT", + "layout": "DOUBLE_OFFSET", + "rating": "EXPLICIT", + "censorship": "BAR", + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + assert comic.title == "Saucy Savannah Adventures (now in Italian)" + assert comic.original_title == original_comic.title + assert comic.cover.id == original_comic.pages[1].image.id + assert comic.url == "file:///home/savannah/avventura" + assert comic.favourite is False + assert comic.organized is True + assert comic.bookmarked is True + + assert set([p.id for p in comic.pages]) == set(new_pages) + + assert_assocs_match(comic.artists, artists) + assert_assocs_match(comic.circles, circles) + assert_assocs_match(comic.characters, original_comic.characters) + assert_assocs_match(comic.worlds, worlds) + assert_assocs_match(comic.tags, new_tags) + + assert comic.date == date(2010, 7, 6) + assert comic.direction == Direction.LEFT_TO_RIGHT + assert comic.layout == Layout.DOUBLE_OFFSET + assert comic.rating == Rating.EXPLICIT + assert comic.language == Language.IT + assert comic.censorship == Censorship.BAR + + +@pytest.mark.anyio +async def test_update_comic_clears_associations(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + + empty = {"ids": []} + + input = { + "artists": empty, + "circles": empty, + "worlds": empty, + "tags": empty, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert comic.artists == [] + assert comic.circles == [] + assert comic.worlds == [] + assert comic.tags == [] + + +@pytest.mark.anyio +async def test_update_comic_clears_enums(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + + input = { + "category": None, + "censorship": None, + "rating": None, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert comic.rating is None + assert comic.category is None + assert comic.censorship is None + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "with None", + "with empty string", + ], +) +@pytest.mark.anyio +async def test_update_comic_clears_string_fields(update_comics, gen_comic, empty): + original_comic = await DB.add(next(gen_comic)) + + input = { + "originalTitle": empty, + "url": empty, + "date": None, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert comic.original_title is None + assert comic.date is None + assert comic.url is None + + +@pytest.mark.anyio +async def test_update_comic_fails_comic_not_found(update_comics): + response = Response(await update_comics(1, {"title": "This Will Not Happen"})) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Comic ID not found: '1'" + + +@pytest.mark.parametrize( + "parameter,empty", + [ + ("title", ""), + ("title", None), + ("direction", None), + ("layout", None), + ], + ids=[ + "title (empty string)", + "title (none)", + "direction", + "layout", + ], +) +@pytest.mark.anyio +async def test_update_comic_fails_empty_parameter( + update_comics, gen_archive, parameter, empty +): + archive = next(gen_archive) + comic = await DB.add( + Comic( + title="Dusty Old Comic", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + ) + + response = Response(await update_comics(comic.id, {parameter: empty})) + response.assert_is("InvalidParameterError") + assert response.parameter == parameter + assert response.message == f"Invalid parameter '{parameter}': cannot be empty" + + +@pytest.mark.anyio +async def test_update_comic_fails_namespace_not_found(update_comics, gen_archive): + archive = next(gen_archive) + comic = await DB.add( + Comic( + title="Dusty Old Comic", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + ) + + tag = await DB.add(Tag(name="shiny")) + + response = Response( + await update_comics(comic.id, {"tags": {"ids": [f"1:{tag.id}"]}}) + ) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Namespace ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_comic_fails_tag_not_found(update_comics, gen_archive): + archive = next(gen_archive) + comic = await DB.add( + Comic( + title="Dusty Old Comic", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + ) + + namespace = await DB.add(Namespace(name="height")) + + response = Response( + await update_comics(comic.id, {"tags": {"ids": [f"{namespace.id}:1"]}}) + ) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Tag ID not found: '1'" + + +@pytest.mark.parametrize( + "input", + [ + "", + ":1", + "1:", + "a:b", + "tag", + ], + ids=[ + "empty", + "namespacing missing", + "tag missing", + "no numeric ids", + "wrong format", + ], +) +@pytest.mark.anyio +async def test_update_comic_fails_invalid_tag(update_comics, gen_archive, input): + archive = next(gen_archive) + comic = await DB.add( + Comic( + title="Dusty Old Comic", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + ) + + response = Response(await update_comics(comic.id, {"tags": {"ids": [input]}})) + response.assert_is("InvalidParameterError") + assert response.parameter == "id" + + msg = "Invalid parameter 'id': ComicTag ID must be specified as <namespace_id>:<tag_id>" # noqa: E501 + assert response.message == msg + + +@pytest.mark.parametrize( + "name,key,id", + [ + ("Artist", "artists", 1), + ("Character", "characters", 1), + ("Circle", "circles", 1), + ("World", "worlds", 1), + ], + ids=[ + "artist", + "character", + "circle", + "world", + ], +) +@pytest.mark.anyio +async def test_update_comic_fails_assoc_not_found( + update_comics, gen_archive, name, key, id +): + archive = next(gen_archive) + comic = await DB.add( + Comic( + title="Dusty Old Comic", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + ) + + response = Response(await update_comics(comic.id, {key: {"ids": [id]}})) + response.assert_is("IDNotFoundError") + assert response.id == id + assert response.message == f"{name} ID not found: '{id}'" + + +@pytest.mark.parametrize( + "option", + [ + None, + {}, + {"onMissing": "IGNORE"}, + ], + ids=[ + "default option", + "empty option", + "explicit option", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_ignores_with(upsert_comics, gen_comic, option): + original_comic = await DB.add(next(gen_comic)) + + new_artists = await DB.add_all(Artist(name="arty"), Artist(name="farty")) + + input = { + "artists": { + "names": [a.name for a in new_artists] + ["newy"], + "options": option, + }, + } + response = Response(await upsert_comics(original_comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert_assocs_match(comic.artists, original_comic.artists + list(new_artists)) + + +@pytest.mark.parametrize( + "option", + [ + None, + {}, + {"onMissing": "IGNORE"}, + ], + ids=[ + "default option", + "empty option", + "explicit option", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_ignores_missing_tags_with(upsert_comics, gen_comic, option): + original_comic = await DB.add(next(gen_comic)) + + tags = await DB.add_all( + ComicTag( + comic_id=original_comic.id, + namespace=Namespace(name="foo"), + tag=Tag(name="bar"), + ) + ) + + input = { + "tags": {"names": ["foo:bar", "baz:qux"], "options": option}, + } + response = Response(await upsert_comics(original_comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert_assocs_match(comic.tags, original_comic.tags + list(tags)) + + +@pytest.mark.parametrize( + "option", + [ + None, + {}, + {"onMissing": "IGNORE"}, + {"onMissing": "CREATE"}, + ], + ids=[ + "default option", + "empty option", + "IGNORE", + "CREATE", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_skips_existing_tags(upsert_comics, gen_comic, option): + comic = await DB.add(next(gen_comic)) + + ctag = comic.tags[0] + names = [f"{ctag.namespace.name}:{ctag.tag.name}"] + + input = { + "tags": {"names": names, "options": option}, + } + response = Response(await upsert_comics(comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, comic.id, full=True) + assert comic is not None + + assert_assocs_match(comic.tags, comic.tags) + + +@pytest.mark.parametrize( + "valid", + [ + True, + False, + ], + ids=[ + "valid combination", + "invalid combination", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_ignore_missing_handles_resident( + upsert_comics, gen_comic, valid +): + original_comic = await DB.add(next(gen_comic)) + + namespace = await DB.add(Namespace(name="foo")) + tag = Tag(name="bar") + if valid: + tag.namespaces = [namespace] + + tag = await DB.add(tag) + ctag = ComicTag(namespace=namespace, tag=tag) + + expected_tags = original_comic.tags + + if valid: + expected_tags.append(ctag) + + input = { + "tags": {"names": ["foo:bar"], "options": {"onMissing": "IGNORE"}}, + } + response = Response(await upsert_comics(original_comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert_assocs_match(comic.tags, expected_tags) + + +@pytest.mark.anyio +async def test_upsert_comic_tags_uses_existing(upsert_comics, empty_comic): + original_comic = await DB.add(empty_comic) + + await DB.add_all(Namespace(name="foo")) + await DB.add_all(Tag(name="bar")) + + tag_names = ["foo:bar"] + + response = Response( + await upsert_comics( + original_comic.id, + {"tags": {"names": tag_names, "options": {"onMissing": "CREATE"}}}, + ) + ) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert set(tag_names) == set( + [f"{t.namespace.name}:{t.tag.name}" for t in comic.tags] + ) + + +@pytest.mark.parametrize( + "key,list", + [ + ("artists", ["arty", "farty"]), + ("tags", ["alien:medium", "human:tiny"]), + ("artists", ["arty", "arty"]), + ("tags", ["foo:good", "bar:good"]), + ("tags", ["foo:good", "foo:bad"]), + ("artists", []), + ], + ids=[ + "artists", + "tags", + "artists (duplicate)", + "tags (duplicate)", + "namespace (duplicate)", + "artists (empty)", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_creates(upsert_comics, empty_comic, key, list): + original_comic = await DB.add(empty_comic) + + input = { + key: {"names": list, "options": {"onMissing": "CREATE"}}, + } + response = Response(await upsert_comics(original_comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert set(list) == set([o.name for o in getattr(comic, key)]) + + +@pytest.mark.anyio +async def test_upsert_comic_fails_creating_empty_assoc_name(upsert_comics, gen_comic): + comic = await DB.add(next(gen_comic)) + + input = { + "artists": {"names": ""}, + } + response = Response(await upsert_comics(comic.id, input)) + response.assert_is("InvalidParameterError") + assert response.parameter == "Artist.name" + + +@pytest.mark.anyio +async def test_upsert_comic_does_not_replace(upsert_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + original_artists = set([a.name for a in original_comic.artists]) + + input = { + "artists": {"names": []}, + } + response = Response(await upsert_comics(original_comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id) + artists = set([a.name for a in comic.artists]) + + assert artists == original_artists + + +@pytest.mark.parametrize( + "input", + [ + "", + ":tiny", + "human:", + "medium", + ], + ids=[ + "empty", + "namespace missing", + "tag missing", + "wrong format", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_fails_creating_invalid_tag(upsert_comics, gen_comic, input): + comic = await DB.add(next(gen_comic)) + + input = { + "tags": {"names": [input]}, + } + response = Response(await upsert_comics(comic.id, input)) + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + msg = "Invalid parameter 'name': ComicTag name must be specified as <namespace>:<tag>" # noqa: E501 + assert response.message == msg + + +@pytest.mark.parametrize( + "options", + [ + None, + {}, + {"mode": "REPLACE"}, + ], + ids=[ + "by default (none)", + "by default (empty record)", + "when defined explicitly", + ], +) +@pytest.mark.anyio +async def test_update_comic_replaces_assocs(update_comics, gen_comic, options): + original_comic = await DB.add(next(gen_comic)) + new_artist = await DB.add(Artist(name="max")) + + input = { + "artists": {"ids": [new_artist.id]}, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + + assert_assocs_match(comic.artists, [new_artist]) + + +@pytest.mark.anyio +async def test_update_comic_adds_assocs(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + new_artist = await DB.add(Artist(name="max")) + added_artists = original_comic.artists + [new_artist] + + input = { + "artists": {"ids": [new_artist.id], "options": {"mode": "ADD"}}, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + + assert_assocs_match(comic.artists, added_artists) + + +@pytest.mark.anyio +async def test_update_comic_adds_existing_assocs(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + artists = original_comic.artists + + input = { + "artists": { + "ids": [artist.id for artist in artists], + "options": {"mode": "ADD"}, + }, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + + assert_assocs_match(comic.artists, artists) + + +@pytest.mark.anyio +async def test_update_comic_adds_tags(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + new_namespace = await DB.add(Namespace(name="new")) + new_tag = await DB.add(Tag(name="new")) + added_tags = original_comic.tags + [ + ComicTag(comic_id=original_comic.id, tag=new_tag, namespace=new_namespace) + ] + + input = { + "tags": { + "ids": [f"{new_namespace.id}:{new_tag.id}"], + "options": {"mode": "ADD"}, + }, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert_assocs_match(comic.tags, added_tags) + + +@pytest.mark.anyio +async def test_update_comic_adds_existing_tags(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + tags = original_comic.tags + + input = { + "tags": { + "ids": [f"{tag.namespace.id}:{tag.tag.id}" for tag in tags], + "options": {"mode": "ADD"}, + }, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert_assocs_match(comic.tags, tags) + + +@pytest.mark.anyio +async def test_update_comic_removes_assocs(update_comics, empty_comic): + original_comic = empty_comic + removed_artist = Artist(id=1, name="sam") + remaining_artist = Artist(id=2, name="max") + original_comic.artists = [removed_artist, remaining_artist] + original_comic = await DB.add(original_comic) + + input = { + "artists": {"ids": [removed_artist.id], "options": {"mode": "REMOVE"}}, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + + assert_assocs_match(comic.artists, [remaining_artist]) + + +@pytest.mark.anyio +async def test_update_comic_removes_tags(update_comics, empty_comic): + original_comic = empty_comic + removed_tag = ComicTag( + comic_id=original_comic.id, + tag=Tag(id=1, name="gone"), + namespace=Namespace(id=1, name="all"), + ) + remaining_tag = ComicTag( + comic_id=original_comic.id, + tag=Tag(id=2, name="there"), + namespace=Namespace(id=2, name="still"), + ) + original_comic.tags = [removed_tag, remaining_tag] + original_comic = await DB.add(original_comic) + + input = { + "tags": {"ids": ["1:1"], "options": {"mode": "REMOVE"}}, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + + assert_assocs_match(comic.tags, [remaining_tag]) + + +@pytest.mark.parametrize( + "rows,input", + [ + ([], {"title": "Updated Comic"}), + ( + [Artist(id=1, name="artist")], + {"artists": {"ids": [1]}}, + ), + ( + [Artist(id=1, name="artist"), ComicArtist(artist_id=1, comic_id=100)], + {"title": "Updated Comic", "artists": {"ids": [1]}}, + ), + ( + [ + Namespace(id=1, name="ns"), + Tag(id=1, name="artist"), + ], + {"tags": {"ids": ["1:1"]}}, + ), + ( + [ + Namespace(id=1, name="ns"), + Tag(id=1, name="artist"), + ComicTag(namespace_id=1, tag_id=1, comic_id=100), + ], + {"title": "Updated Comic", "tags": {"ids": ["1:1"]}}, + ), + ], + ids=[ + "with scalar", + "with assoc", + "with scalar and existing assoc", + "with tag", + "with scalar and existing tag", + ], +) +@pytest.mark.anyio +async def test_update_comic_changes_updated_at(update_comics, empty_comic, rows, input): + original_comic = empty_comic + original_comic.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_comic = await DB.add(original_comic) + + await DB.add_all(*rows) + + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id) + assert comic.updated_at > original_comic.updated_at + + +@pytest.mark.anyio +async def test_update_comic_cover_fails_page_not_found(update_comics, gen_comic): + comic = await DB.add(next(gen_comic)) + + response = Response(await update_comics(comic.id, {"cover": {"id": 100}})) + response.assert_is("IDNotFoundError") + assert response.id == 100 + assert response.message == "Page ID not found: '100'" + + +@pytest.mark.anyio +async def test_update_comic_cover_fails_page_remote( + update_comics, gen_comic, gen_archive +): + comic = await DB.add(next(gen_comic)) + other_archive = await DB.add(next(gen_archive)) + remote_id = other_archive.pages[0].id + + response = Response(await update_comics(comic.id, {"cover": {"id": remote_id}})) + response.assert_is("PageRemoteError") + assert response.id == remote_id + assert response.archiveId == other_archive.id + assert ( + response.message + == f"Page ID {remote_id} comes from remote archive ID {other_archive.id}" + ) + + +@pytest.mark.anyio +async def test_update_comic_pages_fails_page_not_found(update_comics, gen_comic): + comic = await DB.add(next(gen_comic)) + + response = Response(await update_comics(comic.id, {"pages": {"ids": 100}})) + response.assert_is("IDNotFoundError") + assert response.id == 100 + assert response.message == "Page ID not found: '100'" + + +@pytest.mark.anyio +async def test_update_comic_pages_fails_page_remote( + update_comics, gen_comic, gen_archive +): + comic = await DB.add(next(gen_comic)) + other_archive = await DB.add(next(gen_archive)) + remote_id = other_archive.pages[0].id + + response = Response(await update_comics(comic.id, {"pages": {"ids": [remote_id]}})) + response.assert_is("PageRemoteError") + assert response.id == remote_id + assert response.archiveId == other_archive.id + assert ( + response.message + == f"Page ID {remote_id} comes from remote archive ID {other_archive.id}" + ) + + +@pytest.mark.anyio +async def test_update_comic_pages_fails_page_claimed(update_comics, gen_archive): + archive = await DB.add(next(gen_archive)) + + comic = await DB.add( + Comic( + id=1, + title="A Very Good Comic", + archive=archive, + cover=archive.pages[0].image, + pages=[archive.pages[0], archive.pages[1]], + ) + ) + + claiming = await DB.add( + Comic( + id=2, + title="A Very Claiming Comic", + archive=archive, + cover=archive.pages[2].image, + pages=[archive.pages[2], archive.pages[3]], + ) + ) + + claimed_id = claiming.pages[0].id + + response = Response(await update_comics(comic.id, {"pages": {"ids": [claimed_id]}})) + response.assert_is("PageClaimedError") + assert response.id == claimed_id + assert response.comicId == claiming.id + assert ( + response.message + == f"Page ID {claimed_id} is already claimed by comic ID {claiming.id}" + ) + + +@pytest.mark.parametrize( + "mode", + [ + ("REPLACE"), + ("REMOVE"), + ], +) +@pytest.mark.anyio +async def test_update_comic_pages_fails_empty(update_comics, gen_comic, mode): + comic = await DB.add(next(gen_comic)) + + ids = [] if mode == "REPLACE" else [p.id for p in comic.pages] + + response = Response( + await update_comics( + comic.id, {"pages": {"ids": ids, "options": {"mode": mode}}} + ) + ) + response.assert_is("InvalidParameterError") + assert response.parameter == "pages" + assert response.message == "Invalid parameter 'pages': cannot be empty" diff --git a/tests/api/test_comic_tag.py b/tests/api/test_comic_tag.py new file mode 100644 index 0000000..f536b79 --- /dev/null +++ b/tests/api/test_comic_tag.py @@ -0,0 +1,134 @@ +from functools import partial + +import pytest +from conftest import DB, Response +from hircine.db.models import Namespace, Tag + + +@pytest.fixture +def query_comic_tags(schema_execute): + query = """ + query comicTags($forFilter: Boolean) { + comicTags(forFilter: $forFilter) { + edges { + __typename + id + name + } + count + } + } + """ + + def wrapper(q): + async def _execute(for_filter=False): + return await schema_execute(q, {"forFilter": for_filter}) + + return _execute + + return wrapper(query) + + +def build_item(namespace, tag): + nid, tid = "", "" + nname, tname = "", "" + + if namespace: + nid, nname = namespace.id, namespace.name + + if tag: + tid, tname = tag.id, tag.name + + item = { + "__typename": "ComicTag", + "id": f"{nid}:{tid}", + "name": f"{nname}:{tname}", + } + + return item + + +@pytest.mark.anyio +async def test_query_comic_tags_cross(query_comic_tags): + ns_foo = Namespace(id=1, name="foo") + ns_bar = Namespace(id=2, name="bar") + tag_qoo = Tag(id=1, name="qoo", namespaces=[ns_foo, ns_bar]) + tag_qar = Tag(id=2, name="qar", namespaces=[ns_foo, ns_bar]) + + await DB.add_all(ns_foo, ns_bar) + await DB.add_all(tag_qoo, tag_qar) + + builder = partial(build_item) + + response = Response(await query_comic_tags()) + assert response.data["edges"] == [ + builder(ns_bar, tag_qar), + builder(ns_bar, tag_qoo), + builder(ns_foo, tag_qar), + builder(ns_foo, tag_qoo), + ] + + +@pytest.mark.anyio +async def test_query_comic_tags_restricted_namespace(query_comic_tags): + ns_foo = Namespace(id=1, name="foo") + ns_bar = Namespace(id=2, name="bar") + tag_qoo = Tag(id=1, name="qoo", namespaces=[ns_bar]) + tag_qar = Tag(id=2, name="qar", namespaces=[ns_foo]) + + await DB.add_all(ns_foo, ns_bar) + await DB.add_all(tag_qoo, tag_qar) + + builder = partial(build_item) + + response = Response(await query_comic_tags()) + assert response.data["edges"] == [ + builder(ns_bar, tag_qoo), + builder(ns_foo, tag_qar), + ] + + +@pytest.mark.anyio +async def test_query_comic_tag_matchers_cross(query_comic_tags): + ns_foo = Namespace(id=1, name="foo") + ns_bar = Namespace(id=2, name="bar") + tag_qoo = Tag(id=1, name="qoo", namespaces=[ns_foo, ns_bar]) + tag_qar = Tag(id=2, name="qar", namespaces=[ns_foo, ns_bar]) + + await DB.add_all(ns_foo, ns_bar, tag_qoo, tag_qar) + + builder = partial(build_item) + + response = Response(await query_comic_tags(for_filter=True)) + assert response.data["edges"] == [ + builder(ns_bar, None), + builder(ns_foo, None), + builder(None, tag_qar), + builder(None, tag_qoo), + builder(ns_bar, tag_qar), + builder(ns_bar, tag_qoo), + builder(ns_foo, tag_qar), + builder(ns_foo, tag_qoo), + ] + + +@pytest.mark.anyio +async def test_query_comic_tag_matchers_restricted_namespace(query_comic_tags): + ns_foo = Namespace(id=1, name="foo") + ns_bar = Namespace(id=2, name="bar") + tag_qoo = Tag(id=1, name="qoo", namespaces=[ns_bar]) + tag_qar = Tag(id=2, name="qar", namespaces=[ns_foo]) + + await DB.add_all(ns_foo, ns_bar, tag_qoo, tag_qar) + + builder = partial(build_item) + + response = Response(await query_comic_tags(for_filter=True)) + assert response.data["edges"] == [ + builder(ns_bar, None), + builder(ns_foo, None), + builder(None, tag_qar), + builder(None, tag_qoo), + builder(ns_bar, tag_qoo), + builder(ns_foo, tag_qar), + ] diff --git a/tests/api/test_db.py b/tests/api/test_db.py new file mode 100644 index 0000000..f53b90f --- /dev/null +++ b/tests/api/test_db.py @@ -0,0 +1,324 @@ +from datetime import datetime, timedelta, timezone + +import hircine.db as database +import hircine.db.models as models +import hircine.db.ops as ops +import pytest +from conftest import DB +from hircine.db.models import ( + Artist, + Base, + Comic, + ComicTag, + DateTimeUTC, + MixinID, + Namespace, + Tag, + TagNamespaces, +) +from sqlalchemy.exc import StatementError +from sqlalchemy.orm import ( + Mapped, + mapped_column, +) + + +class Date(MixinID, Base): + date: Mapped[datetime] = mapped_column(DateTimeUTC) + + +@pytest.mark.anyio +async def test_db_requires_tzinfo(): + with pytest.raises(StatementError, match="tzinfo is required"): + await DB.add(Date(date=datetime(2019, 4, 22))) + + +@pytest.mark.anyio +async def test_db_converts_date_input_to_utc(): + date = datetime(2019, 4, 22, tzinfo=timezone(timedelta(hours=-4))) + await DB.add(Date(date=date)) + + item = await DB.get(Date, 1) + + assert item.date.tzinfo == timezone.utc + assert item.date == date + + +@pytest.mark.parametrize( + "modelcls,assoccls", + [ + (models.Artist, models.ComicArtist), + (models.Circle, models.ComicCircle), + (models.Character, models.ComicCharacter), + (models.World, models.ComicWorld), + ], + ids=["artists", "circles", "characters", "worlds"], +) +@pytest.mark.anyio +async def test_models_retained_when_clearing_association( + empty_comic, modelcls, assoccls +): + model = modelcls(id=1, name="foo") + key = f"{modelcls.__name__.lower()}s" + + comic = empty_comic + setattr(comic, key, [model]) + comic = await DB.add(comic) + + async with database.session() as s: + object = await s.get(Comic, comic.id) + setattr(object, key, []) + await s.commit() + + assert await DB.get(assoccls, (comic.id, model.id)) is None + assert await DB.get(Comic, comic.id) is not None + assert await DB.get(modelcls, model.id) is not None + + +@pytest.mark.anyio +async def test_models_retained_when_clearing_comictag(empty_comic): + comic = await DB.add(empty_comic) + + namespace = Namespace(id=1, name="foo") + tag = Tag(id=1, name="bar") + ct = ComicTag(comic_id=comic.id, namespace=namespace, tag=tag) + + await DB.add(ct) + + async with database.session() as s: + object = await s.get(Comic, comic.id) + object.tags = [] + await s.commit() + + assert await DB.get(ComicTag, (comic.id, ct.namespace_id, ct.tag_id)) is None + assert await DB.get(Namespace, namespace.id) is not None + assert await DB.get(Tag, tag.id) is not None + assert await DB.get(Comic, comic.id) is not None + + +@pytest.mark.parametrize( + "modelcls,assoccls", + [ + (models.Artist, models.ComicArtist), + (models.Circle, models.ComicCircle), + (models.Character, models.ComicCharacter), + (models.World, models.ComicWorld), + ], + ids=["artists", "circles", "characters", "worlds"], +) +@pytest.mark.anyio +async def test_only_association_cleared_when_deleting(empty_comic, modelcls, assoccls): + model = modelcls(id=1, name="foo") + + comic = empty_comic + setattr(comic, f"{modelcls.__name__.lower()}s", [model]) + comic = await DB.add(comic) + + await DB.delete(modelcls, model.id) + assert await DB.get(assoccls, (comic.id, model.id)) is None + assert await DB.get(Comic, comic.id) is not None + + +@pytest.mark.parametrize( + "deleted", + [ + "namespace", + "tag", + ], +) +@pytest.mark.anyio +async def test_only_comictag_association_cleared_when_deleting(empty_comic, deleted): + comic = await DB.add(empty_comic) + + namespace = Namespace(id=1, name="foo") + tag = Tag(id=1, name="bar") + + await DB.add(ComicTag(comic_id=comic.id, namespace=namespace, tag=tag)) + + if deleted == "namespace": + await DB.delete(Namespace, namespace.id) + elif deleted == "tag": + await DB.delete(Tag, tag.id) + + assert await DB.get(ComicTag, (comic.id, namespace.id, tag.id)) is None + if deleted == "namespace": + assert await DB.get(Tag, tag.id) is not None + elif deleted == "tag": + assert await DB.get(Namespace, namespace.id) is not None + assert await DB.get(Comic, comic.id) is not None + + +@pytest.mark.parametrize( + "modelcls,assoccls", + [ + (models.Artist, models.ComicArtist), + (models.Circle, models.ComicCircle), + (models.Character, models.ComicCharacter), + (models.World, models.ComicWorld), + ], + ids=["artists", "circles", "characters", "worlds"], +) +@pytest.mark.anyio +async def test_deleting_comic_only_clears_association(empty_comic, modelcls, assoccls): + model = modelcls(id=1, name="foo") + + comic = empty_comic + setattr(comic, f"{modelcls.__name__.lower()}s", [model]) + comic = await DB.add(comic) + + await DB.delete(Comic, comic.id) + assert await DB.get(assoccls, (comic.id, model.id)) is None + assert await DB.get(modelcls, model.id) is not None + + +@pytest.mark.anyio +async def test_deleting_comic_only_clears_comictag(empty_comic): + comic = await DB.add(empty_comic) + + namespace = Namespace(id=1, name="foo") + tag = Tag(id=1, name="bar") + + await DB.add(ComicTag(comic_id=comic.id, namespace=namespace, tag=tag)) + await DB.delete(Comic, comic.id) + + assert await DB.get(ComicTag, (comic.id, namespace.id, tag.id)) is None + assert await DB.get(Tag, tag.id) is not None + assert await DB.get(Namespace, namespace.id) is not None + + +@pytest.mark.anyio +async def test_models_retained_when_clearing_tagnamespace(): + namespace = Namespace(id=1, name="foo") + tag = Tag(id=1, name="foo", namespaces=[namespace]) + + tag = await DB.add(tag) + + async with database.session() as s: + db_tag = await s.get(Tag, tag.id, options=Tag.load_full()) + db_tag.namespaces = [] + await s.commit() + + assert await DB.get(TagNamespaces, (namespace.id, tag.id)) is None + assert await DB.get(Namespace, namespace.id) is not None + assert await DB.get(Tag, tag.id) is not None + + +@pytest.mark.anyio +async def test_only_tagnamespace_cleared_when_deleting_tag(): + namespace = Namespace(id=1, name="foo") + tag = Tag(id=1, name="foo", namespaces=[namespace]) + + tag = await DB.add(tag) + + await DB.delete(Tag, tag.id) + + assert await DB.get(TagNamespaces, (namespace.id, tag.id)) is None + assert await DB.get(Namespace, namespace.id) is not None + assert await DB.get(Tag, tag.id) is None + + +@pytest.mark.anyio +async def test_only_tagnamespace_cleared_when_deleting_namespace(): + namespace = Namespace(id=1, name="foo") + tag = Tag(id=1, name="foo", namespaces=[namespace]) + + tag = await DB.add(tag) + + await DB.delete(Namespace, namespace.id) + + assert await DB.get(TagNamespaces, (namespace.id, tag.id)) is None + assert await DB.get(Namespace, namespace.id) is None + assert await DB.get(Tag, tag.id) is not None + + +@pytest.mark.parametrize( + "use_identity_map", + [False, True], + ids=["without identity lookup", "with identity lookup"], +) +@pytest.mark.anyio +async def test_ops_get_all(gen_artist, use_identity_map): + artist = await DB.add(next(gen_artist)) + have = list(await DB.add_all(*gen_artist)) + have.append(artist) + + missing_ids = [10, 20] + + async with database.session() as s: + if use_identity_map: + s.add(artist) + + artists, missing = await ops.get_all( + s, + Artist, + [a.id for a in have] + missing_ids, + use_identity_map=use_identity_map, + ) + + assert set([a.id for a in artists]) == set([a.id for a in have]) + assert missing == set(missing_ids) + + +@pytest.mark.anyio +async def test_ops_get_all_names(gen_artist): + have = await DB.add_all(*gen_artist) + missing_names = ["arty", "farty"] + + async with database.session() as s: + artists, missing = await ops.get_all_names( + s, Artist, [a.name for a in have] + missing_names + ) + + assert set([a.name for a in artists]) == set([a.name for a in have]) + assert missing == set(missing_names) + + +@pytest.mark.parametrize( + "missing", + [[("foo", "bar"), ("qux", "qaz")], []], + ids=["missing", "no missing"], +) +@pytest.mark.anyio +async def test_ops_get_ctag_names(gen_comic, gen_tag, gen_namespace, missing): + comic = await DB.add(next(gen_comic)) + have = [(ct.namespace.name, ct.tag.name) for ct in comic.tags] + + async with database.session() as s: + cts, missing = await ops.get_ctag_names(s, comic.id, have + missing) + + assert set(have) == set([(ct.namespace.name, ct.tag.name) for ct in cts]) + assert missing == set(missing) + + +@pytest.mark.anyio +async def test_ops_lookup_identity(gen_artist): + one = await DB.add(next(gen_artist)) + two = await DB.add(next(gen_artist)) + rest = await DB.add_all(*gen_artist) + + async with database.session() as s: + get_one = await s.get(Artist, one.id) + get_two = await s.get(Artist, two.id) + s.add(get_one, get_two) + + artists, satisfied = ops.lookup_identity( + s, Artist, [a.id for a in [one, two] + list(rest)] + ) + + assert set([a.name for a in artists]) == set([a.name for a in [one, two]]) + assert satisfied == set([one.id, two.id]) + + +@pytest.mark.anyio +async def test_ops_get_image_orphans(gen_archive, gen_image): + await DB.add(next(gen_archive)) + + orphan_one = await DB.add(next(gen_image)) + orphan_two = await DB.add(next(gen_image)) + + async with database.session() as s: + orphans = set(await ops.get_image_orphans(s)) + + assert orphans == set( + [(orphan_one.id, orphan_one.hash), (orphan_two.id, orphan_two.hash)] + ) diff --git a/tests/api/test_filter.py b/tests/api/test_filter.py new file mode 100644 index 0000000..67a953f --- /dev/null +++ b/tests/api/test_filter.py @@ -0,0 +1,521 @@ +import pytest +from conftest import DB, Response +from hircine.db.models import Namespace, Tag + + +@pytest.fixture +def query_comic_filter(execute_filter): + query = """ + query comics($filter: ComicFilterInput) { + comics(filter: $filter) { + __typename + count + edges { + id + title + } + } + } + """ + + return execute_filter(query) + + +@pytest.fixture +def query_string_filter(execute_filter): + query = """ + query artists($filter: ArtistFilterInput) { + artists(filter: $filter) { + __typename + count + edges { + id + name + } + } + } + """ + + return execute_filter(query) + + +@pytest.fixture +def query_tag_filter(execute_filter): + query = """ + query tags($filter: TagFilterInput) { + tags(filter: $filter) { + __typename + count + edges { + id + name + } + } + } + """ + + return execute_filter(query) + + +def id_list(edges): + return sorted([int(edge["id"]) for edge in edges]) + + +@pytest.mark.parametrize( + "filter,ids", + [ + ( + {"include": {"name": {"contains": "robin"}}}, + [3, 4], + ), + ({"exclude": {"name": {"contains": "smith"}}}, [2, 3]), + ( + { + "exclude": {"name": {"contains": "robin"}}, + "include": {"name": {"contains": "smith"}}, + }, + [1], + ), + ], + ids=[ + "includes", + "excludes", + "includes and excludes", + ], +) +@pytest.mark.anyio +async def test_string_filter(query_string_filter, gen_artist, filter, ids): + await DB.add_all(*gen_artist) + + response = Response(await query_string_filter(filter)) + response.assert_is("ArtistFilterResult") + + assert id_list(response.edges) == ids + + +@pytest.mark.parametrize( + "filter,empty_response", + [ + ({"include": {"name": {"contains": ""}}}, False), + ({"include": {"name": {}}}, False), + ({"exclude": {"name": {"contains": ""}}}, True), + ({"exclude": {"name": {}}}, False), + ], + ids=[ + "string (include)", + "field (include)", + "string (exclude)", + "field (exclude)", + ], +) +@pytest.mark.anyio +async def test_string_filter_handles_empty( + query_string_filter, gen_artist, filter, empty_response +): + artists = await DB.add_all(*gen_artist) + + response = Response(await query_string_filter(filter)) + response.assert_is("ArtistFilterResult") + + if empty_response: + assert response.edges == [] + assert response.count == 0 + else: + assert len(response.edges) == len(artists) + assert response.count == len(artists) + + +@pytest.mark.parametrize( + "filter", + [ + {"include": {}}, + {"exclude": {}}, + ], + ids=[ + "include", + "exclude", + ], +) +@pytest.mark.anyio +async def test_filter_handles_empty_field(query_string_filter, gen_artist, filter): + artists = await DB.add_all(*gen_artist) + + response = Response(await query_string_filter(filter)) + response.assert_is("ArtistFilterResult") + + assert len(response.edges) == len(artists) + assert response.count == len(artists) + + +@pytest.mark.parametrize( + "filter,ids", + [ + ( + {"include": {"artists": {"any": 1}}}, + [1, 3], + ), + ( + {"include": {"artists": {"all": 1}}}, + [1, 3], + ), + ( + {"include": {"artists": {"any": [1, 4]}}}, + [1, 3, 4], + ), + ( + {"include": {"artists": {"all": [1, 4]}}}, + [3], + ), + ( + {"exclude": {"artists": {"any": 1}}}, + [2, 4], + ), + ( + {"exclude": {"artists": {"all": 1}}}, + [2, 4], + ), + ( + {"exclude": {"artists": {"any": [1, 4]}}}, + [2], + ), + ( + {"exclude": {"artists": {"all": [1, 4]}}}, + [1, 2, 4], + ), + ( + { + "include": {"artists": {"any": [1]}}, + "exclude": {"artists": {"all": [4]}}, + }, + [1], + ), + ( + { + "include": {"artists": {"any": [1, 4]}}, + "exclude": {"artists": {"all": [1, 4]}}, + }, + [1, 4], + ), + ( + { + "include": {"artists": {"any": [1, 4], "all": [1, 2]}}, + }, + [1], + ), + ], + ids=[ + "includes any (single)", + "includes all (single)", + "includes any (list)", + "includes all (list)", + "excludes any (single)", + "excludes all (single)", + "excludes any (list)", + "excludes all (list)", + "includes and excludes (single)", + "includes and excludes (list)", + "includes any and all", + ], +) +@pytest.mark.anyio +async def test_assoc_filter(query_comic_filter, gen_comic, filter, ids): + await DB.add_all(*gen_comic) + + response = Response(await query_comic_filter(filter)) + response.assert_is("ComicFilterResult") + + assert id_list(response.edges) == ids + + +@pytest.mark.parametrize( + "filter,empty_response", + [ + ({"include": {"artists": {"any": []}}}, True), + ({"include": {"artists": {"all": []}}}, True), + ({"include": {"artists": {}}}, False), + ({"exclude": {"artists": {"any": []}}}, False), + ({"exclude": {"artists": {"all": []}}}, False), + ({"exclude": {"artists": {}}}, False), + ({"include": {"tags": {"any": ""}}}, True), + ({"include": {"tags": {"any": ":"}}}, True), + ], + ids=[ + "list (include any)", + "list (include all)", + "field (include)", + "list (exclude any)", + "list (exclude all)", + "field (exclude)", + "string (tags)", + "specifier (tags)", + ], +) +@pytest.mark.anyio +async def test_assoc_filter_handles_empty( + query_comic_filter, gen_comic, filter, empty_response +): + comics = await DB.add_all(*gen_comic) + + response = Response(await query_comic_filter(filter)) + + response.assert_is("ComicFilterResult") + + if empty_response: + assert response.edges == [] + assert response.count == 0 + else: + assert len(response.edges) == len(comics) + assert response.count == len(comics) + + +@pytest.mark.parametrize( + "filter,ids", + [ + ( + {"include": {"tags": {"any": "1:"}}}, + [1, 2, 3], + ), + ( + {"include": {"tags": {"any": ":2"}}}, + [1, 4], + ), + ( + {"include": {"tags": {"exact": ["1:3", "2:1"]}}}, + [2], + ), + ( + {"include": {"tags": {"exact": ["1:"]}}}, + [3], + ), + ( + {"include": {"tags": {"exact": [":4"]}}}, + [3], + ), + ( + {"exclude": {"tags": {"all": ["1:4", "1:1"]}}}, + [2, 3, 4], + ), + ( + {"exclude": {"tags": {"exact": ["2:1", "2:2", "2:3"]}}}, + [1, 2, 3], + ), + ( + {"exclude": {"tags": {"exact": ["1:"]}}}, + [1, 2, 4], + ), + ( + {"exclude": {"tags": {"exact": [":4"]}}}, + [1, 2, 4], + ), + ], + ids=[ + "includes any namespace", + "includes any tag", + "includes exact tags", + "includes exact namespace", + "includes exact tag", + "excludes all tags", + "includes exact tags", + "includes exact namespace", + "includes exact tag", + ], +) +@pytest.mark.anyio +async def test_assoc_tag_filter(query_comic_filter, gen_comic, filter, ids): + await DB.add_all(*gen_comic) + + response = Response(await query_comic_filter(filter)) + response.assert_is("ComicFilterResult") + + assert id_list(response.edges) == ids + + +@pytest.mark.parametrize( + "filter,ids", + [ + ( + {"include": {"favourite": True}}, + [1], + ), + ( + {"include": {"rating": {"any": "EXPLICIT"}}}, + [3], + ), + ( + {"include": {"category": {"any": "MANGA"}}}, + [1, 2], + ), + ( + {"include": {"censorship": {"any": "MOSAIC"}}}, + [3], + ), + ( + {"exclude": {"favourite": True}}, + [2, 3, 4], + ), + ( + {"exclude": {"rating": {"any": ["EXPLICIT", "QUESTIONABLE"]}}}, + [1, 4], + ), + ], + ids=[ + "includes favourite", + "includes rating", + "includes category", + "includes censorship", + "excludes favourite", + "excludes ratings", + ], +) +@pytest.mark.anyio +async def test_field_filter(query_comic_filter, gen_comic, filter, ids): + await DB.add_all(*gen_comic) + + response = Response(await query_comic_filter(filter)) + response.assert_is("ComicFilterResult") + + assert id_list(response.edges) == ids + + +@pytest.mark.parametrize( + "filter,ids", + [ + ( + {"include": {"rating": {"empty": True}}}, + [100], + ), + ( + {"include": {"rating": {"empty": False}}}, + [1, 2], + ), + ( + {"exclude": {"rating": {"empty": True}}}, + [1, 2], + ), + ( + {"exclude": {"rating": {"empty": False}}}, + [100], + ), + ], + ids=[ + "includes rating empty", + "includes rating not empty", + "excludes rating empty", + "excludes rating not empty", + ], +) +@pytest.mark.anyio +async def test_field_presence(query_comic_filter, gen_comic, empty_comic, filter, ids): + await DB.add(next(gen_comic)) + await DB.add(next(gen_comic)) + await DB.add(empty_comic) + + response = Response(await query_comic_filter(filter)) + response.assert_is("ComicFilterResult") + + assert id_list(response.edges) == ids + + +@pytest.mark.parametrize( + "filter,ids", + [ + ( + {"include": {"artists": {"empty": True}}}, + [100], + ), + ( + {"include": {"artists": {"empty": False}}}, + [1, 2], + ), + ( + {"exclude": {"artists": {"empty": True}}}, + [1, 2], + ), + ( + {"exclude": {"artists": {"empty": False}}}, + [100], + ), + ( + {"include": {"tags": {"empty": True}}}, + [100], + ), + ( + {"include": {"tags": {"empty": False}}}, + [1, 2], + ), + ( + {"exclude": {"tags": {"empty": True}}}, + [1, 2], + ), + ( + {"exclude": {"tags": {"empty": False}}}, + [100], + ), + ], + ids=[ + "includes artist empty", + "includes artist not empty", + "excludes artist empty", + "excludes artist not empty", + "includes tags empty", + "includes tags not empty", + "excludes tags empty", + "excludes tags not empty", + ], +) +@pytest.mark.anyio +async def test_assoc_presence(query_comic_filter, gen_comic, empty_comic, filter, ids): + await DB.add(next(gen_comic)) + await DB.add(next(gen_comic)) + await DB.add(empty_comic) + + response = Response(await query_comic_filter(filter)) + response.assert_is("ComicFilterResult") + + assert id_list(response.edges) == ids + + +@pytest.mark.parametrize( + "filter,ids", + [ + ( + {"include": {"namespaces": {"any": 1}}}, + [1, 2], + ), + ( + {"include": {"namespaces": {"all": [1, 2]}}}, + [2], + ), + ( + {"include": {"namespaces": {"exact": [1]}}}, + [1], + ), + ( + {"exclude": {"namespaces": {"any": 2}}}, + [1], + ), + ( + {"exclude": {"namespaces": {"exact": [1]}}}, + [2], + ), + ], + ids=[ + "includes any namespace", + "includes all namespace", + "includes exact namespaces", + "excludes any namespace", + "excludes exact namespaces", + ], +) +@pytest.mark.anyio +async def test_tag_assoc_filter(query_tag_filter, gen_namespace, gen_tag, filter, ids): + foo = await DB.add(Namespace(id=1, name="foo")) + bar = await DB.add(Namespace(id=2, name="bar")) + + await DB.add(Tag(id=1, name="small", namespaces=[foo])) + await DB.add(Tag(id=2, name="large", namespaces=[foo, bar])) + + response = Response(await query_tag_filter(filter)) + response.assert_is("TagFilterResult") + + assert id_list(response.edges) == ids diff --git a/tests/api/test_image.py b/tests/api/test_image.py new file mode 100644 index 0000000..c8c26b3 --- /dev/null +++ b/tests/api/test_image.py @@ -0,0 +1,16 @@ +import pytest +from conftest import DB +from hircine.api.types import Image + + +@pytest.mark.anyio +async def test_image(gen_image): + images = await DB.add_all(*gen_image) + + for db_image in images: + image = Image(db_image) + assert image.id == db_image.id + assert image.hash == db_image.hash + assert image.width == db_image.width + assert image.height == db_image.height + assert image.aspect_ratio == db_image.width / db_image.height diff --git a/tests/api/test_namespace.py b/tests/api/test_namespace.py new file mode 100644 index 0000000..450075b --- /dev/null +++ b/tests/api/test_namespace.py @@ -0,0 +1,291 @@ +from datetime import datetime as dt +from datetime import timezone + +import pytest +from conftest import DB, Response +from hircine.db.models import Namespace + + +@pytest.fixture +def query_namespace(execute_id): + query = """ + query namespace($id: Int!) { + namespace(id: $id) { + __typename + ... on Namespace { + id + name + sortName + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(query) + + +@pytest.fixture +def query_namespaces(execute): + query = """ + query namespaces { + namespaces { + __typename + count + edges { + id + name + } + } + } + """ + + return execute(query) + + +@pytest.fixture +def add_namespace(execute_add): + mutation = """ + mutation addNamespace($input: AddNamespaceInput!) { + addNamespace(input: $input) { + __typename + ... on AddSuccess { + id + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_add(mutation) + + +@pytest.fixture +def update_namespaces(execute_update): + mutation = """ + mutation updateNamespaces($ids: [Int!]!, $input: UpdateNamespaceInput!) { + updateNamespaces(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +@pytest.fixture +def delete_namespaces(execute_delete): + mutation = """ + mutation deleteNamespaces($ids: [Int!]!) { + deleteNamespaces(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +@pytest.mark.anyio +async def test_query_namespace(query_namespace, gen_namespace): + namespace = await DB.add(next(gen_namespace)) + + response = Response(await query_namespace(namespace.id)) + response.assert_is("Namespace") + + assert response.id == namespace.id + assert response.name == namespace.name + assert response.sortName == namespace.sort_name + + +@pytest.mark.anyio +async def test_query_namespace_fails_not_found(query_namespace): + response = Response(await query_namespace(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Namespace ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_namespaces(query_namespaces, gen_namespace): + namespaces = await DB.add_all(*gen_namespace) + response = Response(await query_namespaces()) + response.assert_is("NamespaceFilterResult") + + assert response.count == len(namespaces) + assert isinstance((response.edges), list) + assert len(response.edges) == len(namespaces) + + edges = iter(response.edges) + for namespace in sorted(namespaces, key=lambda a: a.name): + edge = next(edges) + assert edge["id"] == namespace.id + assert edge["name"] == namespace.name + + +@pytest.mark.anyio +async def test_add_namespace(add_namespace): + response = Response(await add_namespace({"name": "added", "sortName": "foo"})) + response.assert_is("AddSuccess") + + namespace = await DB.get(Namespace, response.id) + assert namespace is not None + assert namespace.name == "added" + assert namespace.sort_name == "foo" + + +@pytest.mark.anyio +async def test_add_namespace_fails_empty_parameter(add_namespace): + response = Response(await add_namespace({"name": ""})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_add_namespace_fails_exists(add_namespace, gen_namespace): + namespace = await DB.add(next(gen_namespace)) + + response = Response(await add_namespace({"name": namespace.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Namespace with this name exists" + + +@pytest.mark.anyio +async def test_delete_namespace(delete_namespaces, gen_namespace): + namespace = await DB.add(next(gen_namespace)) + id = namespace.id + + response = Response(await delete_namespaces(id)) + response.assert_is("DeleteSuccess") + + namespace = await DB.get(Namespace, id) + assert namespace is None + + +@pytest.mark.anyio +async def test_delete_namespace_not_found(delete_namespaces): + response = Response(await delete_namespaces(1)) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Namespace ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_namespace(update_namespaces, gen_namespace): + namespace = await DB.add(next(gen_namespace)) + + input = {"name": "updated", "sortName": "foo"} + response = Response(await update_namespaces(namespace.id, input)) + response.assert_is("UpdateSuccess") + + namespace = await DB.get(Namespace, namespace.id) + assert namespace is not None + assert namespace.name == "updated" + assert namespace.sort_name == "foo" + + +@pytest.mark.anyio +async def test_update_namespace_fails_exists(update_namespaces, gen_namespace): + first = await DB.add(next(gen_namespace)) + second = await DB.add(next(gen_namespace)) + + response = Response(await update_namespaces(second.id, {"name": first.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Namespace with this name exists" + + +@pytest.mark.anyio +async def test_update_namespace_fails_not_found(update_namespaces): + response = Response(await update_namespaces(1, {"name": "updated"})) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Namespace ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_namespaces_cannot_bulk_edit_name( + update_namespaces, gen_namespace +): + first = await DB.add(next(gen_namespace)) + second = await DB.add(next(gen_namespace)) + + response = Response( + await update_namespaces([first.id, second.id], {"name": "unique"}) + ) + response.assert_is("InvalidParameterError") + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "none", + "empty string", + ], +) +@pytest.mark.anyio +async def test_update_namespace_fails_empty_parameter( + update_namespaces, gen_namespace, empty +): + namespace = await DB.add(next(gen_namespace)) + response = Response(await update_namespaces(namespace.id, {"name": empty})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_update_namespace_changes_updated_at(update_namespaces): + original_namespace = Namespace(name="namespace") + original_namespace.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_namespace = await DB.add(original_namespace) + + response = Response( + await update_namespaces(original_namespace.id, {"name": "updated"}) + ) + response.assert_is("UpdateSuccess") + + namespace = await DB.get(Namespace, original_namespace.id) + assert namespace.updated_at > original_namespace.updated_at diff --git a/tests/api/test_page.py b/tests/api/test_page.py new file mode 100644 index 0000000..debd69a --- /dev/null +++ b/tests/api/test_page.py @@ -0,0 +1,39 @@ +from datetime import datetime, timezone + +import pytest +from conftest import DB +from hircine.api.types import Page +from hircine.db.models import Archive + + +@pytest.mark.anyio +async def test_page(gen_page): + pages = list(gen_page) + images = [p.image for p in pages] + + # persist images and pages in database by binding them to a throwaway + # archive + archive = await DB.add( + Archive( + hash="339e3a32e5648fdeb2597f05cb2e1ef6", + path="some/archive.zip", + size=78631597, + mtime=datetime(1999, 12, 27, tzinfo=timezone.utc), + cover=images[0], + pages=pages, + page_count=len(pages), + ) + ) + + assert len(archive.pages) == len(pages) + + page_iter = iter(pages) + image_iter = iter(images) + for page in [Page(p) for p in archive.pages]: + matching_page = next(page_iter) + matching_image = next(image_iter) + + assert page.id == matching_page.id + assert page.comic_id is None + assert page.image.id == matching_image.id + assert page.path == matching_page.path diff --git a/tests/api/test_pagination.py b/tests/api/test_pagination.py new file mode 100644 index 0000000..67854c3 --- /dev/null +++ b/tests/api/test_pagination.py @@ -0,0 +1,61 @@ +import pytest +from conftest import DB, Response + + +@pytest.fixture +def query_pagination(schema_execute): + query = """ + query artists($pagination: Pagination) { + artists(pagination: $pagination) { + __typename + count + edges { + id + } + } + } + """ + + async def _execute(pagination=None): + return await schema_execute( + query, {"pagination": pagination} if pagination else None + ) + + return _execute + + +@pytest.mark.parametrize( + "pagination,count,length", + [ + (None, 4, 4), + ({"items": 3, "page": 1}, 4, 3), + ({"items": 3, "page": 2}, 4, 1), + ({"items": 3, "page": 3}, 0, 0), + ({"items": 10, "page": 1}, 4, 4), + ({"items": 0, "page": 1}, 0, 0), + ({"items": 2, "page": 0}, 0, 0), + ({"items": -1, "page": 0}, 0, 0), + ({"items": 0, "page": -1}, 0, 0), + ], + ids=[ + "is missing and lists all", + "lists first page", + "lists last page", + "lists none (no more items)", + "lists all", + "lists none (zero items)", + "lists none (page zero)", + "lists none (negative items)", + "lists none (negative page)", + ], +) +@pytest.mark.anyio +async def test_pagination(query_pagination, gen_artist, pagination, count, length): + await DB.add_all(*gen_artist) + + response = Response(await query_pagination(pagination)) + response.assert_is("ArtistFilterResult") + + assert response.count == count + assert isinstance((response.edges), list) + assert len(response.edges) == length diff --git a/tests/api/test_scraper_api.py b/tests/api/test_scraper_api.py new file mode 100644 index 0000000..1edd74f --- /dev/null +++ b/tests/api/test_scraper_api.py @@ -0,0 +1,395 @@ +import hircine.enums as enums +import hircine.plugins +import hircine.scraper.types as scraped +import pytest +from conftest import DB, Response +from hircine.scraper import ScrapeError, Scraper, ScrapeWarning + + +@pytest.fixture +def query_comic_scrapers(schema_execute): + query = """ + query comicScrapers($id: Int!) { + comicScrapers(id: $id) { + __typename + id + name + } + } + """ + + async def _execute(id): + return await schema_execute(query, {"id": id}) + + return _execute + + +@pytest.fixture +def query_scrape_comic(schema_execute): + query = """ + query scrapeComic($id: Int!, $scraper: String!) { + scrapeComic(id: $id, scraper: $scraper) { + __typename + ... on ScrapeComicResult { + data { + title + originalTitle + url + artists + category + censorship + characters + circles + date + direction + language + layout + rating + tags + worlds + } + warnings + } + ... on Error { + message + } + ... on ScraperNotFoundError { + name + } + ... on ScraperNotAvailableError { + scraper + comicId + } + ... on IDNotFoundError { + id + } + } + } + """ + + async def _execute(id, scraper): + return await schema_execute(query, {"id": id, "scraper": scraper}) + + return _execute + + +@pytest.fixture +def scrapers(empty_plugins): + class GoodScraper(Scraper): + name = "Good Scraper" + is_available = True + source = "good" + + def scrape(self): + yield scraped.Title("Arid Savannah Adventures") + yield scraped.OriginalTitle("Arid Savannah Hijinx") + yield scraped.URL("file:///home/savannah/adventures") + yield scraped.Language(enums.Language.EN) + yield scraped.Date.from_iso("2010-07-05") + yield scraped.Direction(enums.Direction["LEFT_TO_RIGHT"]) + yield scraped.Layout(enums.Layout.SINGLE) + yield scraped.Rating(enums.Rating.SAFE) + yield scraped.Category(enums.Category.MANGA) + yield scraped.Censorship(enums.Censorship.NONE) + yield scraped.Tag.from_string("animal:small") + yield scraped.Tag.from_string("animal:medium") + yield scraped.Tag.from_string("animal:big") + yield scraped.Tag.from_string("animal:massive") + yield scraped.Artist("alan smithee") + yield scraped.Artist("david agnew") + yield scraped.Character("greta giraffe") + yield scraped.Character("bob bear") + yield scraped.Character("rico rhinoceros") + yield scraped.Character("ziggy zebra") + yield scraped.Circle("archimedes") + yield scraped.World("animal friends") + + class DuplicateScraper(Scraper): + name = "Duplicate Scraper" + is_available = True + source = "dupe" + + def gen(self): + yield scraped.Title("Arid Savannah Adventures") + yield scraped.OriginalTitle("Arid Savannah Hijinx") + yield scraped.URL("file:///home/savannah/adventures") + yield scraped.Language(enums.Language.EN) + yield scraped.Date.from_iso("2010-07-05") + yield scraped.Direction(enums.Direction["LEFT_TO_RIGHT"]) + yield scraped.Layout(enums.Layout.SINGLE) + yield scraped.Rating(enums.Rating.SAFE) + yield scraped.Category(enums.Category.MANGA) + yield scraped.Censorship(enums.Censorship.NONE) + yield scraped.Tag.from_string("animal:small") + yield scraped.Tag.from_string("animal:medium") + yield scraped.Tag.from_string("animal:big") + yield scraped.Tag.from_string("animal:massive") + yield scraped.Artist("alan smithee") + yield scraped.Artist("david agnew") + yield scraped.Character("greta giraffe") + yield scraped.Character("bob bear") + yield scraped.Character("rico rhinoceros") + yield scraped.Character("ziggy zebra") + yield scraped.Circle("archimedes") + yield scraped.World("animal friends") + + def scrape(self): + yield from self.gen() + yield from self.gen() + + class WarnScraper(Scraper): + name = "Warn Scraper" + is_available = True + source = "warn" + + def warn_on_purpose(self, item): + raise ScrapeWarning(f"Could not parse: {item}") + + def scrape(self): + yield scraped.Title("Arid Savannah Adventures") + yield lambda: self.warn_on_purpose("Arid Savannah Hijinx") + yield scraped.Language(enums.Language.EN) + + class FailScraper(Scraper): + name = "Fail Scraper" + is_available = True + source = "fail" + + def scrape(self): + yield scraped.Title("Arid Savannah Adventures") + raise ScrapeError("Could not continue") + yield scraped.Language(enums.Language.EN) + + class UnavailableScraper(Scraper): + name = "Unavailable Scraper" + is_available = False + source = "unavail" + + def scrape(self): + yield None + + hircine.plugins.register_scraper("good", GoodScraper) + hircine.plugins.register_scraper("dupe", DuplicateScraper) + hircine.plugins.register_scraper("warn", WarnScraper) + hircine.plugins.register_scraper("fail", FailScraper) + hircine.plugins.register_scraper("unavail", UnavailableScraper) + + return [ + ("good", GoodScraper), + ("dupe", DuplicateScraper), + ("warn", WarnScraper), + ("fail", FailScraper), + ("unavail", UnavailableScraper), + ] + + +@pytest.mark.anyio +async def test_comic_scrapers(gen_comic, query_comic_scrapers, scrapers): + comic = await DB.add(next(gen_comic)) + response = Response(await query_comic_scrapers(comic.id)) + + assert isinstance((response.data), list) + + available_scrapers = [] + for name, cls in sorted(scrapers, key=lambda s: s[1].name): + instance = cls(comic) + if instance.is_available: + available_scrapers.append((name, cls)) + + assert len(response.data) == len(available_scrapers) + + data = iter(response.data) + for id, scraper in available_scrapers: + field = next(data) + assert field["__typename"] == "ComicScraper" + assert field["id"] == id + assert field["name"] == scraper.name + + +@pytest.mark.anyio +async def test_comic_empty_for_missing_comic(gen_comic, query_comic_scrapers, scrapers): + response = Response(await query_comic_scrapers(1)) + + assert response.data == [] + + +@pytest.mark.anyio +async def test_scrape_comic(gen_comic, query_scrape_comic, scrapers): + comic = await DB.add(next(gen_comic)) + + response = Response(await query_scrape_comic(comic.id, "good")) + response.assert_is("ScrapeComicResult") + + assert response.warnings == [] + + scraped_comic = response.data["data"] + + assert scraped_comic["title"] == "Arid Savannah Adventures" + assert scraped_comic["originalTitle"] == "Arid Savannah Hijinx" + assert scraped_comic["url"] == "file:///home/savannah/adventures" + assert scraped_comic["language"] == "EN" + assert scraped_comic["date"] == "2010-07-05" + assert scraped_comic["rating"] == "SAFE" + assert scraped_comic["category"] == "MANGA" + assert scraped_comic["direction"] == "LEFT_TO_RIGHT" + assert scraped_comic["layout"] == "SINGLE" + assert scraped_comic["tags"] == [ + "animal:small", + "animal:medium", + "animal:big", + "animal:massive", + ] + assert scraped_comic["artists"] == ["alan smithee", "david agnew"] + assert scraped_comic["characters"] == [ + "greta giraffe", + "bob bear", + "rico rhinoceros", + "ziggy zebra", + ] + assert scraped_comic["circles"] == ["archimedes"] + assert scraped_comic["worlds"] == ["animal friends"] + + +@pytest.mark.anyio +async def test_scrape_comic_removes_duplicates(gen_comic, query_scrape_comic, scrapers): + comic = await DB.add(next(gen_comic)) + + response = Response(await query_scrape_comic(comic.id, "dupe")) + response.assert_is("ScrapeComicResult") + + assert response.warnings == [] + + scraped_comic = response.data["data"] + + assert scraped_comic["title"] == "Arid Savannah Adventures" + assert scraped_comic["originalTitle"] == "Arid Savannah Hijinx" + assert scraped_comic["url"] == "file:///home/savannah/adventures" + assert scraped_comic["language"] == "EN" + assert scraped_comic["date"] == "2010-07-05" + assert scraped_comic["rating"] == "SAFE" + assert scraped_comic["category"] == "MANGA" + assert scraped_comic["direction"] == "LEFT_TO_RIGHT" + assert scraped_comic["layout"] == "SINGLE" + assert scraped_comic["tags"] == [ + "animal:small", + "animal:medium", + "animal:big", + "animal:massive", + ] + assert scraped_comic["artists"] == ["alan smithee", "david agnew"] + assert scraped_comic["characters"] == [ + "greta giraffe", + "bob bear", + "rico rhinoceros", + "ziggy zebra", + ] + assert scraped_comic["circles"] == ["archimedes"] + assert scraped_comic["worlds"] == ["animal friends"] + + +@pytest.mark.anyio +async def test_scrape_comic_fails_comic_not_found(query_scrape_comic, scrapers): + response = Response(await query_scrape_comic(1, "good")) + response.assert_is("IDNotFoundError") + + assert response.id == 1 + assert response.message == "Comic ID not found: '1'" + + +@pytest.mark.anyio +async def test_scrape_comic_fails_scraper_not_found( + gen_comic, query_scrape_comic, scrapers +): + comic = await DB.add(next(gen_comic)) + + response = Response(await query_scrape_comic(comic.id, "missing")) + response.assert_is("ScraperNotFoundError") + + assert response.name == "missing" + assert response.message == "Scraper not found: 'missing'" + + +@pytest.mark.anyio +async def test_scrape_comic_fails_scraper_not_available( + gen_comic, query_scrape_comic, scrapers +): + comic = await DB.add(next(gen_comic)) + + response = Response(await query_scrape_comic(comic.id, "unavail")) + response.assert_is("ScraperNotAvailableError") + + assert response.scraper == "unavail" + assert response.comicId == comic.id + assert response.message == f"Scraper unavail not available for comic ID {comic.id}" + + +async def test_scrape_comic_with_transformer(gen_comic, query_scrape_comic, scrapers): + def keep(generator, info): + for item in generator: + match item: + case scraped.Title(): + yield item + + hircine.plugins.transformers = [keep] + + comic = await DB.add(next(gen_comic)) + + response = Response(await query_scrape_comic(comic.id, "good")) + response.assert_is("ScrapeComicResult") + + assert response.warnings == [] + + scraped_comic = response.data["data"] + + assert scraped_comic["title"] == "Arid Savannah Adventures" + assert scraped_comic["originalTitle"] is None + assert scraped_comic["url"] is None + assert scraped_comic["language"] is None + assert scraped_comic["date"] is None + assert scraped_comic["rating"] is None + assert scraped_comic["category"] is None + assert scraped_comic["censorship"] is None + assert scraped_comic["direction"] is None + assert scraped_comic["layout"] is None + assert scraped_comic["tags"] == [] + assert scraped_comic["artists"] == [] + assert scraped_comic["characters"] == [] + assert scraped_comic["circles"] == [] + assert scraped_comic["worlds"] == [] + + +@pytest.mark.anyio +async def test_scrape_comic_catches_warnings(gen_comic, query_scrape_comic, scrapers): + comic = await DB.add(next(gen_comic)) + + response = Response(await query_scrape_comic(comic.id, "warn")) + response.assert_is("ScrapeComicResult") + + assert response.warnings == ["Could not parse: Arid Savannah Hijinx"] + + scraped_comic = response.data["data"] + + assert scraped_comic["title"] == "Arid Savannah Adventures" + assert scraped_comic["originalTitle"] is None + assert scraped_comic["language"] == "EN" + assert scraped_comic["date"] is None + assert scraped_comic["rating"] is None + assert scraped_comic["category"] is None + assert scraped_comic["direction"] is None + assert scraped_comic["layout"] is None + assert scraped_comic["tags"] == [] + assert scraped_comic["artists"] == [] + assert scraped_comic["characters"] == [] + assert scraped_comic["circles"] == [] + assert scraped_comic["worlds"] == [] + + +@pytest.mark.anyio +async def test_scrape_comic_fails_with_scraper_error( + gen_comic, query_scrape_comic, scrapers +): + comic = await DB.add(next(gen_comic)) + + response = Response(await query_scrape_comic(comic.id, "fail")) + response.assert_is("ScraperError") + assert response.message == "Scraping failed: Could not continue" diff --git a/tests/api/test_sort.py b/tests/api/test_sort.py new file mode 100644 index 0000000..b3c8562 --- /dev/null +++ b/tests/api/test_sort.py @@ -0,0 +1,137 @@ +import pytest +from conftest import DB, Response +from hircine.db.models import Namespace + + +@pytest.fixture +def query_comic_sort(execute_sort): + query = """ + query comics($sort: ComicSortInput) { + comics(sort: $sort) { + __typename + count + edges { + id + title + } + } + } + """ + + return execute_sort(query) + + +@pytest.fixture +def query_namespace_sort(execute_sort): + query = """ + query namespaces($sort: NamespaceSortInput) { + namespaces(sort: $sort) { + __typename + count + edges { + id + name + } + } + } + """ + + return execute_sort(query) + + +@pytest.mark.parametrize( + "sort,reverse", + [ + ({"on": "DATE"}, False), + ({"on": "DATE", "direction": "DESCENDING"}, True), + ({"on": "DATE", "direction": "ASCENDING"}, False), + ], + ids=[ + "ascending (default)", + "descending", + "ascending", + ], +) +@pytest.mark.anyio +async def test_query_comics_sort_date(gen_comic, query_comic_sort, sort, reverse): + comics = await DB.add_all(*gen_comic) + ids = [c.id for c in sorted(comics, key=lambda c: c.date, reverse=reverse)] + + response = Response(await query_comic_sort(sort)) + response.assert_is("ComicFilterResult") + + assert ids == [edge["id"] for edge in response.edges] + + +@pytest.mark.parametrize( + "sort,reverse", + [ + ({"on": "TAG_COUNT"}, False), + ({"on": "TAG_COUNT", "direction": "DESCENDING"}, True), + ({"on": "TAG_COUNT", "direction": "ASCENDING"}, False), + ], + ids=[ + "ascending (default)", + "descending", + "ascending", + ], +) +@pytest.mark.anyio +async def test_query_comics_sort_tag_count(gen_comic, query_comic_sort, sort, reverse): + comics = await DB.add_all(*gen_comic) + ids = [c.id for c in sorted(comics, key=lambda c: len(c.tags), reverse=reverse)] + + response = Response(await query_comic_sort(sort)) + response.assert_is("ComicFilterResult") + + assert ids == [edge["id"] for edge in response.edges] + + +@pytest.mark.anyio +async def test_query_comics_sort_random(gen_comic, query_comic_sort): + comics = await DB.add_all(*gen_comic) + ids = set([c.id for c in comics]) + + response = Response(await query_comic_sort({"on": "RANDOM"})) + response.assert_is("ComicFilterResult") + + assert ids == set(edge["id"] for edge in response.edges) + + +@pytest.mark.anyio +async def test_query_comics_sort_random_seed_direction(gen_comic, query_comic_sort): + comics = await DB.add_all(*gen_comic) + ids = set([c.id for c in comics]) + + response = Response( + await query_comic_sort( + {"on": "RANDOM", "seed": 42069, "direction": "ASCENDING"} + ) + ) + response.assert_is("ComicFilterResult") + + ascending_ids = [edge["id"] for edge in response.edges] + + assert ids == set(ascending_ids) + + response = Response( + await query_comic_sort( + {"on": "RANDOM", "seed": 42069, "direction": "DESCENDING"} + ) + ) + response.assert_is("ComicFilterResult") + + descending_ids = [edge["id"] for edge in response.edges] + + assert ascending_ids == descending_ids[::-1] + + +@pytest.mark.anyio +async def test_query_namespace_sort_sort_name(query_namespace_sort): + await DB.add(Namespace(name="one", sort_name="2")) + await DB.add(Namespace(name="two", sort_name="1")) + + response = Response(await query_namespace_sort({"on": "SORT_NAME"})) + response.assert_is("NamespaceFilterResult") + + assert ["two", "one"] == [edge["name"] for edge in response.edges] diff --git a/tests/api/test_tag.py b/tests/api/test_tag.py new file mode 100644 index 0000000..c863a00 --- /dev/null +++ b/tests/api/test_tag.py @@ -0,0 +1,441 @@ +from datetime import datetime as dt +from datetime import timezone + +import pytest +from conftest import DB, Response +from hircine.db.models import Namespace, Tag + + +@pytest.fixture +def query_tag(execute_id): + query = """ + query tag($id: Int!) { + tag(id: $id) { + __typename + ... on FullTag { + id + name + description + namespaces { + __typename + id + } + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(query) + + +@pytest.fixture +def query_tags(execute): + query = """ + query tags { + tags { + __typename + count + edges { + id + name + description + } + } + } + """ + + return execute(query) + + +@pytest.fixture +def add_tag(execute_add): + mutation = """ + mutation addTag($input: AddTagInput!) { + addTag(input: $input) { + __typename + ... on AddSuccess { + id + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_add(mutation) + + +@pytest.fixture +def update_tags(execute_update): + mutation = """ + mutation updateTags($ids: [Int!]!, $input: UpdateTagInput!) { + updateTags(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +@pytest.fixture +def delete_tags(execute_delete): + mutation = """ + mutation deleteTags($ids: [Int!]!) { + deleteTags(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +@pytest.mark.anyio +async def test_query_tag(query_tag, gen_tag): + tag = await DB.add(next(gen_tag)) + + response = Response(await query_tag(tag.id)) + response.assert_is("FullTag") + + assert response.id == tag.id + assert response.name == tag.name + assert response.description == tag.description + assert set([n["id"] for n in response.namespaces]) == set( + [n.id for n in tag.namespaces] + ) + + +@pytest.mark.anyio +async def test_query_tag_fails_not_found(query_tag): + response = Response(await query_tag(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Tag ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_tags(query_tags, gen_tag): + tags = await DB.add_all(*gen_tag) + response = Response(await query_tags()) + response.assert_is("TagFilterResult") + + assert response.count == len(tags) + assert isinstance((response.edges), list) + assert len(response.edges) == len(tags) + + edges = iter(response.edges) + for tag in sorted(tags, key=lambda a: a.name): + edge = next(edges) + assert edge["id"] == tag.id + assert edge["name"] == tag.name + assert edge["description"] == tag.description + + +@pytest.mark.anyio +async def test_add_tag(add_tag): + response = Response( + await add_tag({"name": "added", "description": "it's been added!"}) + ) + response.assert_is("AddSuccess") + + tag = await DB.get(Tag, response.id) + assert tag is not None + assert tag.name == "added" + assert tag.description == "it's been added!" + + +@pytest.mark.anyio +async def test_add_tag_with_namespace(add_tag): + namespace = await DB.add(Namespace(id=1, name="new")) + + response = Response(await add_tag({"name": "added", "namespaces": {"ids": [1]}})) + response.assert_is("AddSuccess") + + tag = await DB.get(Tag, response.id, full=True) + assert tag is not None + assert tag.name == "added" + assert tag.namespaces[0].id == namespace.id + assert tag.namespaces[0].name == namespace.name + + +@pytest.mark.anyio +async def test_add_tag_fails_empty_parameter(add_tag): + response = Response(await add_tag({"name": ""})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_add_tag_fails_namespace_not_found(add_tag): + response = Response(await add_tag({"name": "added", "namespaces": {"ids": [1]}})) + response.assert_is("IDNotFoundError") + + assert response.id == 1 + assert response.message == "Namespace ID not found: '1'" + + +@pytest.mark.anyio +async def test_add_tag_fails_exists(add_tag, gen_tag): + tag = await DB.add(next(gen_tag)) + + response = Response(await add_tag({"name": tag.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Tag with this name exists" + + +@pytest.mark.anyio +async def test_delete_tag(delete_tags, gen_tag): + tag = await DB.add(next(gen_tag)) + id = tag.id + + response = Response(await delete_tags(id)) + response.assert_is("DeleteSuccess") + + tag = await DB.get(Tag, id) + assert tag is None + + +@pytest.mark.anyio +async def test_delete_tag_not_found(delete_tags): + response = Response(await delete_tags(1)) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Tag ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_tag(update_tags, gen_tag, gen_namespace): + tag = await DB.add(next(gen_tag)) + namespace = await DB.add(next(gen_namespace)) + + input = { + "name": "updated", + "description": "how different, how unique", + "namespaces": {"ids": [1]}, + } + response = Response(await update_tags(tag.id, input)) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, tag.id, full=True) + assert tag is not None + assert tag.name == "updated" + assert tag.description == "how different, how unique" + assert tag.namespaces[0].id == namespace.id + assert tag.namespaces[0].name == namespace.name + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "with None", + "with empty string", + ], +) +@pytest.mark.anyio +async def test_update_tag_clears_description(update_tags, gen_tag, empty): + tag = await DB.add(next(gen_tag)) + + input = { + "description": empty, + } + response = Response(await update_tags(tag.id, input)) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, tag.id) + assert tag is not None + assert tag.description is None + + +@pytest.mark.anyio +async def test_update_tag_fails_exists(update_tags, gen_tag): + first = await DB.add(next(gen_tag)) + second = await DB.add(next(gen_tag)) + + response = Response(await update_tags(second.id, {"name": first.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Tag with this name exists" + + +@pytest.mark.anyio +async def test_update_tag_fails_not_found(update_tags): + response = Response(await update_tags(1, {"name": "updated"})) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Tag ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_tags_cannot_bulk_edit_name(update_tags, gen_tag): + first = await DB.add(next(gen_tag)) + second = await DB.add(next(gen_tag)) + + response = Response(await update_tags([first.id, second.id], {"name": "unique"})) + response.assert_is("InvalidParameterError") + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "none", + "empty string", + ], +) +@pytest.mark.anyio +async def test_update_tag_fails_empty_parameter(update_tags, gen_tag, empty): + tag = await DB.add(next(gen_tag)) + response = Response(await update_tags(tag.id, {"name": empty})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_update_tag_fails_namespace_not_found(update_tags, gen_tag): + tag = await DB.add(next(gen_tag)) + response = Response(await update_tags(tag.id, {"namespaces": {"ids": [1]}})) + response.assert_is("IDNotFoundError") + + assert response.id == 1 + assert response.message == "Namespace ID not found: '1'" + + +@pytest.mark.parametrize( + "options", + [ + None, + {}, + {"mode": "REPLACE"}, + ], + ids=[ + "by default (none)", + "by default (empty record)", + "when defined explicitly", + ], +) +@pytest.mark.anyio +async def test_update_tag_replaces_assocs(update_tags, gen_tag, options): + original_tag = await DB.add(next(gen_tag)) + new_namespace = await DB.add(Namespace(name="new")) + + input = { + "namespaces": {"ids": [new_namespace.id]}, + } + response = Response(await update_tags(original_tag.id, input)) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, original_tag.id, full=True) + + assert set([o.id for o in tag.namespaces]) == set([o.id for o in [new_namespace]]) + + +@pytest.mark.anyio +async def test_update_tag_adds_assocs(update_tags, gen_tag): + original_tag = await DB.add(next(gen_tag)) + new_namespace = await DB.add(Namespace(name="new")) + added_namespaces = original_tag.namespaces + [new_namespace] + + input = { + "namespaces": {"ids": [new_namespace.id], "options": {"mode": "ADD"}}, + } + response = Response(await update_tags(original_tag.id, input)) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, original_tag.id, full=True) + + assert set([o.id for o in tag.namespaces]) == set([o.id for o in added_namespaces]) + + +@pytest.mark.anyio +async def test_update_tag_removes_assocs(update_tags): + removed_namespace = Namespace(id=1, name="new") + remaining_namespace = Namespace(id=2, name="newtwo") + original_tag = await DB.add( + Tag(id=1, name="tag", namespaces=[removed_namespace, remaining_namespace]) + ) + + input = { + "namespaces": {"ids": [removed_namespace.id], "options": {"mode": "REMOVE"}}, + } + response = Response(await update_tags(original_tag.id, input)) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, original_tag.id, full=True) + + assert set([o.id for o in tag.namespaces]) == set([remaining_namespace.id]) + + +@pytest.mark.anyio +async def test_update_tag_changes_updated_at(update_tags): + original_tag = Tag(name="tag") + original_tag.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_tag = await DB.add(original_tag) + + response = Response(await update_tags(original_tag.id, {"name": "updated"})) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, original_tag.id) + assert tag.updated_at > original_tag.updated_at + + +@pytest.mark.anyio +async def test_update_tag_assoc_changes_updated_at(update_tags): + original_tag = Tag(name="tag") + original_tag.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_tag = await DB.add(original_tag) + await DB.add(Namespace(id=1, name="namespace")) + + response = Response( + await update_tags(original_tag.id, {"namespaces": {"ids": [1]}}) + ) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, original_tag.id) + assert tag.updated_at > original_tag.updated_at diff --git a/tests/api/test_world.py b/tests/api/test_world.py new file mode 100644 index 0000000..a3926d1 --- /dev/null +++ b/tests/api/test_world.py @@ -0,0 +1,278 @@ +from datetime import datetime as dt +from datetime import timezone + +import pytest +from conftest import DB, Response +from hircine.db.models import World + + +@pytest.fixture +def query_world(execute_id): + query = """ + query world($id: Int!) { + world(id: $id) { + __typename + ... on World { + id + name + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(query) + + +@pytest.fixture +def query_worlds(execute): + query = """ + query worlds { + worlds { + __typename + count + edges { + id + name + } + } + } + """ + + return execute(query) + + +@pytest.fixture +def add_world(execute_add): + mutation = """ + mutation addWorld($input: AddWorldInput!) { + addWorld(input: $input) { + __typename + ... on AddSuccess { + id + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + } + } + """ + + return execute_add(mutation) + + +@pytest.fixture +def update_worlds(execute_update): + mutation = """ + mutation updateWorlds($ids: [Int!]!, $input: UpdateWorldInput!) { + updateWorlds(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +@pytest.fixture +def delete_worlds(execute_delete): + mutation = """ + mutation deleteWorlds($ids: [Int!]!) { + deleteWorlds(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +@pytest.mark.anyio +async def test_query_world(query_world, gen_world): + world = await DB.add(next(gen_world)) + + response = Response(await query_world(world.id)) + response.assert_is("World") + + assert response.id == world.id + assert response.name == world.name + + +@pytest.mark.anyio +async def test_query_world_fails_not_found(query_world): + response = Response(await query_world(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "World ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_worlds(query_worlds, gen_world): + worlds = await DB.add_all(*gen_world) + + response = Response(await query_worlds()) + response.assert_is("WorldFilterResult") + + assert response.count == len(worlds) + assert isinstance((response.edges), list) + assert len(response.edges) == len(worlds) + + edges = iter(response.edges) + for world in sorted(worlds, key=lambda a: a.name): + edge = next(edges) + assert edge["id"] == world.id + assert edge["name"] == world.name + + +@pytest.mark.anyio +async def test_add_world(add_world): + response = Response(await add_world({"name": "added world"})) + response.assert_is("AddSuccess") + + world = await DB.get(World, response.id) + assert world is not None + assert world.name == "added world" + + +@pytest.mark.anyio +async def test_add_world_fails_empty_parameter(add_world): + response = Response(await add_world({"name": ""})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_add_world_fails_exists(add_world, gen_world): + world = await DB.add(next(gen_world)) + + response = Response(await add_world({"name": world.name})) + response.assert_is("NameExistsError") + assert response.message == "Another World with this name exists" + + +@pytest.mark.anyio +async def test_delete_world(delete_worlds, gen_world): + world = await DB.add(next(gen_world)) + id = world.id + + response = Response(await delete_worlds(id)) + response.assert_is("DeleteSuccess") + + world = await DB.get(World, id) + assert world is None + + +@pytest.mark.anyio +async def test_delete_world_not_found(delete_worlds): + response = Response(await delete_worlds(1)) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "World ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_world(update_worlds, gen_world): + world = await DB.add(next(gen_world)) + + input = {"name": "updated world"} + response = Response(await update_worlds(world.id, input)) + response.assert_is("UpdateSuccess") + + world = await DB.get(World, world.id) + assert world is not None + assert world.name == "updated world" + + +@pytest.mark.anyio +async def test_update_world_fails_exists(update_worlds, gen_world): + first = await DB.add(next(gen_world)) + second = await DB.add(next(gen_world)) + + response = Response(await update_worlds(second.id, {"name": first.name})) + response.assert_is("NameExistsError") + assert response.message == "Another World with this name exists" + + +@pytest.mark.anyio +async def test_update_world_fails_not_found(update_worlds): + response = Response(await update_worlds(1, {"name": "updated world"})) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "World ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_worlds_cannot_bulk_edit_name(update_worlds, gen_world): + first = await DB.add(next(gen_world)) + second = await DB.add(next(gen_world)) + + response = Response(await update_worlds([first.id, second.id], {"name": "unique"})) + response.assert_is("InvalidParameterError") + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "none", + "empty string", + ], +) +@pytest.mark.anyio +async def test_update_world_fails_empty_parameter(update_worlds, gen_world, empty): + world = await DB.add(next(gen_world)) + + response = Response(await update_worlds(world.id, {"name": empty})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_update_world_changes_updated_at(update_worlds): + original_world = World(name="world") + original_world.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_world = await DB.add(original_world) + + response = Response(await update_worlds(original_world.id, {"name": "updated"})) + response.assert_is("UpdateSuccess") + + world = await DB.get(World, original_world.id) + assert world.updated_at > original_world.updated_at diff --git a/tests/config/data/config.toml b/tests/config/data/config.toml new file mode 100644 index 0000000..2a21e03 --- /dev/null +++ b/tests/config/data/config.toml @@ -0,0 +1,3 @@ +database = "foo" +scan = "bar" +objects = "baz" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..a36be2d --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,594 @@ +import os +import shutil +from datetime import date, timedelta +from datetime import datetime as dt +from datetime import timezone as tz + +import hircine +import hircine.db as database +import hircine.db.models as models +import hircine.plugins +import pytest +from hircine.app import schema +from hircine.enums import Category, Censorship, Direction, Language, Layout, Rating +from sqlalchemy.ext.asyncio import AsyncSession + + +@pytest.fixture(scope="session") +def anyio_backend(): + return "asyncio" + + +def pytest_addoption(parser): + parser.addoption( + "--sql-echo", + action="store_true", + help="Enable logging of SQL statements", + ) + + +@pytest.fixture +def empty_plugins(monkeypatch): + monkeypatch.setattr(hircine.plugins, "scraper_registry", {}) + monkeypatch.setattr(hircine.plugins, "transformers", []) + + +@pytest.fixture +def data(tmpdir, request): + file = request.module.__file__ + data = os.path.join(os.path.dirname(file), "data") + + if os.path.isdir(data): + shutil.copytree(data, tmpdir, dirs_exist_ok=True) + + return lambda dir: os.path.join(tmpdir, dir) + + +@pytest.fixture(scope="session") +def engine(pytestconfig): + yield database.create_engine(":memory:", echo=pytestconfig.option.sql_echo) + + +@pytest.fixture +async def session(anyio_backend, engine): + async with engine.begin() as conn: + await conn.begin_nested() + yield AsyncSession(conn, expire_on_commit=False, autoflush=False) + await conn.rollback() + + +@pytest.fixture(autouse=True) +async def patch_session(anyio_backend, session, engine, monkeypatch): + monkeypatch.setattr(hircine.db, "session", lambda: session) + + +@pytest.fixture(scope="session", autouse=True) +async def metadata(engine, anyio_backend): + await database.initialize(engine) + + +@pytest.fixture +def schema_execute(): + async def _execute(endpoint, variables=None): + return await schema.execute(endpoint, variable_values=variables) + + return _execute + + +@pytest.fixture +def execute(schema_execute): + def wrapper(q): + async def _execute(): + return await schema_execute(q) + + return _execute + + return wrapper + + +@pytest.fixture +def execute_add(schema_execute): + def wrapper(q): + async def _execute(input): + return await schema_execute(q, {"input": input}) + + return _execute + + return wrapper + + +@pytest.fixture +def execute_update(schema_execute): + def wrapper(q): + async def _execute(ids, input): + return await schema_execute(q, {"ids": ids, "input": input}) + + return _execute + + return wrapper + + +@pytest.fixture +def execute_update_single(schema_execute): + def wrapper(q): + async def _execute(id, input): + return await schema_execute(q, {"id": id, "input": input}) + + return _execute + + return wrapper + + +@pytest.fixture +def execute_delete(schema_execute): + def wrapper(q): + async def _execute(ids): + return await schema_execute(q, {"ids": ids}) + + return _execute + + return wrapper + + +@pytest.fixture +def execute_id(schema_execute): + def wrapper(q): + async def _execute(id): + return await schema_execute(q, {"id": id}) + + return _execute + + return wrapper + + +@pytest.fixture +def execute_filter(schema_execute): + def wrapper(q): + async def _execute(filter=None): + return await schema_execute(q, {"filter": filter} if filter else None) + + return _execute + + return wrapper + + +@pytest.fixture +def execute_sort(schema_execute): + def wrapper(q): + async def _execute(sort=None): + return await schema_execute(q, {"sort": sort} if sort else None) + + return _execute + + return wrapper + + +class DB: + @staticmethod + async def add(model): + async with database.session() as s: + s.add(model) + await s.commit() + return model + + @staticmethod + async def add_all(*models): + async with database.session() as s: + s.add_all(models) + await s.commit() + return models + + @staticmethod + async def get(modelcls, id, full=False): + async with database.session() as s: + options = modelcls.load_full() if full else [] + model = await s.get(modelcls, id, options=options) + return model + + @staticmethod + async def delete(modelcls, id): + async with database.session() as s: + model = await s.get(modelcls, id) + await s.delete(model) + await s.commit() + return + + +class Response: + def __init__(self, response, key=None): + assert response.errors is None + + if key is None: + assert response.data is not None + assert len(response.data) == 1 + key = next(iter(response.data.keys())) + + assert key in response.data + self.data = response.data.get(key) + self.errors = response.errors + + def __getattr__(self, name): + assert name in self.data + return self.data.get(name) + + def assert_is(self, typename): + assert self.data["__typename"] == typename + + +@pytest.fixture +def gen_artist(): + def _gen(): + yield models.Artist(id=1, name="alan smithee") + yield models.Artist(id=2, name="david agnew") + yield models.Artist(id=3, name="robin bland") + yield models.Artist(id=4, name="robin smith") + + return _gen() + + +@pytest.fixture +def gen_character(): + def _gen(): + yield models.Character(id=1, name="greta giraffe") + yield models.Character(id=2, name="bob bear") + yield models.Character(id=3, name="rico rhinoceros") + yield models.Character(id=4, name="ziggy zebra") + + return _gen() + + +@pytest.fixture +def gen_circle(): + def _gen(): + yield models.Circle(id=1, name="archimedes") + yield models.Circle(id=2, name="bankoff") + yield models.Circle(id=3, name="carlyle") + yield models.Circle(id=4, name="ford") + + return _gen() + + +@pytest.fixture +def gen_namespace(): + def _gen(): + yield models.Namespace(id=1, name="animal", sort_name="animal") + yield models.Namespace(id=2, name="human", sort_name="human") + + return _gen() + + +@pytest.fixture +def gen_tag(): + def _gen(): + yield models.Tag( + id=1, name="small", description="barely visible", namespaces=[] + ) + yield models.Tag( + id=2, + name="medium", + description="mostly average", + namespaces=[], + ) + yield models.Tag(id=3, name="big", description="impressive", namespaces=[]) + yield models.Tag( + id=4, name="massive", description="what is THAT", namespaces=[] + ) + + return _gen() + + +@pytest.fixture +def gen_world(): + def _gen(): + yield models.World(id=1, name="animal friends") + yield models.World(id=2, name="criminanimals") + yield models.World(id=3, name="in the nude") + yield models.World(id=4, name="wall street") + + return _gen() + + +@pytest.fixture +def gen_image(): + def _gen(): + yield models.Image( + id=1, hash="1bb05614b44bf177589632a51ce216a2", width=3024, height=2106 + ) + yield models.Image( + id=2, hash="77dfd96aee1bc8c36ab7095fcf18f7ff", width=3024, height=2094 + ) + yield models.Image( + id=3, hash="109aac22f29bd361fbfb19f975a1b7f0", width=3019, height=2089 + ) + yield models.Image( + id=4, hash="e18fc95f00a087ff001ecd8675eddd14", width=3024, height=2097 + ) + yield models.Image( + id=5, hash="0e2cd2f176e792a3777710978768bc90", width=1607, height=2259 + ) + yield models.Image( + id=6, hash="64e50730eb842750ebe5417a524b83e6", width=1556, height=2264 + ) + yield models.Image( + id=7, hash="d906ef54788cae72e1a511c9775e6d68", width=1525, height=2259 + ) + yield models.Image( + id=8, hash="0f8ead4a60df09a1dd071617b5d5583b", width=1545, height=2264 + ) + yield models.Image( + id=9, hash="912ccb4350fb17ea1248e26ecfb5d983", width=1607, height=2259 + ) + yield models.Image( + id=10, hash="108edee1b417f022a6d1f999bd32d16d", width=1546, height=2224 + ) + yield models.Image( + id=11, hash="97c0903cb0962741174f264aaa7015d4", width=1528, height=2257 + ) + yield models.Image( + id=12, hash="b5490ad31d2a8910087ba932073b4e52", width=1543, height=2271 + ) + yield models.Image( + id=13, hash="c9ab7febcb81974a992ed1de60c728ba", width=1611, height=2257 + ) + yield models.Image( + id=14, hash="bcfdf22ec17a09cd4f6a0af86e966e8f", width=1553, height=2265 + ) + yield models.Image( + id=15, hash="1f58f4b08bf6f4ca92bd29cbce26241e", width=1526, height=2258 + ) + yield models.Image( + id=16, hash="f87d7e55203b5e7cf9c801db48624ef0", width=1645, height=2262 + ) + + return _gen() + + +@pytest.fixture +def gen_page(gen_image): + def _gen(): + yield models.Page(id=1, index=1, path="001.png", image=next(gen_image)) + yield models.Page(id=2, index=2, path="002.png", image=next(gen_image)) + yield models.Page(id=3, index=3, path="003.png", image=next(gen_image)) + yield models.Page(id=4, index=4, path="004.png", image=next(gen_image)) + yield models.Page(id=5, index=1, path="00.jpg", image=next(gen_image)) + yield models.Page(id=6, index=2, path="01.jpg", image=next(gen_image)) + yield models.Page(id=7, index=3, path="02.jpg", image=next(gen_image)) + yield models.Page(id=8, index=4, path="03.jpg", image=next(gen_image)) + yield models.Page(id=9, index=1, path="1.jpg", image=next(gen_image)) + yield models.Page(id=10, index=2, path="2.jpg", image=next(gen_image)) + yield models.Page(id=11, index=3, path="10.jpg", image=next(gen_image)) + yield models.Page(id=12, index=4, path="11.jpg", image=next(gen_image)) + yield models.Page(id=13, index=1, path="010.png", image=next(gen_image)) + yield models.Page(id=14, index=2, path="011.png", image=next(gen_image)) + yield models.Page(id=15, index=3, path="012.png", image=next(gen_image)) + yield models.Page(id=16, index=4, path="013.png", image=next(gen_image)) + + return _gen() + + +@pytest.fixture +def gen_jumbled_pages(gen_image): + def _gen(): + yield models.Page(id=101, index=3, path="3.png", image=next(gen_image)) + yield models.Page(id=52, index=9, path="9.png", image=next(gen_image)) + yield models.Page(id=13, index=2, path="2.png", image=next(gen_image)) + yield models.Page(id=258, index=10, path="10.png", image=next(gen_image)) + yield models.Page(id=7, index=7, path="7.jpg", image=next(gen_image)) + yield models.Page(id=25, index=5, path="5.jpg", image=next(gen_image)) + yield models.Page(id=150, index=1, path="1.jpg", image=next(gen_image)) + yield models.Page(id=69, index=4, path="4.jpg", image=next(gen_image)) + yield models.Page(id=219, index=6, path="6.jpg", image=next(gen_image)) + yield models.Page(id=34, index=8, path="8.jpg", image=next(gen_image)) + + return _gen() + + +@pytest.fixture +def gen_jumbled_archive(gen_jumbled_pages): + def _gen(): + pages = [next(gen_jumbled_pages) for _ in range(10)] + yield models.Archive( + id=100, + hash="4e1243bd22c66e76c2ba9eddc1f91394", + path="comics/jumbled.zip", + size=32559235, + mtime=dt(2002, 1, 23).astimezone(), + cover=pages[0].image, + pages=pages, + page_count=len(pages), + ) + + return _gen() + + +@pytest.fixture +def gen_archive(gen_page): + def _gen(): + pages = [next(gen_page) for _ in range(4)] + yield models.Archive( + id=1, + hash="1d394f66c49ccb1d3c30870904d31bd4", + path="comics/archive-01.zip", + size=7340032, + mtime=dt(2016, 5, 10).astimezone(), + cover=pages[0].image, + pages=pages, + page_count=len(pages), + ) + + pages = [next(gen_page) for _ in range(4)] + yield models.Archive( + id=2, + hash="d7d8929b2e606200e863d390f71b53bb", + path="comics/archive-02.zip", + size=11335106, + mtime=dt(2008, 10, 2, tzinfo=tz(timedelta(hours=+6))), + cover=pages[0].image, + pages=pages, + page_count=len(pages), + ) + + pages = [next(gen_page) for _ in range(4)] + yield models.Archive( + id=3, + hash="02669dbe08c4a5f4820c10b3ff2178fa", + path="comics/sub/archive-new.zip", + size=51841969, + mtime=dt(2005, 11, 17, tzinfo=tz(timedelta(hours=+2))), + cover=pages[0].image, + pages=pages, + page_count=len(pages), + ) + + pages = [next(gen_page) for _ in range(4)] + yield models.Archive( + id=4, + hash="6b2ecf5ceb8befd6d0c1cd353a3df709", + path="comics/archive-03.zip", + size=13568769, + mtime=dt(1999, 5, 8, tzinfo=tz(timedelta(hours=-2))), + cover=pages[0].image, + pages=pages, + page_count=len(pages), + ) + + return _gen() + + +@pytest.fixture +def gen_comic( + gen_archive, + gen_artist, + gen_character, + gen_circle, + gen_world, + gen_tag, + gen_namespace, +): + def _gen(): + artists = {a.id: a for a in gen_artist} + characters = {c.id: c for c in gen_character} + + namespaces = {ns.id: ns for ns in gen_namespace} + tags = {t.id: t for t in gen_tag} + + def tag(nid, tid): + return models.ComicTag(namespace=namespaces[nid], tag=tags[tid]) + + archive = next(gen_archive) + yield models.Comic( + id=1, + title="Arid Savannah Adventures", + url="file:///home/savannah/adventures", + category=Category.MANGA, + censorship=Censorship.NONE, + date=date(2010, 7, 5), + direction=Direction.LEFT_TO_RIGHT, + favourite=True, + language=Language.EN, + layout=Layout.SINGLE, + rating=Rating.SAFE, + archive=archive, + artists=[artists[1], artists[2]], + characters=list(characters.values()), + circles=[next(gen_circle)], + worlds=[next(gen_world)], + cover=archive.cover, + pages=archive.pages, + tags=[ + tag(1, 1), + tag(1, 2), + tag(1, 3), + tag(1, 4), + ], + ) + + archive = next(gen_archive) + yield models.Comic( + id=2, + title="This Giraffe Stole My Wallet", + original_title="Diese Giraffe hat mein Geldbeutel geklaut", + url="ftp://crimes.local/giraffes.zip", + category=Category.MANGA, + censorship=Censorship.BAR, + date=date(2002, 2, 17), + direction=Direction.LEFT_TO_RIGHT, + favourite=False, + language=Language.EN, + layout=Layout.SINGLE, + rating=Rating.QUESTIONABLE, + archive=archive, + artists=[artists[3]], + characters=[characters[1]], + circles=[next(gen_circle)], + worlds=[next(gen_world)], + cover=archive.cover, + pages=archive.pages, + tags=[ + tag(1, 3), + tag(2, 1), + ], + ) + + archive = next(gen_archive) + yield models.Comic( + id=3, + title="サイのスパ", + category=Category.ARTBOOK, + censorship=Censorship.MOSAIC, + date=date(2017, 5, 3), + direction=Direction.RIGHT_TO_LEFT, + favourite=False, + language=Language.JA, + layout=Layout.DOUBLE_OFFSET, + rating=Rating.EXPLICIT, + archive=archive, + artists=[artists[1], artists[4]], + characters=[characters[3]], + circles=[next(gen_circle)], + worlds=[next(gen_world)], + cover=archive.cover, + pages=archive.pages, + tags=[ + tag(1, 4), + ], + ) + + archive = next(gen_archive) + yield models.Comic( + id=4, + title="In the Company of Vultures", + category=Category.DOUJINSHI, + date=date(2023, 3, 10), + direction=Direction.LEFT_TO_RIGHT, + favourite=False, + language=Language.EN, + layout=Layout.SINGLE, + rating=Rating.SAFE, + archive=archive, + artists=[artists[4]], + characters=[characters[4]], + circles=[next(gen_circle)], + worlds=[next(gen_world)], + cover=archive.cover, + pages=archive.pages, + tags=[ + tag(2, 1), + tag(2, 2), + tag(2, 3), + ], + ) + + return _gen() + + +@pytest.fixture +def empty_comic(gen_archive): + archive = next(gen_archive) + yield models.Comic( + id=100, + title="Hic Sunt Dracones", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) diff --git a/tests/plugins/test_plugins.py b/tests/plugins/test_plugins.py new file mode 100644 index 0000000..dd7042e --- /dev/null +++ b/tests/plugins/test_plugins.py @@ -0,0 +1,9 @@ +import hircine.plugins + + +def test_plugin_transformer_decorator(empty_plugins): + @hircine.plugins.transformer + def ignore(generator, info): + return + + assert hircine.plugins.transformers == [ignore] diff --git a/tests/scanner/data/contents/archive.zip b/tests/scanner/data/contents/archive.zip Binary files differnew file mode 100644 index 0000000..990eb98 --- /dev/null +++ b/tests/scanner/data/contents/archive.zip diff --git a/tests/scanner/test_scanner.py b/tests/scanner/test_scanner.py new file mode 100644 index 0000000..45a966f --- /dev/null +++ b/tests/scanner/test_scanner.py @@ -0,0 +1,311 @@ +import configparser +import os +import shutil +from datetime import datetime, timezone +from pathlib import Path +from zipfile import ZipFile + +import hircine.thumbnailer +import pytest +from conftest import DB +from hircine.config import DirectoryStructure +from hircine.db.models import Archive, Image, Page +from hircine.scanner import Scanner, Status +from hircine.thumbnailer import object_path + + +def pageset(pages): + return set([(page.path, page.archive_id, page.image.hash) for page in pages]) + + +@pytest.fixture +def archive(data): + stat = os.stat(data("contents/archive.zip")) + mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc) + + cover = Image( + id=1, + hash="4ac228082aaf8bedc0fbd4859c5324c2acf0d1c63f9097d55e9be88d0804eaa4", + width=0, + height=0, + ) + + archive = Archive( + id=1, + path=data("contents/archive.zip"), + hash="8aa2fd72954fb9103776114172d345ad4446babf292e876a892cfbed1c283523", + size=stat.st_size, + mtime=mtime, + cover=cover, + pages=[ + Page( + id=1, + archive_id=1, + index=1, + path="01.png", + image=cover, + ), + Page( + id=2, + archive_id=1, + index=2, + path="02.png", + image=Image( + id=2, + hash="9b2c7a9c1f3d1c5a07fa1492d9d91ace5122262559c7f513e3b97464d2edb753", + width=0, + height=0, + ), + ), + Page( + id=3, + archive_id=1, + index=3, + path="03.png", + image=Image( + id=3, + hash="ed132e79daf9e93970d14d9443b7870f1aefd12aa9d3fba8cab0096984754ff5", + width=0, + height=0, + ), + ), + ], + page_count=3, + ) + + yield archive + + +@pytest.fixture +def scanner(data, monkeypatch): + monkeypatch.setattr( + hircine.thumbnailer.Thumbnailer, "process", lambda s, a, b: (0, 0) + ) + + dirs = DirectoryStructure(scan=data("contents/"), objects=data("objects/")) + yield Scanner(configparser.ConfigParser(), dirs) + + +@pytest.mark.anyio +async def test_scanner_adds_new_archive(archive, scanner, capsys): + await scanner.scan() + added_archive = await DB.get(Archive, 1, full=True) + + assert added_archive.hash == archive.hash + assert pageset(added_archive.pages) == pageset(archive.pages) + + captured = capsys.readouterr() + assert captured.out == "[+] archive.zip\n" + + +@pytest.mark.anyio +async def test_scanner_dedups_archive_contents(archive, scanner, capsys): + archive = await DB.add(archive) + + dedup_path = archive.path + ".dedup" + with ZipFile(archive.path, "r") as zin: + with ZipFile(dedup_path, "w") as zout: + for info in zin.infolist(): + base, ext = os.path.splitext(info.filename) + + if base == "03": + continue + + if ext == ".png": + zout.writestr(f"0{base}.png", zin.read(info)) + else: + zout.writestr(info.filename, zin.read(info)) + + await scanner.scan() + added_archive = await DB.get(Archive, 2, full=True) + + assert ( + added_archive.hash + == "fc2ea810eddc231824aef44db62d5f3de89b3747e4aea6b5728c1532aabdeccd" + ) + + pages = set() + for page in archive.pages: + if page.path == "03.png": + continue + + pages.add((f"0{page.path}", 2, page.image.hash)) + + assert pageset(added_archive.pages) == pages + + captured = capsys.readouterr() + assert captured.out == "[+] archive.zip.dedup\n" + + +@pytest.mark.anyio +async def test_scanner_skips_same_mtime(archive, scanner, capsys): + archive = await DB.add(archive) + await scanner.scan() + + captured = capsys.readouterr() + assert captured.out == "" + + +@pytest.mark.anyio +async def test_scanner_finds_existing_before_duplicate(archive, scanner, capsys): + stat = os.stat(archive.path) + mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc) + + before = await DB.add(archive) + + copy_path = before.path + ".copy" + shutil.copyfile(Path(before.path), copy_path) + + await scanner.scan() + + after = await DB.get(Archive, before.id, full=True) + assert after.hash == before.hash + assert after.path == before.path + assert after.mtime == mtime + assert pageset(after.pages) == pageset(before.pages) + + captured = capsys.readouterr() + assert captured.out == "[I] archive.zip.copy\n" + + +@pytest.mark.anyio +async def test_scanner_skips_non_zip(data, scanner, capsys): + Path(data("contents/archive.zip")).unlink() + Path(data("contents/non_zip.txt")).touch() + await scanner.scan() + + captured = capsys.readouterr() + assert captured.out == "" + + +@pytest.mark.anyio +async def test_scanner_skips_link(data, scanner, capsys): + Path(data("contents/archive.zip")).rename(data("archive.zip")) + os.symlink(data("archive.zip"), data("contents/archive.zip")) + await scanner.scan() + + captured = capsys.readouterr() + assert captured.out == "" + + +@pytest.mark.anyio +async def test_scanner_updates_mtime(archive, scanner, capsys): + Path(archive.path).touch() + stat = os.stat(archive.path) + mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc) + + archive = await DB.add(archive) + await scanner.scan() + + updated_archive = await DB.get(Archive, archive.id, full=True) + assert updated_archive.hash == archive.hash + assert updated_archive.path == archive.path + assert updated_archive.mtime == mtime + assert pageset(updated_archive.pages) == pageset(archive.pages) + + captured = capsys.readouterr() + assert captured.out == "[*] archive.zip\n" + + +@pytest.mark.anyio +async def test_scanner_updates_path(archive, scanner, capsys): + new_path = archive.path + ".new" + + Path(archive.path).rename(new_path) + stat = os.stat(new_path) + mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc) + + archive = await DB.add(archive) + await scanner.scan() + + updated_archive = await DB.get(Archive, archive.id, full=True) + assert updated_archive.hash == archive.hash + assert updated_archive.path == new_path + assert updated_archive.mtime == mtime + assert pageset(updated_archive.pages) == pageset(archive.pages) + + captured = capsys.readouterr() + assert captured.out == "[>] archive.zip -> archive.zip.new\n" + + +@pytest.mark.anyio +async def test_scanner_reports_missing(archive, scanner): + archive = await DB.add(archive) + Path(archive.path).unlink() + await scanner.scan() + + assert scanner.registry.orphans == {archive.hash: (archive.id, archive.path)} + + +@pytest.mark.anyio +async def test_scanner_reports_duplicate(archive, scanner, capsys): + archive = await DB.add(archive) + copy_path = archive.path + ".copy" + shutil.copyfile(Path(archive.path), copy_path) + await scanner.scan() + + assert list(scanner.registry.duplicates) == [ + [ + (archive.path, Status.UNCHANGED), + (copy_path, Status.IGNORED), + ] + ] + + captured = capsys.readouterr() + assert captured.out == "[I] archive.zip.copy\n" + + +@pytest.mark.anyio +async def test_scanner_ignores_empty_archive(archive, scanner, capsys): + Path(archive.path).unlink() + + empty_path = archive.path + ".empty" + ZipFile(empty_path, "w").close() + + await scanner.scan() + + assert scanner.registry.marked == {} + + captured = capsys.readouterr() + assert captured.out == "" + + +@pytest.mark.anyio +async def test_scanner_reports_conflict(archive, scanner, capsys): + archive = await DB.add(archive) + ZipFile(archive.path, "w").close() + + await scanner.scan() + + assert scanner.registry.conflicts == { + archive.path: ( + archive.hash, + "af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f3262", + ) + } + + captured = capsys.readouterr() + assert captured.out == "[!] archive.zip\n" + + +@pytest.mark.anyio +async def test_scanner_reprocess(archive, data, scanner, capsys): + await scanner.scan() + + captured = capsys.readouterr() + assert captured.out == "[+] archive.zip\n" + + old_stat = os.stat(data(object_path("objects/", archive.cover.hash, "full"))) + old_mtime = datetime.fromtimestamp(old_stat.st_mtime, tz=timezone.utc) + + scanner.reprocess = True + + await scanner.scan() + + new_stat = os.stat(data(object_path("objects/", archive.cover.hash, "full"))) + new_mtime = datetime.fromtimestamp(new_stat.st_mtime, tz=timezone.utc) + + assert new_mtime > old_mtime + + captured = capsys.readouterr() + assert captured.out == "[~] archive.zip\n" diff --git a/tests/scrapers/test_scraper.py b/tests/scrapers/test_scraper.py new file mode 100644 index 0000000..6f6f29d --- /dev/null +++ b/tests/scrapers/test_scraper.py @@ -0,0 +1,55 @@ +from hircine.scraper import Scraper, ScrapeWarning + + +class MockScraper(Scraper): + is_available = True + + def scrape(self): + yield lambda: "foo" + yield "bar" + + +class WarningScraper(Scraper): + is_available = True + + def warn(self, str): + raise ScrapeWarning("Invalid input") + + def scrape(self): + yield lambda: "foo" + yield lambda: self.warn("bar") + yield "baz" + + +class ParserlessScraper(Scraper): + is_available = True + + def scrape(self): + yield "literal" + + +def test_scraper_collects(): + generator = MockScraper(None).collect() + + assert set(generator) == set(["foo", "bar"]) + + +def test_scraper_collects_with_transformer(): + generator = MockScraper(None).collect([lambda gen, info: map(str.upper, gen)]) + + assert set(generator) == set(["FOO", "BAR"]) + + +def test_scraper_collects_warnings(): + scraper = WarningScraper(None) + generator = scraper.collect() + + assert set(generator) == set(["foo", "baz"]) + assert scraper.get_warnings() == ["Invalid input"] + + +def test_scraper_collects_literal(): + scraper = ParserlessScraper(None) + generator = scraper.collect() + + assert set(generator) == set(["literal"]) diff --git a/tests/scrapers/test_scraper_utils.py b/tests/scrapers/test_scraper_utils.py new file mode 100644 index 0000000..193cf2a --- /dev/null +++ b/tests/scrapers/test_scraper_utils.py @@ -0,0 +1,28 @@ +from hircine.scraper.utils import parse_dict + + +def test_parse_dict(): + dict = { + "scalar": "foo", + "list": ["bar", "baz"], + "dict": {"nested_scalar": "qux", "nested_list": ["plugh", "xyzzy"]}, + } + + def id(type): + return lambda item: f"{type}_{item}" + + parsers = { + "scalar": id("scalar"), + "list": id("list"), + "dict": {"nested_scalar": id("scalar"), "nested_list": id("list")}, + "missing": id("missing"), + } + + assert [f() for f in parse_dict(parsers, dict)] == [ + "scalar_foo", + "list_bar", + "list_baz", + "scalar_qux", + "list_plugh", + "list_xyzzy", + ] diff --git a/tests/scrapers/test_types.py b/tests/scrapers/test_types.py new file mode 100644 index 0000000..ed937e7 --- /dev/null +++ b/tests/scrapers/test_types.py @@ -0,0 +1,131 @@ +from datetime import date + +import pytest +from hircine.api.types import ScrapedComic +from hircine.scraper import ScrapeWarning +from hircine.scraper.types import ( + Artist, + Category, + Character, + Circle, + Date, + Language, + OriginalTitle, + Rating, + Tag, + Title, + World, +) + + +@pytest.mark.parametrize( + "input,options,want", + [ + ("foo", {}, Tag(namespace="none", tag="foo")), + ("foo:bar", {}, Tag(namespace="foo", tag="bar")), + ("foo:bar:baz", {}, Tag(namespace="foo", tag="bar:baz")), + ("foo/bar", {"delimiter": "/"}, Tag(namespace="foo", tag="bar")), + ], + ids=[ + "tag only", + "tag and namespace", + "tag with delimiter", + "custom delimiter", + ], +) +def test_tag_from_string(input, options, want): + assert Tag.from_string(input, **options) == want + + +@pytest.mark.parametrize( + "input,want", + [ + ("1998-02-07", Date(value=date(1998, 2, 7))), + ("2018-07-18T19:15", Date(value=date(2018, 7, 18))), + ( + "2003-12-30T10:37Z", + Date(value=date(2003, 12, 30)), + ), + ], +) +def test_date_from_iso(input, want): + assert Date.from_iso(input) == want + + +@pytest.mark.parametrize( + "input", + [ + ("text"), + ("1997 02 07"), + ("1997/02/07"), + ], +) +def test_date_from_iso_fails(input): + with pytest.raises(ScrapeWarning, match="Could not parse date:"): + Date.from_iso(input) + + +@pytest.mark.parametrize( + "input,want", + [ + ("886806000", Date(value=date(1998, 2, 7))), + (886806000, Date(value=date(1998, 2, 7))), + ], +) +def test_date_from_timestamp(input, want): + assert Date.from_timestamp(input) == want + + +@pytest.mark.parametrize( + "input", + [ + ("text"), + ], +) +def test_date_from_timestamp_fails(input): + with pytest.raises(ScrapeWarning, match="Could not parse date:"): + Date.from_timestamp(input) + + +@pytest.mark.parametrize( + "item,attr,empty", + [ + (Title(""), "title", None), + (OriginalTitle(""), "original_title", None), + (Language(None), "language", None), + (Date(None), "date", None), + (Rating(None), "rating", None), + (Category(None), "category", None), + (Tag("", ""), "tags", []), + (Tag(namespace="", tag=""), "tags", []), + (Tag(namespace=None, tag=""), "tags", []), + (Tag(namespace="foo", tag=""), "tags", []), + (Artist(""), "artists", []), + (Character(""), "characters", []), + (Circle(""), "circles", []), + (World(""), "worlds", []), + ], + ids=[ + "title", + "original title", + "language", + "date", + "rating", + "category", + "tag (both empty, positional)", + "tag (both empty)", + "tag (namespace None, tag empty)", + "tag (tag empty)", + "artist", + "character", + "circle", + "world", + ], +) +def test_scraped_comic_silently_ignores_empty(item, attr, empty): + def gen(): + yield item + + comic = ScrapedComic.from_generator(gen()) + + assert getattr(comic, attr) == empty diff --git a/tests/thumbnailer/data/example_palette.png b/tests/thumbnailer/data/example_palette.png Binary files differnew file mode 100644 index 0000000..6bf25e1 --- /dev/null +++ b/tests/thumbnailer/data/example_palette.png diff --git a/tests/thumbnailer/data/example_rgb.png b/tests/thumbnailer/data/example_rgb.png Binary files differnew file mode 100644 index 0000000..a245642 --- /dev/null +++ b/tests/thumbnailer/data/example_rgb.png diff --git a/tests/thumbnailer/test_thumbnailer.py b/tests/thumbnailer/test_thumbnailer.py new file mode 100644 index 0000000..62bf127 --- /dev/null +++ b/tests/thumbnailer/test_thumbnailer.py @@ -0,0 +1,74 @@ +import os +from pathlib import Path + +import pytest +from hircine.thumbnailer import Thumbnailer, ThumbnailParameters +from PIL import Image + +mock_params = ThumbnailParameters(bounds=(1440, 2880), options={}) + + +def test_thumbnailer_object(): + thumb = Thumbnailer("objects/", params={}) + assert thumb.object("abcdef", "foo") == os.path.join("objects/", "ab/cdef_foo.webp") + + +@pytest.mark.parametrize( + "extension, can_process", + [ + (".png", True), + (".jpeg", True), + (".jpg", True), + (".gif", True), + (".bmp", True), + (".json", False), + (".txt", False), + ], + ids=["png", "jpeg", "jpg", "gif", "bmp", "json", "txt"], +) +def test_thumbnailer_can_process(extension, can_process): + assert Thumbnailer.can_process(extension) == can_process + + +def test_thumbnailer_process(data): + thumb = Thumbnailer(data("objects/"), params={"mock": mock_params}) + + with open(data("example_rgb.png"), "rb") as f: + size = Image.open(f, mode="r").size + reported_size = thumb.process(f, "abcdef") + + assert reported_size == size + + output = thumb.object("abcdef", "mock") + + assert os.path.exists(output) + + +def test_thumbnailer_converts_non_rgb(data): + thumb = Thumbnailer(data("objects/"), params={"mock": mock_params}) + + with open(data("example_palette.png"), "rb") as f: + size = Image.open(f, mode="r").size + reported_size = thumb.process(f, "abcdef") + + assert reported_size == size + + output = thumb.object("abcdef", "mock") + + assert os.path.exists(output) + + output_image = Image.open(output) + assert output_image.mode == "RGB" + + +def test_thumbnailer_process_ignores_existing(data): + thumb = Thumbnailer(data("objects/"), params={"mock": mock_params}) + + output = Path(thumb.object("abcdef", "mock")) + os.makedirs(os.path.dirname(output)) + output.touch() + + with open(data("example_palette.png"), "rb") as f: + thumb.process(f, "abcdef") + + assert output.stat().st_size == 0 |