From d1d654ebac2d51e3841675faeb56480e440f622f Mon Sep 17 00:00:00 2001 From: Wolfgang Müller Date: Tue, 5 Mar 2024 18:08:09 +0100 Subject: Initial commit --- tests/api/test_archive.py | 388 +++++++++++ tests/api/test_artist.py | 278 ++++++++ tests/api/test_character.py | 285 ++++++++ tests/api/test_circle.py | 278 ++++++++ tests/api/test_collection.py | 0 tests/api/test_comic.py | 1505 +++++++++++++++++++++++++++++++++++++++++ tests/api/test_comic_tag.py | 134 ++++ tests/api/test_db.py | 324 +++++++++ tests/api/test_filter.py | 521 ++++++++++++++ tests/api/test_image.py | 16 + tests/api/test_namespace.py | 291 ++++++++ tests/api/test_page.py | 39 ++ tests/api/test_pagination.py | 61 ++ tests/api/test_scraper_api.py | 395 +++++++++++ tests/api/test_sort.py | 137 ++++ tests/api/test_tag.py | 441 ++++++++++++ tests/api/test_world.py | 278 ++++++++ 17 files changed, 5371 insertions(+) create mode 100644 tests/api/test_archive.py create mode 100644 tests/api/test_artist.py create mode 100644 tests/api/test_character.py create mode 100644 tests/api/test_circle.py create mode 100644 tests/api/test_collection.py create mode 100644 tests/api/test_comic.py create mode 100644 tests/api/test_comic_tag.py create mode 100644 tests/api/test_db.py create mode 100644 tests/api/test_filter.py create mode 100644 tests/api/test_image.py create mode 100644 tests/api/test_namespace.py create mode 100644 tests/api/test_page.py create mode 100644 tests/api/test_pagination.py create mode 100644 tests/api/test_scraper_api.py create mode 100644 tests/api/test_sort.py create mode 100644 tests/api/test_tag.py create mode 100644 tests/api/test_world.py (limited to 'tests/api') diff --git a/tests/api/test_archive.py b/tests/api/test_archive.py new file mode 100644 index 0000000..0ef3425 --- /dev/null +++ b/tests/api/test_archive.py @@ -0,0 +1,388 @@ +import os +from datetime import datetime as dt +from pathlib import Path + +import hircine.config +import hircine.db as database +import hircine.thumbnailer as thumb +import pytest +from conftest import DB, Response +from hircine.db.models import Archive, Comic, Image, Page +from sqlalchemy import select + + +@pytest.fixture +def query_archive(execute_id): + query = """ + query archive($id: Int!) { + archive(id: $id) { + __typename + ... on FullArchive { + id + name + createdAt + mtime + size + path + pageCount + organized + comics { + __typename + id + } + cover { + __typename + id + } + pages { + __typename + id + image { + __typename + id + } + } + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(query) + + +@pytest.fixture +def query_archives(execute): + query = """ + query archives { + archives { + __typename + count + edges { + id + name + size + pageCount + organized + cover { + __typename + id + } + } + } + } + """ + + return execute(query) + + +@pytest.fixture +def update_archives(execute_update): + mutation = """ + mutation updateArchives($ids: [Int!]!, $input: UpdateArchiveInput!) { + updateArchives(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on PageRemoteError { + id + archiveId + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_update(mutation) + + +@pytest.fixture +def delete_archives(execute_delete): + mutation = """ + mutation deleteArchives($ids: [Int!]!) { + deleteArchives(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +def assert_image_matches(obj, model): + assert obj["__typename"] == "Image" + assert obj["id"] == model.id + + +def assert_page_matches(obj, model): + assert obj["__typename"] == "Page" + assert obj["id"] == model.id + + +@pytest.mark.anyio +async def test_query_archive(query_archive, gen_archive): + archive = next(gen_archive) + pages = archive.pages + + await DB.add(archive) + + response = Response(await query_archive(archive.id)) + response.assert_is("FullArchive") + + assert response.id == archive.id + assert response.name == archive.name + assert dt.fromisoformat(response.createdAt) == archive.created_at + assert dt.fromisoformat(response.mtime) == archive.mtime + assert response.size == archive.size + assert response.path == archive.path + assert response.comics == [] + assert response.pageCount == archive.page_count + assert response.organized == archive.organized + assert_image_matches(response.cover, pages[0].image) + + assert len(response.pages) == len(pages) + + page_iter = iter(sorted(pages, key=lambda page: page.index)) + for page in response.pages: + matching_page = next(page_iter) + assert_page_matches(page, matching_page) + assert_image_matches(page["image"], matching_page.image) + + +@pytest.mark.anyio +async def test_query_archive_sorts_pages(query_archive, gen_jumbled_archive): + archive = await DB.add(next(gen_jumbled_archive)) + + response = Response(await query_archive(archive.id)) + response.assert_is("FullArchive") + + page_iter = iter(sorted(archive.pages, key=lambda page: page.index)) + for page in response.pages: + matching_page = next(page_iter) + assert_page_matches(page, matching_page) + assert_image_matches(page["image"], matching_page.image) + + +@pytest.mark.anyio +async def test_query_archive_fails_not_found(query_archive): + response = Response(await query_archive(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Archive ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_archives(query_archives, gen_archive): + archives = await DB.add_all(*gen_archive) + + response = Response(await query_archives()) + response.assert_is("ArchiveFilterResult") + + assert response.count == len(archives) + assert isinstance((response.edges), list) + assert len(response.edges) == len(archives) + + edges = iter(response.edges) + for archive in sorted(archives, key=lambda a: a.name): + edge = next(edges) + assert edge["id"] == archive.id + assert edge["name"] == archive.name + assert edge["size"] == archive.size + assert edge["pageCount"] == archive.page_count + assert_image_matches(edge["cover"], archive.cover) + + +@pytest.fixture +def gen_archive_with_files(tmpdir, monkeypatch, gen_archive): + content_dir = os.path.join(tmpdir, "content/") + object_dir = os.path.join(tmpdir, "objects/") + os.mkdir(content_dir) + os.mkdir(object_dir) + + dirs = hircine.config.DirectoryStructure(scan=content_dir, objects=object_dir) + monkeypatch.setattr(hircine.config, "dir_structure", dirs) + + archive = next(gen_archive) + + archive_path = Path(os.path.join(content_dir, "archive.zip")) + archive_path.touch() + archive.path = str(archive_path) + + img_paths = [] + for page in archive.pages: + for suffix in ["full", "thumb"]: + img_path = Path(thumb.object_path(object_dir, page.image.hash, suffix)) + os.makedirs(os.path.dirname(img_path), exist_ok=True) + img_path.touch() + + img_paths.append(img_path) + + yield archive, content_dir, object_dir, img_paths + + +@pytest.mark.anyio +async def test_delete_archive(delete_archives, gen_archive_with_files): + archive, content_dir, object_dir, img_paths = gen_archive_with_files + archive_path = archive.path + + archive = await DB.add(archive) + page_ids = [page.id for page in archive.pages] + image_ids = [page.image.id for page in archive.pages] + + response = Response(await delete_archives(archive.id)) + response.assert_is("DeleteSuccess") + + archive = await DB.get(Archive, archive.id) + assert archive is None + + async with database.session() as s: + db_pages = (await s.scalars(select(Page).where(Page.id.in_(page_ids)))).all() + db_images = ( + await s.scalars(select(Image).where(Image.id.in_(image_ids))) + ).all() + + assert db_pages == [] + assert db_images == [] + + assert os.path.exists(archive_path) is False + for img_path in img_paths: + assert os.path.exists(img_path) is False + + +@pytest.mark.anyio +async def test_delete_archive_deletes_images_only_when_necessary( + delete_archives, gen_archive_with_files, gen_archive +): + archive, content_dir, object_dir, img_paths = gen_archive_with_files + archive_path = archive.path + + archive = await DB.add(archive) + page_ids = [page.id for page in archive.pages] + image_ids = [page.image.id for page in archive.pages] + + another = next(gen_archive) + another.pages = [ + Page(path="foo", index=1, image_id=id, archive=another) for id in image_ids + ] + another.cover = archive.cover + await DB.add(another) + + response = Response(await delete_archives(archive.id)) + response.assert_is("DeleteSuccess") + + archive = await DB.get(Archive, archive.id) + assert archive is None + + async with database.session() as s: + db_pages = (await s.scalars(select(Page).where(Page.id.in_(page_ids)))).all() + db_images = ( + await s.scalars(select(Image.id).where(Image.id.in_(image_ids))) + ).all() + + assert db_pages == [] + assert db_images == image_ids + + assert os.path.exists(archive_path) is False + for img_path in img_paths: + assert os.path.exists(img_path) is True + + +@pytest.mark.anyio +async def test_delete_archive_cascades_on_comic( + delete_archives, gen_archive_with_files +): + archive, *_ = gen_archive_with_files + comic = Comic( + id=1, + title="Hic Sunt Dracones", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + + comic = await DB.add(comic) + + response = Response(await delete_archives(comic.archive.id)) + response.assert_is("DeleteSuccess") + + archive = await DB.get(Archive, archive.id) + assert archive is None + + comic = await DB.get(Comic, comic.id) + assert comic is None + + +@pytest.mark.anyio +async def test_update_archives(update_archives, gen_archive): + old_archive = await DB.add(next(gen_archive)) + + response = Response( + await update_archives( + old_archive.id, + {"cover": {"id": old_archive.pages[1].id}, "organized": True}, + ) + ) + response.assert_is("UpdateSuccess") + + archive = await DB.get(Archive, old_archive.id) + + assert archive.cover_id == old_archive.pages[1].image.id + assert archive.organized is True + + +@pytest.mark.anyio +async def test_update_archive_fails_archive_not_found(update_archives, gen_archive): + archive = await DB.add(next(gen_archive)) + + response = Response( + await update_archives(100, {"cover": {"id": archive.pages[1].id}}) + ) + response.assert_is("IDNotFoundError") + assert response.id == 100 + assert response.message == "Archive ID not found: '100'" + + +@pytest.mark.anyio +async def test_update_archive_cover_fails_page_not_found(update_archives, gen_archive): + archive = await DB.add(next(gen_archive)) + + response = Response(await update_archives(archive.id, {"cover": {"id": 100}})) + response.assert_is("IDNotFoundError") + assert response.id == 100 + assert response.message == "Page ID not found: '100'" + + +@pytest.mark.anyio +async def test_update_archive_cover_fails_page_remote(update_archives, gen_archive): + archive = await DB.add(next(gen_archive)) + another = await DB.add(next(gen_archive)) + remote_id = another.pages[0].id + + response = Response(await update_archives(archive.id, {"cover": {"id": remote_id}})) + response.assert_is("PageRemoteError") + assert response.id == remote_id + assert response.archiveId == another.id + assert ( + response.message + == f"Page ID {remote_id} comes from remote archive ID {another.id}" + ) diff --git a/tests/api/test_artist.py b/tests/api/test_artist.py new file mode 100644 index 0000000..8cb2f1a --- /dev/null +++ b/tests/api/test_artist.py @@ -0,0 +1,278 @@ +from datetime import datetime as dt +from datetime import timezone + +import pytest +from conftest import DB, Response +from hircine.db.models import Artist + + +@pytest.fixture +def query_artist(execute_id): + query = """ + query artist($id: Int!) { + artist(id: $id) { + __typename + ... on Artist { + id + name + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(query) + + +@pytest.fixture +def query_artists(execute): + query = """ + query artists { + artists { + __typename + count + edges { + id + name + } + } + } + """ + + return execute(query) + + +@pytest.fixture +def add_artist(execute_add): + mutation = """ + mutation addArtist($input: AddArtistInput!) { + addArtist(input: $input) { + __typename + ... on AddSuccess { + id + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + } + } + """ + + return execute_add(mutation) + + +@pytest.fixture +def update_artists(execute_update): + mutation = """ + mutation updateArtists($ids: [Int!]!, $input: UpdateArtistInput!) { + updateArtists(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +@pytest.fixture +def delete_artists(execute_delete): + mutation = """ + mutation deleteArtists($ids: [Int!]!) { + deleteArtists(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +@pytest.mark.anyio +async def test_query_artist(query_artist, gen_artist): + artist = await DB.add(next(gen_artist)) + + response = Response(await query_artist(artist.id)) + response.assert_is("Artist") + + assert response.id == artist.id + assert response.name == artist.name + + +@pytest.mark.anyio +async def test_query_artist_fails_not_found(query_artist): + response = Response(await query_artist(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Artist ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_artists(query_artists, gen_artist): + artists = await DB.add_all(*gen_artist) + + response = Response(await query_artists()) + response.assert_is("ArtistFilterResult") + + assert response.count == len(artists) + assert isinstance((response.edges), list) + assert len(response.edges) == len(artists) + + edges = iter(response.edges) + for artist in sorted(artists, key=lambda a: a.name): + edge = next(edges) + assert edge["id"] == artist.id + assert edge["name"] == artist.name + + +@pytest.mark.anyio +async def test_add_artist(add_artist): + response = Response(await add_artist({"name": "added artist"})) + response.assert_is("AddSuccess") + + artist = await DB.get(Artist, response.id) + assert artist is not None + assert artist.name == "added artist" + + +@pytest.mark.anyio +async def test_add_artist_fails_empty_parameter(add_artist): + response = Response(await add_artist({"name": ""})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_add_artist_fails_exists(add_artist, gen_artist): + artist = await DB.add(next(gen_artist)) + + response = Response(await add_artist({"name": artist.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Artist with this name exists" + + +@pytest.mark.anyio +async def test_delete_artist(delete_artists, gen_artist): + artist = await DB.add(next(gen_artist)) + id = artist.id + + response = Response(await delete_artists(id)) + response.assert_is("DeleteSuccess") + + artist = await DB.get(Artist, id) + assert artist is None + + +@pytest.mark.anyio +async def test_delete_artist_not_found(delete_artists): + response = Response(await delete_artists(1)) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Artist ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_artist(update_artists, gen_artist): + artist = await DB.add(next(gen_artist)) + + input = {"name": "updated artist"} + response = Response(await update_artists(artist.id, input)) + response.assert_is("UpdateSuccess") + + artist = await DB.get(Artist, artist.id) + assert artist is not None + assert artist.name == "updated artist" + + +@pytest.mark.anyio +async def test_update_artist_fails_exists(update_artists, gen_artist): + first = await DB.add(next(gen_artist)) + second = await DB.add(next(gen_artist)) + + response = Response(await update_artists(second.id, {"name": first.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Artist with this name exists" + + +@pytest.mark.anyio +async def test_update_artist_fails_not_found(update_artists): + response = Response(await update_artists(1, {"name": "updated artist"})) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Artist ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_artists_cannot_bulk_edit_name(update_artists, gen_artist): + first = await DB.add(next(gen_artist)) + second = await DB.add(next(gen_artist)) + + response = Response(await update_artists([first.id, second.id], {"name": "unique"})) + response.assert_is("InvalidParameterError") + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "none", + "empty string", + ], +) +@pytest.mark.anyio +async def test_update_artist_fails_empty_parameter(update_artists, gen_artist, empty): + artist = await DB.add(next(gen_artist)) + + response = Response(await update_artists(artist.id, {"name": empty})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_update_artist_changes_updated_at(update_artists): + original_artist = Artist(name="artist") + original_artist.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_artist = await DB.add(original_artist) + + response = Response(await update_artists(original_artist.id, {"name": "updated"})) + response.assert_is("UpdateSuccess") + + artist = await DB.get(Artist, original_artist.id) + assert artist.updated_at > original_artist.updated_at diff --git a/tests/api/test_character.py b/tests/api/test_character.py new file mode 100644 index 0000000..567d2a4 --- /dev/null +++ b/tests/api/test_character.py @@ -0,0 +1,285 @@ +from datetime import datetime as dt +from datetime import timezone + +import pytest +from conftest import DB, Response +from hircine.db.models import Character + + +@pytest.fixture +def query_character(execute_id): + query = """ + query character($id: Int!) { + character(id: $id) { + __typename + ... on Character { + id + name + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(query) + + +@pytest.fixture +def query_characters(execute): + query = """ + query characters { + characters { + __typename + count + edges { + id + name + } + } + } + """ + + return execute(query) + + +@pytest.fixture +def add_character(execute_add): + mutation = """ + mutation addCharacter($input: AddCharacterInput!) { + addCharacter(input: $input) { + __typename + ... on AddSuccess { + id + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + } + } + """ + + return execute_add(mutation) + + +@pytest.fixture +def update_characters(execute_update): + mutation = """ + mutation updateCharacters($ids: [Int!]!, $input: UpdateCharacterInput!) { + updateCharacters(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +@pytest.fixture +def delete_characters(execute_delete): + mutation = """ + mutation deleteCharacters($ids: [Int!]!) { + deleteCharacters(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +@pytest.mark.anyio +async def test_query_character(query_character, gen_character): + character = await DB.add(next(gen_character)) + + response = Response(await query_character(character.id)) + response.assert_is("Character") + + assert response.id == character.id + assert response.name == character.name + + +@pytest.mark.anyio +async def test_query_character_fails_not_found(query_character): + response = Response(await query_character(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Character ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_characters(query_characters, gen_character): + characters = await DB.add_all(*gen_character) + + response = Response(await query_characters()) + response.assert_is("CharacterFilterResult") + + assert response.count == len(characters) + assert isinstance((response.edges), list) + assert len(response.edges) == len(characters) + + edges = iter(response.edges) + for character in sorted(characters, key=lambda a: a.name): + edge = next(edges) + assert edge["id"] == character.id + assert edge["name"] == character.name + + +@pytest.mark.anyio +async def test_add_character(add_character): + response = Response(await add_character({"name": "added character"})) + response.assert_is("AddSuccess") + + character = await DB.get(Character, response.id) + assert character is not None + assert character.name == "added character" + + +@pytest.mark.anyio +async def test_add_character_fails_empty_parameter(add_character): + response = Response(await add_character({"name": ""})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_add_character_fails_exists(add_character, gen_character): + character = await DB.add(next(gen_character)) + + response = Response(await add_character({"name": character.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Character with this name exists" + + +@pytest.mark.anyio +async def test_delete_character(delete_characters, gen_character): + character = await DB.add(next(gen_character)) + id = character.id + + response = Response(await delete_characters(id)) + response.assert_is("DeleteSuccess") + + character = await DB.get(Character, id) + assert character is None + + +@pytest.mark.anyio +async def test_delete_character_not_found(delete_characters): + response = Response(await delete_characters(1)) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Character ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_character(update_characters, gen_character): + character = await DB.add(next(gen_character)) + + input = {"name": "updated character"} + response = Response(await update_characters(character.id, input)) + response.assert_is("UpdateSuccess") + + character = await DB.get(Character, character.id) + assert character is not None + assert character.name == "updated character" + + +@pytest.mark.anyio +async def test_update_character_fails_exists(update_characters, gen_character): + first = await DB.add(next(gen_character)) + second = await DB.add(next(gen_character)) + + response = Response(await update_characters(second.id, {"name": first.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Character with this name exists" + + +@pytest.mark.anyio +async def test_update_character_fails_not_found(update_characters): + response = Response(await update_characters(1, {"name": "updated_character"})) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Character ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_characters_cannot_bulk_edit_name( + update_characters, gen_character +): + first = await DB.add(next(gen_character)) + second = await DB.add(next(gen_character)) + + response = Response( + await update_characters([first.id, second.id], {"name": "unique"}) + ) + response.assert_is("InvalidParameterError") + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "none", + "empty string", + ], +) +@pytest.mark.anyio +async def test_update_character_fails_empty_parameter( + update_characters, gen_character, empty +): + character = await DB.add(next(gen_character)) + response = Response(await update_characters(character.id, {"name": empty})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_update_character_changes_updated_at(update_characters): + original_character = Character(name="character") + original_character.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_character = await DB.add(original_character) + + response = Response( + await update_characters(original_character.id, {"name": "updated"}) + ) + response.assert_is("UpdateSuccess") + + character = await DB.get(Character, original_character.id) + assert character.updated_at > original_character.updated_at diff --git a/tests/api/test_circle.py b/tests/api/test_circle.py new file mode 100644 index 0000000..a03ba89 --- /dev/null +++ b/tests/api/test_circle.py @@ -0,0 +1,278 @@ +from datetime import datetime as dt +from datetime import timezone + +import pytest +from conftest import DB, Response +from hircine.db.models import Circle + + +@pytest.fixture +def query_circle(execute_id): + query = """ + query circle($id: Int!) { + circle(id: $id) { + __typename + ... on Circle { + id + name + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(query) + + +@pytest.fixture +def query_circles(execute): + query = """ + query circles { + circles { + __typename + count + edges { + id + name + } + } + } + """ + + return execute(query) + + +@pytest.fixture +def add_circle(execute_add): + mutation = """ + mutation addCircle($input: AddCircleInput!) { + addCircle(input: $input) { + __typename + ... on AddSuccess { + id + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + } + } + """ + + return execute_add(mutation) + + +@pytest.fixture +def update_circles(execute_update): + mutation = """ + mutation updateCircles($ids: [Int!]!, $input: UpdateCircleInput!) { + updateCircles(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on IDNotFoundError { + id + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +@pytest.fixture +def delete_circles(execute_delete): + mutation = """ + mutation deleteCircles($ids: [Int!]!) { + deleteCircles(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +@pytest.mark.anyio +async def test_query_circle(query_circle, gen_circle): + circle = await DB.add(next(gen_circle)) + + response = Response(await query_circle(circle.id)) + response.assert_is("Circle") + + assert response.id == circle.id + assert response.name == circle.name + + +@pytest.mark.anyio +async def test_query_circle_fails_not_found(query_circle): + response = Response(await query_circle(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Circle ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_circles(query_circles, gen_circle): + circles = await DB.add_all(*gen_circle) + + response = Response(await query_circles()) + response.assert_is("CircleFilterResult") + + assert response.count == len(circles) + assert isinstance((response.edges), list) + assert len(response.edges) == len(circles) + + edges = iter(response.edges) + for circle in sorted(circles, key=lambda a: a.name): + edge = next(edges) + assert edge["id"] == circle.id + assert edge["name"] == circle.name + + +@pytest.mark.anyio +async def test_add_circle(add_circle): + response = Response(await add_circle({"name": "added circle"})) + response.assert_is("AddSuccess") + + circle = await DB.get(Circle, response.id) + assert circle is not None + assert circle.name == "added circle" + + +@pytest.mark.anyio +async def test_add_circle_fails_empty_parameter(add_circle): + response = Response(await add_circle({"name": ""})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_add_circle_fails_exists(add_circle, gen_circle): + circle = await DB.add(next(gen_circle)) + + response = Response(await add_circle({"name": circle.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Circle with this name exists" + + +@pytest.mark.anyio +async def test_delete_circle(delete_circles, gen_circle): + circle = await DB.add(next(gen_circle)) + id = circle.id + + response = Response(await delete_circles(id)) + response.assert_is("DeleteSuccess") + + circle = await DB.get(Circle, id) + assert circle is None + + +@pytest.mark.anyio +async def test_delete_circle_not_found(delete_circles): + response = Response(await delete_circles(1)) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Circle ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_circle(update_circles, gen_circle): + circle = await DB.add(next(gen_circle)) + + input = {"name": "updated circle"} + response = Response(await update_circles(circle.id, input)) + response.assert_is("UpdateSuccess") + + circle = await DB.get(Circle, circle.id) + assert circle is not None + assert circle.name == "updated circle" + + +@pytest.mark.anyio +async def test_update_circle_fails_exists(update_circles, gen_circle): + first = await DB.add(next(gen_circle)) + second = await DB.add(next(gen_circle)) + + response = Response(await update_circles(second.id, {"name": first.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Circle with this name exists" + + +@pytest.mark.anyio +async def test_update_circle_fails_not_found(update_circles): + response = Response(await update_circles(1, {"name": "updated circle"})) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Circle ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_circles_cannot_bulk_edit_name(update_circles, gen_circle): + first = await DB.add(next(gen_circle)) + second = await DB.add(next(gen_circle)) + + response = Response(await update_circles([first.id, second.id], {"name": "unique"})) + response.assert_is("InvalidParameterError") + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "none", + "empty string", + ], +) +@pytest.mark.anyio +async def test_update_circle_fails_empty_parameter(update_circles, gen_circle, empty): + circle = await DB.add(next(gen_circle)) + + response = Response(await update_circles(circle.id, {"name": empty})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_update_circle_changes_updated_at(update_circles): + original_circle = Circle(name="circle") + original_circle.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_circle = await DB.add(original_circle) + + response = Response(await update_circles(original_circle.id, {"name": "updated"})) + response.assert_is("UpdateSuccess") + + circle = await DB.get(Circle, original_circle.id) + assert circle.updated_at > original_circle.updated_at diff --git a/tests/api/test_collection.py b/tests/api/test_collection.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/api/test_comic.py b/tests/api/test_comic.py new file mode 100644 index 0000000..d3fa51e --- /dev/null +++ b/tests/api/test_comic.py @@ -0,0 +1,1505 @@ +from datetime import date, timezone +from datetime import datetime as dt + +import pytest +from conftest import DB, Response +from hircine.db.models import ( + Artist, + Circle, + Comic, + ComicArtist, + ComicTag, + Namespace, + Tag, + World, +) +from hircine.enums import Category, Censorship, Direction, Language, Layout, Rating + +full_comic_fragment = """ + fragment FullComic on FullComic { + id + title + category + censorship + createdAt + date + direction + language + layout + originalTitle + url + rating + pageCount + updatedAt + organized + bookmarked + archive { + __typename + id + } + artists { + __typename + id + } + characters { + __typename + id + } + circles { + __typename + id + } + cover { + __typename + id + } + pages { + __typename + id + } + tags { + __typename + id + name + } + worlds { + __typename + id + } + } +""" + +comic_fragment = """ + fragment Comic on Comic { + id + title + category + censorship + date + language + originalTitle + rating + pageCount + organized + bookmarked + artists { + __typename + id + } + characters { + __typename + id + } + circles { + __typename + id + } + cover { + __typename + id + } + tags { + __typename + id + name + } + worlds { + __typename + id + } + } +""" + + +@pytest.fixture +def query_comic(execute_id): + query = """ + query comic($id: Int!) { + comic(id: $id) { + __typename + ... on FullComic { + ...FullComic + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(full_comic_fragment + query) + + +@pytest.fixture +def query_comics(execute): + query = """ + query comics { + comics { + __typename + count + edges { + ...Comic + } + } + } + """ + + return execute(comic_fragment + query) + + +@pytest.fixture +def add_comic(execute_add): + mutation = """ + mutation addComic($input: AddComicInput!) { + addComic(input: $input) { + __typename + ... on AddComicSuccess { + id + archivePagesRemaining + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + ... on PageClaimedError { + comicId + id + } + ... on PageRemoteError { + archiveId + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ + + return execute_add(mutation) + + +@pytest.fixture +def delete_comics(execute_delete): + mutation = """ + mutation deleteComics($ids: [Int!]!) { + deleteComics(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +@pytest.fixture +def update_comics(execute_update): + mutation = """ + mutation updateComics($ids: [Int!]!, $input: UpdateComicInput!) { + updateComics(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on PageRemoteError { + id + archiveId + } + ... on PageClaimedError { + id + comicId + } + ... on IDNotFoundError { + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +@pytest.fixture +def upsert_comics(execute_update): + mutation = """ + mutation upsertComics($ids: [Int!]!, $input: UpsertComicInput!) { + upsertComics(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +def assert_association_matches(obj, model, typename): + assert obj["__typename"] == typename + assert obj["id"] == model.id + + +def assert_associations_match(objlist, modellist, typename, sortkey): + assert isinstance((objlist), list) + assert len(objlist) == len(modellist) + model = iter(sorted(modellist, key=sortkey)) + for obj in objlist: + assert_association_matches(obj, next(model), typename) + + +def assert_comic_item_matches(data, comic): + assert data["id"] == comic.id + assert data["title"] == comic.title + assert data["originalTitle"] == comic.original_title + assert date.fromisoformat(data["date"]) == comic.date + assert Rating[data["rating"]] == comic.rating + assert Language[data["language"]] == comic.language + assert data["pageCount"] == comic.page_count + assert data["organized"] == comic.organized + assert data["bookmarked"] == comic.bookmarked + + if data["category"]: + assert Category[data["category"]] == comic.category + else: + assert comic.category is None + + if data["censorship"]: + assert Censorship[data["censorship"]] == comic.censorship + else: + assert comic.censorship is None + + assert_association_matches(data["cover"], comic.cover, "Image") + assert_associations_match( + data["artists"], comic.artists, "Artist", lambda a: a.name + ) + assert_associations_match( + data["characters"], comic.characters, "Character", lambda c: c.name + ) + assert_associations_match( + data["circles"], comic.circles, "Circle", lambda c: c.name + ) + assert_associations_match(data["tags"], comic.tags, "ComicTag", lambda t: t.name) + assert_associations_match(data["worlds"], comic.worlds, "World", lambda w: w.name) + + +def assert_comic_matches(data, comic): + assert_comic_item_matches(data, comic) + assert dt.fromisoformat(data["createdAt"]) == comic.created_at + assert dt.fromisoformat(data["updatedAt"]) == comic.updated_at + assert Direction[data["direction"]] == comic.direction + assert Layout[data["layout"]] == comic.layout + assert data["url"] == comic.url + + assert_association_matches(data["archive"], comic.archive, "Archive") + assert_associations_match(data["pages"], comic.pages, "Page", lambda p: p.index) + + +@pytest.mark.anyio +async def test_query_comic(query_comic, gen_comic): + comic = await DB.add(next(gen_comic)) + + response = Response(await query_comic(comic.id)) + response.assert_is("FullComic") + + assert_comic_matches(response.data, comic) + + +@pytest.mark.anyio +async def test_query_comic_sorts_pages(query_comic, gen_jumbled_archive): + archive = next(gen_jumbled_archive) + + comic = await DB.add( + Comic( + id=1, + title="A Jumbled Mess", + archive=archive, + pages=archive.pages, + cover=archive.cover, + ) + ) + + response = Response(await query_comic(comic.id)) + response.assert_is("FullComic") + + assert_associations_match(response.pages, comic.pages, "Page", lambda p: p.index) + + +@pytest.mark.anyio +async def test_query_comic_fails_not_found(query_comic): + response = Response(await query_comic(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Comic ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_comics(query_comics, gen_comic): + comics = await DB.add_all(*gen_comic) + + response = Response(await query_comics()) + response.assert_is("ComicFilterResult") + + assert response.count == len(comics) + assert isinstance((response.edges), list) + assert len(response.edges) == len(comics) + + edge = iter(response.edges) + for comic in sorted(comics, key=lambda c: c.title): + assert_comic_item_matches(next(edge), comic) + + +@pytest.mark.anyio +async def test_add_comic(add_comic, gen_archive): + archive = next(gen_archive) + await DB.add(archive) + + before = dt.now(timezone.utc).replace(microsecond=0) + + response = Response( + await add_comic( + { + "title": "The Comically Bad Comic", + "archive": {"id": archive.id}, + "pages": {"ids": [p.id for p in archive.pages]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + response.assert_is("AddComicSuccess") + assert response.archivePagesRemaining is False + + after = dt.now(timezone.utc).replace(microsecond=0) + + comic = await DB.get(Comic, response.id, full=True) + assert comic is not None + assert comic.title == "The Comically Bad Comic" + + assert comic.archive.id == archive.id + assert comic.archive.organized is True + + assert set([page.id for page in comic.pages]) == set( + [page.id for page in archive.pages] + ) + + assert comic.cover.id == archive.cover.id + + assert comic.category is None + assert comic.censorship is None + assert comic.created_at >= before + assert comic.created_at <= after + assert comic.date is None + assert comic.language is None + assert comic.layout == Layout.SINGLE + assert comic.original_title is None + assert comic.url is None + assert comic.rating is None + + assert comic.artists == [] + assert comic.characters == [] + assert comic.circles == [] + assert comic.tags == [] + assert comic.worlds == [] + + +@pytest.mark.anyio +async def test_add_comic_pages_remaining(add_comic, gen_archive): + archive = next(gen_archive) + await DB.add(archive) + + response = Response( + await add_comic( + { + "title": "The Unfinished Comic", + "archive": {"id": archive.id}, + "pages": {"ids": [p.id for p in archive.pages][:2]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + response.assert_is("AddComicSuccess") + assert response.archivePagesRemaining is True + + comic = await DB.get(Comic, response.id, full=True) + assert comic.archive.organized is False + + +@pytest.mark.anyio +async def test_add_comic_fails_archive_not_found(add_comic, gen_archive): + archive = next(gen_archive) + await DB.add(archive) + + response = Response( + await add_comic( + { + "title": "Voidful Comic", + "archive": {"id": 10}, + "pages": {"ids": [p.id for p in archive.pages]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + response.assert_is("IDNotFoundError") + assert response.id == 10 + assert response.message == "Archive ID not found: '10'" + + +@pytest.mark.anyio +async def test_add_comic_fails_page_not_found(add_comic, gen_archive): + archive = next(gen_archive) + await DB.add(archive) + + response = Response( + await add_comic( + { + "title": "Pageless Comic", + "archive": {"id": archive.id}, + "pages": {"ids": [10]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + response.assert_is("IDNotFoundError") + assert response.id == 10 + assert response.message == "Page ID not found: '10'" + + +@pytest.mark.anyio +async def test_add_comic_fails_page_claimed(add_comic, gen_archive): + other_archive = next(gen_archive) + other_comic = await DB.add( + Comic( + title="Lawful Comic", + archive=other_archive, + cover=other_archive.cover, + pages=other_archive.pages, + ) + ) + + claimed_page = other_comic.pages[0] + + archive = next(gen_archive) + await DB.add(archive) + + response = Response( + await add_comic( + { + "title": "Comic of Attempted Burglary", + "archive": {"id": archive.id}, + "pages": {"ids": [claimed_page.id]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + + response.assert_is("PageClaimedError") + assert response.id == claimed_page.id + assert response.comicId == other_comic.id + assert ( + response.message + == f"Page ID {claimed_page.id} is already claimed by comic ID {other_comic.id}" + ) + + +@pytest.mark.anyio +async def test_add_comic_fails_empty_parameter(add_comic, gen_archive): + archive = next(gen_archive) + await DB.add(archive) + + response = Response( + await add_comic( + { + "title": "", + "archive": {"id": archive.id}, + "pages": {"ids": [p.id for p in archive.pages]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + response.assert_is("InvalidParameterError") + assert response.parameter == "title" + + +@pytest.mark.anyio +async def test_add_comic_fails_page_remote(add_comic, gen_archive): + other_archive = await DB.add(next(gen_archive)) + other_page = other_archive.pages[0] + + archive = await DB.add(next(gen_archive)) + + response = Response( + await add_comic( + { + "title": "Comic of Multiple Archives", + "archive": {"id": archive.id}, + "pages": {"ids": [other_page.id]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + + response.assert_is("PageRemoteError") + assert response.id == other_page.id + assert response.archiveId == other_archive.id + assert ( + response.message + == f"Page ID {other_page.id} comes from remote archive ID {other_archive.id}" + ) + + +@pytest.mark.anyio +async def test_add_comic_fails_cover_remote(add_comic, gen_archive): + other_archive = await DB.add(next(gen_archive)) + other_page = other_archive.pages[0] + + archive = await DB.add(next(gen_archive)) + + response = Response( + await add_comic( + { + "title": "Comic of Multiple Archives", + "archive": {"id": archive.id}, + "pages": {"ids": [p.id for p in archive.pages]}, + "cover": {"id": other_page.id}, + } + ) + ) + + response.assert_is("PageRemoteError") + assert response.id == other_page.id + assert response.archiveId == other_archive.id + assert ( + response.message + == f"Page ID {other_page.id} comes from remote archive ID {other_archive.id}" + ) + + +@pytest.mark.anyio +async def test_delete_comic(delete_comics, gen_comic): + comic = await DB.add(next(gen_comic)) + + response = Response(await delete_comics(comic.id)) + response.assert_is("DeleteSuccess") + + comic = await DB.get(Comic, comic.id) + assert comic is None + + +@pytest.mark.anyio +async def test_delete_comic_not_found(delete_comics): + response = Response(await delete_comics(1)) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Comic ID not found: '1'" + + +def assert_assocs_match(assocs, collection, name_only=False): + assert set([o.name for o in assocs]) == set([o.name for o in collection]) + assert set([o.id for o in assocs]) == set([o.id for o in collection]) + + +@pytest.mark.anyio +async def test_update_comic(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + + artists = await DB.add_all(Artist(name="arty"), Artist(name="farty")) + circles = await DB.add_all(Circle(name="round"), Circle(name="oval")) + worlds = await DB.add_all(World(name="animal world"), World(name="no spiders")) + + namespace = await DB.add(Namespace(name="emus")) + tag = await DB.add(Tag(name="creepy")) + ct = ComicTag(namespace=namespace, tag=tag) + + new_tags = [ct] + original_comic.tags + new_pages = [p.id for p in original_comic.pages[:2]] + + input = { + "title": "Saucy Savannah Adventures (now in Italian)", + "url": "file:///home/savannah/avventura", + "originalTitle": original_comic.title, + "cover": {"id": original_comic.pages[1].id}, + "pages": {"ids": new_pages}, + "favourite": False, + "organized": True, + "bookmarked": True, + "artists": {"ids": [a.id for a in artists]}, + "circles": {"ids": [c.id for c in circles]}, + "worlds": {"ids": [w.id for w in worlds]}, + "tags": {"ids": [ct.id for ct in new_tags]}, + "date": "2010-07-06", + "direction": "LEFT_TO_RIGHT", + "language": "IT", + "layout": "DOUBLE_OFFSET", + "rating": "EXPLICIT", + "censorship": "BAR", + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + assert comic.title == "Saucy Savannah Adventures (now in Italian)" + assert comic.original_title == original_comic.title + assert comic.cover.id == original_comic.pages[1].image.id + assert comic.url == "file:///home/savannah/avventura" + assert comic.favourite is False + assert comic.organized is True + assert comic.bookmarked is True + + assert set([p.id for p in comic.pages]) == set(new_pages) + + assert_assocs_match(comic.artists, artists) + assert_assocs_match(comic.circles, circles) + assert_assocs_match(comic.characters, original_comic.characters) + assert_assocs_match(comic.worlds, worlds) + assert_assocs_match(comic.tags, new_tags) + + assert comic.date == date(2010, 7, 6) + assert comic.direction == Direction.LEFT_TO_RIGHT + assert comic.layout == Layout.DOUBLE_OFFSET + assert comic.rating == Rating.EXPLICIT + assert comic.language == Language.IT + assert comic.censorship == Censorship.BAR + + +@pytest.mark.anyio +async def test_update_comic_clears_associations(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + + empty = {"ids": []} + + input = { + "artists": empty, + "circles": empty, + "worlds": empty, + "tags": empty, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert comic.artists == [] + assert comic.circles == [] + assert comic.worlds == [] + assert comic.tags == [] + + +@pytest.mark.anyio +async def test_update_comic_clears_enums(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + + input = { + "category": None, + "censorship": None, + "rating": None, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert comic.rating is None + assert comic.category is None + assert comic.censorship is None + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "with None", + "with empty string", + ], +) +@pytest.mark.anyio +async def test_update_comic_clears_string_fields(update_comics, gen_comic, empty): + original_comic = await DB.add(next(gen_comic)) + + input = { + "originalTitle": empty, + "url": empty, + "date": None, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert comic.original_title is None + assert comic.date is None + assert comic.url is None + + +@pytest.mark.anyio +async def test_update_comic_fails_comic_not_found(update_comics): + response = Response(await update_comics(1, {"title": "This Will Not Happen"})) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Comic ID not found: '1'" + + +@pytest.mark.parametrize( + "parameter,empty", + [ + ("title", ""), + ("title", None), + ("direction", None), + ("layout", None), + ], + ids=[ + "title (empty string)", + "title (none)", + "direction", + "layout", + ], +) +@pytest.mark.anyio +async def test_update_comic_fails_empty_parameter( + update_comics, gen_archive, parameter, empty +): + archive = next(gen_archive) + comic = await DB.add( + Comic( + title="Dusty Old Comic", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + ) + + response = Response(await update_comics(comic.id, {parameter: empty})) + response.assert_is("InvalidParameterError") + assert response.parameter == parameter + assert response.message == f"Invalid parameter '{parameter}': cannot be empty" + + +@pytest.mark.anyio +async def test_update_comic_fails_namespace_not_found(update_comics, gen_archive): + archive = next(gen_archive) + comic = await DB.add( + Comic( + title="Dusty Old Comic", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + ) + + tag = await DB.add(Tag(name="shiny")) + + response = Response( + await update_comics(comic.id, {"tags": {"ids": [f"1:{tag.id}"]}}) + ) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Namespace ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_comic_fails_tag_not_found(update_comics, gen_archive): + archive = next(gen_archive) + comic = await DB.add( + Comic( + title="Dusty Old Comic", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + ) + + namespace = await DB.add(Namespace(name="height")) + + response = Response( + await update_comics(comic.id, {"tags": {"ids": [f"{namespace.id}:1"]}}) + ) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Tag ID not found: '1'" + + +@pytest.mark.parametrize( + "input", + [ + "", + ":1", + "1:", + "a:b", + "tag", + ], + ids=[ + "empty", + "namespacing missing", + "tag missing", + "no numeric ids", + "wrong format", + ], +) +@pytest.mark.anyio +async def test_update_comic_fails_invalid_tag(update_comics, gen_archive, input): + archive = next(gen_archive) + comic = await DB.add( + Comic( + title="Dusty Old Comic", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + ) + + response = Response(await update_comics(comic.id, {"tags": {"ids": [input]}})) + response.assert_is("InvalidParameterError") + assert response.parameter == "id" + + msg = "Invalid parameter 'id': ComicTag ID must be specified as :" # noqa: E501 + assert response.message == msg + + +@pytest.mark.parametrize( + "name,key,id", + [ + ("Artist", "artists", 1), + ("Character", "characters", 1), + ("Circle", "circles", 1), + ("World", "worlds", 1), + ], + ids=[ + "artist", + "character", + "circle", + "world", + ], +) +@pytest.mark.anyio +async def test_update_comic_fails_assoc_not_found( + update_comics, gen_archive, name, key, id +): + archive = next(gen_archive) + comic = await DB.add( + Comic( + title="Dusty Old Comic", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + ) + + response = Response(await update_comics(comic.id, {key: {"ids": [id]}})) + response.assert_is("IDNotFoundError") + assert response.id == id + assert response.message == f"{name} ID not found: '{id}'" + + +@pytest.mark.parametrize( + "option", + [ + None, + {}, + {"onMissing": "IGNORE"}, + ], + ids=[ + "default option", + "empty option", + "explicit option", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_ignores_with(upsert_comics, gen_comic, option): + original_comic = await DB.add(next(gen_comic)) + + new_artists = await DB.add_all(Artist(name="arty"), Artist(name="farty")) + + input = { + "artists": { + "names": [a.name for a in new_artists] + ["newy"], + "options": option, + }, + } + response = Response(await upsert_comics(original_comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert_assocs_match(comic.artists, original_comic.artists + list(new_artists)) + + +@pytest.mark.parametrize( + "option", + [ + None, + {}, + {"onMissing": "IGNORE"}, + ], + ids=[ + "default option", + "empty option", + "explicit option", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_ignores_missing_tags_with(upsert_comics, gen_comic, option): + original_comic = await DB.add(next(gen_comic)) + + tags = await DB.add_all( + ComicTag( + comic_id=original_comic.id, + namespace=Namespace(name="foo"), + tag=Tag(name="bar"), + ) + ) + + input = { + "tags": {"names": ["foo:bar", "baz:qux"], "options": option}, + } + response = Response(await upsert_comics(original_comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert_assocs_match(comic.tags, original_comic.tags + list(tags)) + + +@pytest.mark.parametrize( + "option", + [ + None, + {}, + {"onMissing": "IGNORE"}, + {"onMissing": "CREATE"}, + ], + ids=[ + "default option", + "empty option", + "IGNORE", + "CREATE", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_skips_existing_tags(upsert_comics, gen_comic, option): + comic = await DB.add(next(gen_comic)) + + ctag = comic.tags[0] + names = [f"{ctag.namespace.name}:{ctag.tag.name}"] + + input = { + "tags": {"names": names, "options": option}, + } + response = Response(await upsert_comics(comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, comic.id, full=True) + assert comic is not None + + assert_assocs_match(comic.tags, comic.tags) + + +@pytest.mark.parametrize( + "valid", + [ + True, + False, + ], + ids=[ + "valid combination", + "invalid combination", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_ignore_missing_handles_resident( + upsert_comics, gen_comic, valid +): + original_comic = await DB.add(next(gen_comic)) + + namespace = await DB.add(Namespace(name="foo")) + tag = Tag(name="bar") + if valid: + tag.namespaces = [namespace] + + tag = await DB.add(tag) + ctag = ComicTag(namespace=namespace, tag=tag) + + expected_tags = original_comic.tags + + if valid: + expected_tags.append(ctag) + + input = { + "tags": {"names": ["foo:bar"], "options": {"onMissing": "IGNORE"}}, + } + response = Response(await upsert_comics(original_comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert_assocs_match(comic.tags, expected_tags) + + +@pytest.mark.anyio +async def test_upsert_comic_tags_uses_existing(upsert_comics, empty_comic): + original_comic = await DB.add(empty_comic) + + await DB.add_all(Namespace(name="foo")) + await DB.add_all(Tag(name="bar")) + + tag_names = ["foo:bar"] + + response = Response( + await upsert_comics( + original_comic.id, + {"tags": {"names": tag_names, "options": {"onMissing": "CREATE"}}}, + ) + ) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert set(tag_names) == set( + [f"{t.namespace.name}:{t.tag.name}" for t in comic.tags] + ) + + +@pytest.mark.parametrize( + "key,list", + [ + ("artists", ["arty", "farty"]), + ("tags", ["alien:medium", "human:tiny"]), + ("artists", ["arty", "arty"]), + ("tags", ["foo:good", "bar:good"]), + ("tags", ["foo:good", "foo:bad"]), + ("artists", []), + ], + ids=[ + "artists", + "tags", + "artists (duplicate)", + "tags (duplicate)", + "namespace (duplicate)", + "artists (empty)", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_creates(upsert_comics, empty_comic, key, list): + original_comic = await DB.add(empty_comic) + + input = { + key: {"names": list, "options": {"onMissing": "CREATE"}}, + } + response = Response(await upsert_comics(original_comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert set(list) == set([o.name for o in getattr(comic, key)]) + + +@pytest.mark.anyio +async def test_upsert_comic_fails_creating_empty_assoc_name(upsert_comics, gen_comic): + comic = await DB.add(next(gen_comic)) + + input = { + "artists": {"names": ""}, + } + response = Response(await upsert_comics(comic.id, input)) + response.assert_is("InvalidParameterError") + assert response.parameter == "Artist.name" + + +@pytest.mark.anyio +async def test_upsert_comic_does_not_replace(upsert_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + original_artists = set([a.name for a in original_comic.artists]) + + input = { + "artists": {"names": []}, + } + response = Response(await upsert_comics(original_comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id) + artists = set([a.name for a in comic.artists]) + + assert artists == original_artists + + +@pytest.mark.parametrize( + "input", + [ + "", + ":tiny", + "human:", + "medium", + ], + ids=[ + "empty", + "namespace missing", + "tag missing", + "wrong format", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_fails_creating_invalid_tag(upsert_comics, gen_comic, input): + comic = await DB.add(next(gen_comic)) + + input = { + "tags": {"names": [input]}, + } + response = Response(await upsert_comics(comic.id, input)) + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + msg = "Invalid parameter 'name': ComicTag name must be specified as :" # noqa: E501 + assert response.message == msg + + +@pytest.mark.parametrize( + "options", + [ + None, + {}, + {"mode": "REPLACE"}, + ], + ids=[ + "by default (none)", + "by default (empty record)", + "when defined explicitly", + ], +) +@pytest.mark.anyio +async def test_update_comic_replaces_assocs(update_comics, gen_comic, options): + original_comic = await DB.add(next(gen_comic)) + new_artist = await DB.add(Artist(name="max")) + + input = { + "artists": {"ids": [new_artist.id]}, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + + assert_assocs_match(comic.artists, [new_artist]) + + +@pytest.mark.anyio +async def test_update_comic_adds_assocs(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + new_artist = await DB.add(Artist(name="max")) + added_artists = original_comic.artists + [new_artist] + + input = { + "artists": {"ids": [new_artist.id], "options": {"mode": "ADD"}}, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + + assert_assocs_match(comic.artists, added_artists) + + +@pytest.mark.anyio +async def test_update_comic_adds_existing_assocs(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + artists = original_comic.artists + + input = { + "artists": { + "ids": [artist.id for artist in artists], + "options": {"mode": "ADD"}, + }, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + + assert_assocs_match(comic.artists, artists) + + +@pytest.mark.anyio +async def test_update_comic_adds_tags(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + new_namespace = await DB.add(Namespace(name="new")) + new_tag = await DB.add(Tag(name="new")) + added_tags = original_comic.tags + [ + ComicTag(comic_id=original_comic.id, tag=new_tag, namespace=new_namespace) + ] + + input = { + "tags": { + "ids": [f"{new_namespace.id}:{new_tag.id}"], + "options": {"mode": "ADD"}, + }, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert_assocs_match(comic.tags, added_tags) + + +@pytest.mark.anyio +async def test_update_comic_adds_existing_tags(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + tags = original_comic.tags + + input = { + "tags": { + "ids": [f"{tag.namespace.id}:{tag.tag.id}" for tag in tags], + "options": {"mode": "ADD"}, + }, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert_assocs_match(comic.tags, tags) + + +@pytest.mark.anyio +async def test_update_comic_removes_assocs(update_comics, empty_comic): + original_comic = empty_comic + removed_artist = Artist(id=1, name="sam") + remaining_artist = Artist(id=2, name="max") + original_comic.artists = [removed_artist, remaining_artist] + original_comic = await DB.add(original_comic) + + input = { + "artists": {"ids": [removed_artist.id], "options": {"mode": "REMOVE"}}, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + + assert_assocs_match(comic.artists, [remaining_artist]) + + +@pytest.mark.anyio +async def test_update_comic_removes_tags(update_comics, empty_comic): + original_comic = empty_comic + removed_tag = ComicTag( + comic_id=original_comic.id, + tag=Tag(id=1, name="gone"), + namespace=Namespace(id=1, name="all"), + ) + remaining_tag = ComicTag( + comic_id=original_comic.id, + tag=Tag(id=2, name="there"), + namespace=Namespace(id=2, name="still"), + ) + original_comic.tags = [removed_tag, remaining_tag] + original_comic = await DB.add(original_comic) + + input = { + "tags": {"ids": ["1:1"], "options": {"mode": "REMOVE"}}, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + + assert_assocs_match(comic.tags, [remaining_tag]) + + +@pytest.mark.parametrize( + "rows,input", + [ + ([], {"title": "Updated Comic"}), + ( + [Artist(id=1, name="artist")], + {"artists": {"ids": [1]}}, + ), + ( + [Artist(id=1, name="artist"), ComicArtist(artist_id=1, comic_id=100)], + {"title": "Updated Comic", "artists": {"ids": [1]}}, + ), + ( + [ + Namespace(id=1, name="ns"), + Tag(id=1, name="artist"), + ], + {"tags": {"ids": ["1:1"]}}, + ), + ( + [ + Namespace(id=1, name="ns"), + Tag(id=1, name="artist"), + ComicTag(namespace_id=1, tag_id=1, comic_id=100), + ], + {"title": "Updated Comic", "tags": {"ids": ["1:1"]}}, + ), + ], + ids=[ + "with scalar", + "with assoc", + "with scalar and existing assoc", + "with tag", + "with scalar and existing tag", + ], +) +@pytest.mark.anyio +async def test_update_comic_changes_updated_at(update_comics, empty_comic, rows, input): + original_comic = empty_comic + original_comic.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_comic = await DB.add(original_comic) + + await DB.add_all(*rows) + + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id) + assert comic.updated_at > original_comic.updated_at + + +@pytest.mark.anyio +async def test_update_comic_cover_fails_page_not_found(update_comics, gen_comic): + comic = await DB.add(next(gen_comic)) + + response = Response(await update_comics(comic.id, {"cover": {"id": 100}})) + response.assert_is("IDNotFoundError") + assert response.id == 100 + assert response.message == "Page ID not found: '100'" + + +@pytest.mark.anyio +async def test_update_comic_cover_fails_page_remote( + update_comics, gen_comic, gen_archive +): + comic = await DB.add(next(gen_comic)) + other_archive = await DB.add(next(gen_archive)) + remote_id = other_archive.pages[0].id + + response = Response(await update_comics(comic.id, {"cover": {"id": remote_id}})) + response.assert_is("PageRemoteError") + assert response.id == remote_id + assert response.archiveId == other_archive.id + assert ( + response.message + == f"Page ID {remote_id} comes from remote archive ID {other_archive.id}" + ) + + +@pytest.mark.anyio +async def test_update_comic_pages_fails_page_not_found(update_comics, gen_comic): + comic = await DB.add(next(gen_comic)) + + response = Response(await update_comics(comic.id, {"pages": {"ids": 100}})) + response.assert_is("IDNotFoundError") + assert response.id == 100 + assert response.message == "Page ID not found: '100'" + + +@pytest.mark.anyio +async def test_update_comic_pages_fails_page_remote( + update_comics, gen_comic, gen_archive +): + comic = await DB.add(next(gen_comic)) + other_archive = await DB.add(next(gen_archive)) + remote_id = other_archive.pages[0].id + + response = Response(await update_comics(comic.id, {"pages": {"ids": [remote_id]}})) + response.assert_is("PageRemoteError") + assert response.id == remote_id + assert response.archiveId == other_archive.id + assert ( + response.message + == f"Page ID {remote_id} comes from remote archive ID {other_archive.id}" + ) + + +@pytest.mark.anyio +async def test_update_comic_pages_fails_page_claimed(update_comics, gen_archive): + archive = await DB.add(next(gen_archive)) + + comic = await DB.add( + Comic( + id=1, + title="A Very Good Comic", + archive=archive, + cover=archive.pages[0].image, + pages=[archive.pages[0], archive.pages[1]], + ) + ) + + claiming = await DB.add( + Comic( + id=2, + title="A Very Claiming Comic", + archive=archive, + cover=archive.pages[2].image, + pages=[archive.pages[2], archive.pages[3]], + ) + ) + + claimed_id = claiming.pages[0].id + + response = Response(await update_comics(comic.id, {"pages": {"ids": [claimed_id]}})) + response.assert_is("PageClaimedError") + assert response.id == claimed_id + assert response.comicId == claiming.id + assert ( + response.message + == f"Page ID {claimed_id} is already claimed by comic ID {claiming.id}" + ) + + +@pytest.mark.parametrize( + "mode", + [ + ("REPLACE"), + ("REMOVE"), + ], +) +@pytest.mark.anyio +async def test_update_comic_pages_fails_empty(update_comics, gen_comic, mode): + comic = await DB.add(next(gen_comic)) + + ids = [] if mode == "REPLACE" else [p.id for p in comic.pages] + + response = Response( + await update_comics( + comic.id, {"pages": {"ids": ids, "options": {"mode": mode}}} + ) + ) + response.assert_is("InvalidParameterError") + assert response.parameter == "pages" + assert response.message == "Invalid parameter 'pages': cannot be empty" diff --git a/tests/api/test_comic_tag.py b/tests/api/test_comic_tag.py new file mode 100644 index 0000000..f536b79 --- /dev/null +++ b/tests/api/test_comic_tag.py @@ -0,0 +1,134 @@ +from functools import partial + +import pytest +from conftest import DB, Response +from hircine.db.models import Namespace, Tag + + +@pytest.fixture +def query_comic_tags(schema_execute): + query = """ + query comicTags($forFilter: Boolean) { + comicTags(forFilter: $forFilter) { + edges { + __typename + id + name + } + count + } + } + """ + + def wrapper(q): + async def _execute(for_filter=False): + return await schema_execute(q, {"forFilter": for_filter}) + + return _execute + + return wrapper(query) + + +def build_item(namespace, tag): + nid, tid = "", "" + nname, tname = "", "" + + if namespace: + nid, nname = namespace.id, namespace.name + + if tag: + tid, tname = tag.id, tag.name + + item = { + "__typename": "ComicTag", + "id": f"{nid}:{tid}", + "name": f"{nname}:{tname}", + } + + return item + + +@pytest.mark.anyio +async def test_query_comic_tags_cross(query_comic_tags): + ns_foo = Namespace(id=1, name="foo") + ns_bar = Namespace(id=2, name="bar") + tag_qoo = Tag(id=1, name="qoo", namespaces=[ns_foo, ns_bar]) + tag_qar = Tag(id=2, name="qar", namespaces=[ns_foo, ns_bar]) + + await DB.add_all(ns_foo, ns_bar) + await DB.add_all(tag_qoo, tag_qar) + + builder = partial(build_item) + + response = Response(await query_comic_tags()) + assert response.data["edges"] == [ + builder(ns_bar, tag_qar), + builder(ns_bar, tag_qoo), + builder(ns_foo, tag_qar), + builder(ns_foo, tag_qoo), + ] + + +@pytest.mark.anyio +async def test_query_comic_tags_restricted_namespace(query_comic_tags): + ns_foo = Namespace(id=1, name="foo") + ns_bar = Namespace(id=2, name="bar") + tag_qoo = Tag(id=1, name="qoo", namespaces=[ns_bar]) + tag_qar = Tag(id=2, name="qar", namespaces=[ns_foo]) + + await DB.add_all(ns_foo, ns_bar) + await DB.add_all(tag_qoo, tag_qar) + + builder = partial(build_item) + + response = Response(await query_comic_tags()) + assert response.data["edges"] == [ + builder(ns_bar, tag_qoo), + builder(ns_foo, tag_qar), + ] + + +@pytest.mark.anyio +async def test_query_comic_tag_matchers_cross(query_comic_tags): + ns_foo = Namespace(id=1, name="foo") + ns_bar = Namespace(id=2, name="bar") + tag_qoo = Tag(id=1, name="qoo", namespaces=[ns_foo, ns_bar]) + tag_qar = Tag(id=2, name="qar", namespaces=[ns_foo, ns_bar]) + + await DB.add_all(ns_foo, ns_bar, tag_qoo, tag_qar) + + builder = partial(build_item) + + response = Response(await query_comic_tags(for_filter=True)) + assert response.data["edges"] == [ + builder(ns_bar, None), + builder(ns_foo, None), + builder(None, tag_qar), + builder(None, tag_qoo), + builder(ns_bar, tag_qar), + builder(ns_bar, tag_qoo), + builder(ns_foo, tag_qar), + builder(ns_foo, tag_qoo), + ] + + +@pytest.mark.anyio +async def test_query_comic_tag_matchers_restricted_namespace(query_comic_tags): + ns_foo = Namespace(id=1, name="foo") + ns_bar = Namespace(id=2, name="bar") + tag_qoo = Tag(id=1, name="qoo", namespaces=[ns_bar]) + tag_qar = Tag(id=2, name="qar", namespaces=[ns_foo]) + + await DB.add_all(ns_foo, ns_bar, tag_qoo, tag_qar) + + builder = partial(build_item) + + response = Response(await query_comic_tags(for_filter=True)) + assert response.data["edges"] == [ + builder(ns_bar, None), + builder(ns_foo, None), + builder(None, tag_qar), + builder(None, tag_qoo), + builder(ns_bar, tag_qoo), + builder(ns_foo, tag_qar), + ] diff --git a/tests/api/test_db.py b/tests/api/test_db.py new file mode 100644 index 0000000..f53b90f --- /dev/null +++ b/tests/api/test_db.py @@ -0,0 +1,324 @@ +from datetime import datetime, timedelta, timezone + +import hircine.db as database +import hircine.db.models as models +import hircine.db.ops as ops +import pytest +from conftest import DB +from hircine.db.models import ( + Artist, + Base, + Comic, + ComicTag, + DateTimeUTC, + MixinID, + Namespace, + Tag, + TagNamespaces, +) +from sqlalchemy.exc import StatementError +from sqlalchemy.orm import ( + Mapped, + mapped_column, +) + + +class Date(MixinID, Base): + date: Mapped[datetime] = mapped_column(DateTimeUTC) + + +@pytest.mark.anyio +async def test_db_requires_tzinfo(): + with pytest.raises(StatementError, match="tzinfo is required"): + await DB.add(Date(date=datetime(2019, 4, 22))) + + +@pytest.mark.anyio +async def test_db_converts_date_input_to_utc(): + date = datetime(2019, 4, 22, tzinfo=timezone(timedelta(hours=-4))) + await DB.add(Date(date=date)) + + item = await DB.get(Date, 1) + + assert item.date.tzinfo == timezone.utc + assert item.date == date + + +@pytest.mark.parametrize( + "modelcls,assoccls", + [ + (models.Artist, models.ComicArtist), + (models.Circle, models.ComicCircle), + (models.Character, models.ComicCharacter), + (models.World, models.ComicWorld), + ], + ids=["artists", "circles", "characters", "worlds"], +) +@pytest.mark.anyio +async def test_models_retained_when_clearing_association( + empty_comic, modelcls, assoccls +): + model = modelcls(id=1, name="foo") + key = f"{modelcls.__name__.lower()}s" + + comic = empty_comic + setattr(comic, key, [model]) + comic = await DB.add(comic) + + async with database.session() as s: + object = await s.get(Comic, comic.id) + setattr(object, key, []) + await s.commit() + + assert await DB.get(assoccls, (comic.id, model.id)) is None + assert await DB.get(Comic, comic.id) is not None + assert await DB.get(modelcls, model.id) is not None + + +@pytest.mark.anyio +async def test_models_retained_when_clearing_comictag(empty_comic): + comic = await DB.add(empty_comic) + + namespace = Namespace(id=1, name="foo") + tag = Tag(id=1, name="bar") + ct = ComicTag(comic_id=comic.id, namespace=namespace, tag=tag) + + await DB.add(ct) + + async with database.session() as s: + object = await s.get(Comic, comic.id) + object.tags = [] + await s.commit() + + assert await DB.get(ComicTag, (comic.id, ct.namespace_id, ct.tag_id)) is None + assert await DB.get(Namespace, namespace.id) is not None + assert await DB.get(Tag, tag.id) is not None + assert await DB.get(Comic, comic.id) is not None + + +@pytest.mark.parametrize( + "modelcls,assoccls", + [ + (models.Artist, models.ComicArtist), + (models.Circle, models.ComicCircle), + (models.Character, models.ComicCharacter), + (models.World, models.ComicWorld), + ], + ids=["artists", "circles", "characters", "worlds"], +) +@pytest.mark.anyio +async def test_only_association_cleared_when_deleting(empty_comic, modelcls, assoccls): + model = modelcls(id=1, name="foo") + + comic = empty_comic + setattr(comic, f"{modelcls.__name__.lower()}s", [model]) + comic = await DB.add(comic) + + await DB.delete(modelcls, model.id) + assert await DB.get(assoccls, (comic.id, model.id)) is None + assert await DB.get(Comic, comic.id) is not None + + +@pytest.mark.parametrize( + "deleted", + [ + "namespace", + "tag", + ], +) +@pytest.mark.anyio +async def test_only_comictag_association_cleared_when_deleting(empty_comic, deleted): + comic = await DB.add(empty_comic) + + namespace = Namespace(id=1, name="foo") + tag = Tag(id=1, name="bar") + + await DB.add(ComicTag(comic_id=comic.id, namespace=namespace, tag=tag)) + + if deleted == "namespace": + await DB.delete(Namespace, namespace.id) + elif deleted == "tag": + await DB.delete(Tag, tag.id) + + assert await DB.get(ComicTag, (comic.id, namespace.id, tag.id)) is None + if deleted == "namespace": + assert await DB.get(Tag, tag.id) is not None + elif deleted == "tag": + assert await DB.get(Namespace, namespace.id) is not None + assert await DB.get(Comic, comic.id) is not None + + +@pytest.mark.parametrize( + "modelcls,assoccls", + [ + (models.Artist, models.ComicArtist), + (models.Circle, models.ComicCircle), + (models.Character, models.ComicCharacter), + (models.World, models.ComicWorld), + ], + ids=["artists", "circles", "characters", "worlds"], +) +@pytest.mark.anyio +async def test_deleting_comic_only_clears_association(empty_comic, modelcls, assoccls): + model = modelcls(id=1, name="foo") + + comic = empty_comic + setattr(comic, f"{modelcls.__name__.lower()}s", [model]) + comic = await DB.add(comic) + + await DB.delete(Comic, comic.id) + assert await DB.get(assoccls, (comic.id, model.id)) is None + assert await DB.get(modelcls, model.id) is not None + + +@pytest.mark.anyio +async def test_deleting_comic_only_clears_comictag(empty_comic): + comic = await DB.add(empty_comic) + + namespace = Namespace(id=1, name="foo") + tag = Tag(id=1, name="bar") + + await DB.add(ComicTag(comic_id=comic.id, namespace=namespace, tag=tag)) + await DB.delete(Comic, comic.id) + + assert await DB.get(ComicTag, (comic.id, namespace.id, tag.id)) is None + assert await DB.get(Tag, tag.id) is not None + assert await DB.get(Namespace, namespace.id) is not None + + +@pytest.mark.anyio +async def test_models_retained_when_clearing_tagnamespace(): + namespace = Namespace(id=1, name="foo") + tag = Tag(id=1, name="foo", namespaces=[namespace]) + + tag = await DB.add(tag) + + async with database.session() as s: + db_tag = await s.get(Tag, tag.id, options=Tag.load_full()) + db_tag.namespaces = [] + await s.commit() + + assert await DB.get(TagNamespaces, (namespace.id, tag.id)) is None + assert await DB.get(Namespace, namespace.id) is not None + assert await DB.get(Tag, tag.id) is not None + + +@pytest.mark.anyio +async def test_only_tagnamespace_cleared_when_deleting_tag(): + namespace = Namespace(id=1, name="foo") + tag = Tag(id=1, name="foo", namespaces=[namespace]) + + tag = await DB.add(tag) + + await DB.delete(Tag, tag.id) + + assert await DB.get(TagNamespaces, (namespace.id, tag.id)) is None + assert await DB.get(Namespace, namespace.id) is not None + assert await DB.get(Tag, tag.id) is None + + +@pytest.mark.anyio +async def test_only_tagnamespace_cleared_when_deleting_namespace(): + namespace = Namespace(id=1, name="foo") + tag = Tag(id=1, name="foo", namespaces=[namespace]) + + tag = await DB.add(tag) + + await DB.delete(Namespace, namespace.id) + + assert await DB.get(TagNamespaces, (namespace.id, tag.id)) is None + assert await DB.get(Namespace, namespace.id) is None + assert await DB.get(Tag, tag.id) is not None + + +@pytest.mark.parametrize( + "use_identity_map", + [False, True], + ids=["without identity lookup", "with identity lookup"], +) +@pytest.mark.anyio +async def test_ops_get_all(gen_artist, use_identity_map): + artist = await DB.add(next(gen_artist)) + have = list(await DB.add_all(*gen_artist)) + have.append(artist) + + missing_ids = [10, 20] + + async with database.session() as s: + if use_identity_map: + s.add(artist) + + artists, missing = await ops.get_all( + s, + Artist, + [a.id for a in have] + missing_ids, + use_identity_map=use_identity_map, + ) + + assert set([a.id for a in artists]) == set([a.id for a in have]) + assert missing == set(missing_ids) + + +@pytest.mark.anyio +async def test_ops_get_all_names(gen_artist): + have = await DB.add_all(*gen_artist) + missing_names = ["arty", "farty"] + + async with database.session() as s: + artists, missing = await ops.get_all_names( + s, Artist, [a.name for a in have] + missing_names + ) + + assert set([a.name for a in artists]) == set([a.name for a in have]) + assert missing == set(missing_names) + + +@pytest.mark.parametrize( + "missing", + [[("foo", "bar"), ("qux", "qaz")], []], + ids=["missing", "no missing"], +) +@pytest.mark.anyio +async def test_ops_get_ctag_names(gen_comic, gen_tag, gen_namespace, missing): + comic = await DB.add(next(gen_comic)) + have = [(ct.namespace.name, ct.tag.name) for ct in comic.tags] + + async with database.session() as s: + cts, missing = await ops.get_ctag_names(s, comic.id, have + missing) + + assert set(have) == set([(ct.namespace.name, ct.tag.name) for ct in cts]) + assert missing == set(missing) + + +@pytest.mark.anyio +async def test_ops_lookup_identity(gen_artist): + one = await DB.add(next(gen_artist)) + two = await DB.add(next(gen_artist)) + rest = await DB.add_all(*gen_artist) + + async with database.session() as s: + get_one = await s.get(Artist, one.id) + get_two = await s.get(Artist, two.id) + s.add(get_one, get_two) + + artists, satisfied = ops.lookup_identity( + s, Artist, [a.id for a in [one, two] + list(rest)] + ) + + assert set([a.name for a in artists]) == set([a.name for a in [one, two]]) + assert satisfied == set([one.id, two.id]) + + +@pytest.mark.anyio +async def test_ops_get_image_orphans(gen_archive, gen_image): + await DB.add(next(gen_archive)) + + orphan_one = await DB.add(next(gen_image)) + orphan_two = await DB.add(next(gen_image)) + + async with database.session() as s: + orphans = set(await ops.get_image_orphans(s)) + + assert orphans == set( + [(orphan_one.id, orphan_one.hash), (orphan_two.id, orphan_two.hash)] + ) diff --git a/tests/api/test_filter.py b/tests/api/test_filter.py new file mode 100644 index 0000000..67a953f --- /dev/null +++ b/tests/api/test_filter.py @@ -0,0 +1,521 @@ +import pytest +from conftest import DB, Response +from hircine.db.models import Namespace, Tag + + +@pytest.fixture +def query_comic_filter(execute_filter): + query = """ + query comics($filter: ComicFilterInput) { + comics(filter: $filter) { + __typename + count + edges { + id + title + } + } + } + """ + + return execute_filter(query) + + +@pytest.fixture +def query_string_filter(execute_filter): + query = """ + query artists($filter: ArtistFilterInput) { + artists(filter: $filter) { + __typename + count + edges { + id + name + } + } + } + """ + + return execute_filter(query) + + +@pytest.fixture +def query_tag_filter(execute_filter): + query = """ + query tags($filter: TagFilterInput) { + tags(filter: $filter) { + __typename + count + edges { + id + name + } + } + } + """ + + return execute_filter(query) + + +def id_list(edges): + return sorted([int(edge["id"]) for edge in edges]) + + +@pytest.mark.parametrize( + "filter,ids", + [ + ( + {"include": {"name": {"contains": "robin"}}}, + [3, 4], + ), + ({"exclude": {"name": {"contains": "smith"}}}, [2, 3]), + ( + { + "exclude": {"name": {"contains": "robin"}}, + "include": {"name": {"contains": "smith"}}, + }, + [1], + ), + ], + ids=[ + "includes", + "excludes", + "includes and excludes", + ], +) +@pytest.mark.anyio +async def test_string_filter(query_string_filter, gen_artist, filter, ids): + await DB.add_all(*gen_artist) + + response = Response(await query_string_filter(filter)) + response.assert_is("ArtistFilterResult") + + assert id_list(response.edges) == ids + + +@pytest.mark.parametrize( + "filter,empty_response", + [ + ({"include": {"name": {"contains": ""}}}, False), + ({"include": {"name": {}}}, False), + ({"exclude": {"name": {"contains": ""}}}, True), + ({"exclude": {"name": {}}}, False), + ], + ids=[ + "string (include)", + "field (include)", + "string (exclude)", + "field (exclude)", + ], +) +@pytest.mark.anyio +async def test_string_filter_handles_empty( + query_string_filter, gen_artist, filter, empty_response +): + artists = await DB.add_all(*gen_artist) + + response = Response(await query_string_filter(filter)) + response.assert_is("ArtistFilterResult") + + if empty_response: + assert response.edges == [] + assert response.count == 0 + else: + assert len(response.edges) == len(artists) + assert response.count == len(artists) + + +@pytest.mark.parametrize( + "filter", + [ + {"include": {}}, + {"exclude": {}}, + ], + ids=[ + "include", + "exclude", + ], +) +@pytest.mark.anyio +async def test_filter_handles_empty_field(query_string_filter, gen_artist, filter): + artists = await DB.add_all(*gen_artist) + + response = Response(await query_string_filter(filter)) + response.assert_is("ArtistFilterResult") + + assert len(response.edges) == len(artists) + assert response.count == len(artists) + + +@pytest.mark.parametrize( + "filter,ids", + [ + ( + {"include": {"artists": {"any": 1}}}, + [1, 3], + ), + ( + {"include": {"artists": {"all": 1}}}, + [1, 3], + ), + ( + {"include": {"artists": {"any": [1, 4]}}}, + [1, 3, 4], + ), + ( + {"include": {"artists": {"all": [1, 4]}}}, + [3], + ), + ( + {"exclude": {"artists": {"any": 1}}}, + [2, 4], + ), + ( + {"exclude": {"artists": {"all": 1}}}, + [2, 4], + ), + ( + {"exclude": {"artists": {"any": [1, 4]}}}, + [2], + ), + ( + {"exclude": {"artists": {"all": [1, 4]}}}, + [1, 2, 4], + ), + ( + { + "include": {"artists": {"any": [1]}}, + "exclude": {"artists": {"all": [4]}}, + }, + [1], + ), + ( + { + "include": {"artists": {"any": [1, 4]}}, + "exclude": {"artists": {"all": [1, 4]}}, + }, + [1, 4], + ), + ( + { + "include": {"artists": {"any": [1, 4], "all": [1, 2]}}, + }, + [1], + ), + ], + ids=[ + "includes any (single)", + "includes all (single)", + "includes any (list)", + "includes all (list)", + "excludes any (single)", + "excludes all (single)", + "excludes any (list)", + "excludes all (list)", + "includes and excludes (single)", + "includes and excludes (list)", + "includes any and all", + ], +) +@pytest.mark.anyio +async def test_assoc_filter(query_comic_filter, gen_comic, filter, ids): + await DB.add_all(*gen_comic) + + response = Response(await query_comic_filter(filter)) + response.assert_is("ComicFilterResult") + + assert id_list(response.edges) == ids + + +@pytest.mark.parametrize( + "filter,empty_response", + [ + ({"include": {"artists": {"any": []}}}, True), + ({"include": {"artists": {"all": []}}}, True), + ({"include": {"artists": {}}}, False), + ({"exclude": {"artists": {"any": []}}}, False), + ({"exclude": {"artists": {"all": []}}}, False), + ({"exclude": {"artists": {}}}, False), + ({"include": {"tags": {"any": ""}}}, True), + ({"include": {"tags": {"any": ":"}}}, True), + ], + ids=[ + "list (include any)", + "list (include all)", + "field (include)", + "list (exclude any)", + "list (exclude all)", + "field (exclude)", + "string (tags)", + "specifier (tags)", + ], +) +@pytest.mark.anyio +async def test_assoc_filter_handles_empty( + query_comic_filter, gen_comic, filter, empty_response +): + comics = await DB.add_all(*gen_comic) + + response = Response(await query_comic_filter(filter)) + + response.assert_is("ComicFilterResult") + + if empty_response: + assert response.edges == [] + assert response.count == 0 + else: + assert len(response.edges) == len(comics) + assert response.count == len(comics) + + +@pytest.mark.parametrize( + "filter,ids", + [ + ( + {"include": {"tags": {"any": "1:"}}}, + [1, 2, 3], + ), + ( + {"include": {"tags": {"any": ":2"}}}, + [1, 4], + ), + ( + {"include": {"tags": {"exact": ["1:3", "2:1"]}}}, + [2], + ), + ( + {"include": {"tags": {"exact": ["1:"]}}}, + [3], + ), + ( + {"include": {"tags": {"exact": [":4"]}}}, + [3], + ), + ( + {"exclude": {"tags": {"all": ["1:4", "1:1"]}}}, + [2, 3, 4], + ), + ( + {"exclude": {"tags": {"exact": ["2:1", "2:2", "2:3"]}}}, + [1, 2, 3], + ), + ( + {"exclude": {"tags": {"exact": ["1:"]}}}, + [1, 2, 4], + ), + ( + {"exclude": {"tags": {"exact": [":4"]}}}, + [1, 2, 4], + ), + ], + ids=[ + "includes any namespace", + "includes any tag", + "includes exact tags", + "includes exact namespace", + "includes exact tag", + "excludes all tags", + "includes exact tags", + "includes exact namespace", + "includes exact tag", + ], +) +@pytest.mark.anyio +async def test_assoc_tag_filter(query_comic_filter, gen_comic, filter, ids): + await DB.add_all(*gen_comic) + + response = Response(await query_comic_filter(filter)) + response.assert_is("ComicFilterResult") + + assert id_list(response.edges) == ids + + +@pytest.mark.parametrize( + "filter,ids", + [ + ( + {"include": {"favourite": True}}, + [1], + ), + ( + {"include": {"rating": {"any": "EXPLICIT"}}}, + [3], + ), + ( + {"include": {"category": {"any": "MANGA"}}}, + [1, 2], + ), + ( + {"include": {"censorship": {"any": "MOSAIC"}}}, + [3], + ), + ( + {"exclude": {"favourite": True}}, + [2, 3, 4], + ), + ( + {"exclude": {"rating": {"any": ["EXPLICIT", "QUESTIONABLE"]}}}, + [1, 4], + ), + ], + ids=[ + "includes favourite", + "includes rating", + "includes category", + "includes censorship", + "excludes favourite", + "excludes ratings", + ], +) +@pytest.mark.anyio +async def test_field_filter(query_comic_filter, gen_comic, filter, ids): + await DB.add_all(*gen_comic) + + response = Response(await query_comic_filter(filter)) + response.assert_is("ComicFilterResult") + + assert id_list(response.edges) == ids + + +@pytest.mark.parametrize( + "filter,ids", + [ + ( + {"include": {"rating": {"empty": True}}}, + [100], + ), + ( + {"include": {"rating": {"empty": False}}}, + [1, 2], + ), + ( + {"exclude": {"rating": {"empty": True}}}, + [1, 2], + ), + ( + {"exclude": {"rating": {"empty": False}}}, + [100], + ), + ], + ids=[ + "includes rating empty", + "includes rating not empty", + "excludes rating empty", + "excludes rating not empty", + ], +) +@pytest.mark.anyio +async def test_field_presence(query_comic_filter, gen_comic, empty_comic, filter, ids): + await DB.add(next(gen_comic)) + await DB.add(next(gen_comic)) + await DB.add(empty_comic) + + response = Response(await query_comic_filter(filter)) + response.assert_is("ComicFilterResult") + + assert id_list(response.edges) == ids + + +@pytest.mark.parametrize( + "filter,ids", + [ + ( + {"include": {"artists": {"empty": True}}}, + [100], + ), + ( + {"include": {"artists": {"empty": False}}}, + [1, 2], + ), + ( + {"exclude": {"artists": {"empty": True}}}, + [1, 2], + ), + ( + {"exclude": {"artists": {"empty": False}}}, + [100], + ), + ( + {"include": {"tags": {"empty": True}}}, + [100], + ), + ( + {"include": {"tags": {"empty": False}}}, + [1, 2], + ), + ( + {"exclude": {"tags": {"empty": True}}}, + [1, 2], + ), + ( + {"exclude": {"tags": {"empty": False}}}, + [100], + ), + ], + ids=[ + "includes artist empty", + "includes artist not empty", + "excludes artist empty", + "excludes artist not empty", + "includes tags empty", + "includes tags not empty", + "excludes tags empty", + "excludes tags not empty", + ], +) +@pytest.mark.anyio +async def test_assoc_presence(query_comic_filter, gen_comic, empty_comic, filter, ids): + await DB.add(next(gen_comic)) + await DB.add(next(gen_comic)) + await DB.add(empty_comic) + + response = Response(await query_comic_filter(filter)) + response.assert_is("ComicFilterResult") + + assert id_list(response.edges) == ids + + +@pytest.mark.parametrize( + "filter,ids", + [ + ( + {"include": {"namespaces": {"any": 1}}}, + [1, 2], + ), + ( + {"include": {"namespaces": {"all": [1, 2]}}}, + [2], + ), + ( + {"include": {"namespaces": {"exact": [1]}}}, + [1], + ), + ( + {"exclude": {"namespaces": {"any": 2}}}, + [1], + ), + ( + {"exclude": {"namespaces": {"exact": [1]}}}, + [2], + ), + ], + ids=[ + "includes any namespace", + "includes all namespace", + "includes exact namespaces", + "excludes any namespace", + "excludes exact namespaces", + ], +) +@pytest.mark.anyio +async def test_tag_assoc_filter(query_tag_filter, gen_namespace, gen_tag, filter, ids): + foo = await DB.add(Namespace(id=1, name="foo")) + bar = await DB.add(Namespace(id=2, name="bar")) + + await DB.add(Tag(id=1, name="small", namespaces=[foo])) + await DB.add(Tag(id=2, name="large", namespaces=[foo, bar])) + + response = Response(await query_tag_filter(filter)) + response.assert_is("TagFilterResult") + + assert id_list(response.edges) == ids diff --git a/tests/api/test_image.py b/tests/api/test_image.py new file mode 100644 index 0000000..c8c26b3 --- /dev/null +++ b/tests/api/test_image.py @@ -0,0 +1,16 @@ +import pytest +from conftest import DB +from hircine.api.types import Image + + +@pytest.mark.anyio +async def test_image(gen_image): + images = await DB.add_all(*gen_image) + + for db_image in images: + image = Image(db_image) + assert image.id == db_image.id + assert image.hash == db_image.hash + assert image.width == db_image.width + assert image.height == db_image.height + assert image.aspect_ratio == db_image.width / db_image.height diff --git a/tests/api/test_namespace.py b/tests/api/test_namespace.py new file mode 100644 index 0000000..450075b --- /dev/null +++ b/tests/api/test_namespace.py @@ -0,0 +1,291 @@ +from datetime import datetime as dt +from datetime import timezone + +import pytest +from conftest import DB, Response +from hircine.db.models import Namespace + + +@pytest.fixture +def query_namespace(execute_id): + query = """ + query namespace($id: Int!) { + namespace(id: $id) { + __typename + ... on Namespace { + id + name + sortName + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(query) + + +@pytest.fixture +def query_namespaces(execute): + query = """ + query namespaces { + namespaces { + __typename + count + edges { + id + name + } + } + } + """ + + return execute(query) + + +@pytest.fixture +def add_namespace(execute_add): + mutation = """ + mutation addNamespace($input: AddNamespaceInput!) { + addNamespace(input: $input) { + __typename + ... on AddSuccess { + id + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_add(mutation) + + +@pytest.fixture +def update_namespaces(execute_update): + mutation = """ + mutation updateNamespaces($ids: [Int!]!, $input: UpdateNamespaceInput!) { + updateNamespaces(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +@pytest.fixture +def delete_namespaces(execute_delete): + mutation = """ + mutation deleteNamespaces($ids: [Int!]!) { + deleteNamespaces(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +@pytest.mark.anyio +async def test_query_namespace(query_namespace, gen_namespace): + namespace = await DB.add(next(gen_namespace)) + + response = Response(await query_namespace(namespace.id)) + response.assert_is("Namespace") + + assert response.id == namespace.id + assert response.name == namespace.name + assert response.sortName == namespace.sort_name + + +@pytest.mark.anyio +async def test_query_namespace_fails_not_found(query_namespace): + response = Response(await query_namespace(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Namespace ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_namespaces(query_namespaces, gen_namespace): + namespaces = await DB.add_all(*gen_namespace) + response = Response(await query_namespaces()) + response.assert_is("NamespaceFilterResult") + + assert response.count == len(namespaces) + assert isinstance((response.edges), list) + assert len(response.edges) == len(namespaces) + + edges = iter(response.edges) + for namespace in sorted(namespaces, key=lambda a: a.name): + edge = next(edges) + assert edge["id"] == namespace.id + assert edge["name"] == namespace.name + + +@pytest.mark.anyio +async def test_add_namespace(add_namespace): + response = Response(await add_namespace({"name": "added", "sortName": "foo"})) + response.assert_is("AddSuccess") + + namespace = await DB.get(Namespace, response.id) + assert namespace is not None + assert namespace.name == "added" + assert namespace.sort_name == "foo" + + +@pytest.mark.anyio +async def test_add_namespace_fails_empty_parameter(add_namespace): + response = Response(await add_namespace({"name": ""})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_add_namespace_fails_exists(add_namespace, gen_namespace): + namespace = await DB.add(next(gen_namespace)) + + response = Response(await add_namespace({"name": namespace.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Namespace with this name exists" + + +@pytest.mark.anyio +async def test_delete_namespace(delete_namespaces, gen_namespace): + namespace = await DB.add(next(gen_namespace)) + id = namespace.id + + response = Response(await delete_namespaces(id)) + response.assert_is("DeleteSuccess") + + namespace = await DB.get(Namespace, id) + assert namespace is None + + +@pytest.mark.anyio +async def test_delete_namespace_not_found(delete_namespaces): + response = Response(await delete_namespaces(1)) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Namespace ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_namespace(update_namespaces, gen_namespace): + namespace = await DB.add(next(gen_namespace)) + + input = {"name": "updated", "sortName": "foo"} + response = Response(await update_namespaces(namespace.id, input)) + response.assert_is("UpdateSuccess") + + namespace = await DB.get(Namespace, namespace.id) + assert namespace is not None + assert namespace.name == "updated" + assert namespace.sort_name == "foo" + + +@pytest.mark.anyio +async def test_update_namespace_fails_exists(update_namespaces, gen_namespace): + first = await DB.add(next(gen_namespace)) + second = await DB.add(next(gen_namespace)) + + response = Response(await update_namespaces(second.id, {"name": first.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Namespace with this name exists" + + +@pytest.mark.anyio +async def test_update_namespace_fails_not_found(update_namespaces): + response = Response(await update_namespaces(1, {"name": "updated"})) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Namespace ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_namespaces_cannot_bulk_edit_name( + update_namespaces, gen_namespace +): + first = await DB.add(next(gen_namespace)) + second = await DB.add(next(gen_namespace)) + + response = Response( + await update_namespaces([first.id, second.id], {"name": "unique"}) + ) + response.assert_is("InvalidParameterError") + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "none", + "empty string", + ], +) +@pytest.mark.anyio +async def test_update_namespace_fails_empty_parameter( + update_namespaces, gen_namespace, empty +): + namespace = await DB.add(next(gen_namespace)) + response = Response(await update_namespaces(namespace.id, {"name": empty})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_update_namespace_changes_updated_at(update_namespaces): + original_namespace = Namespace(name="namespace") + original_namespace.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_namespace = await DB.add(original_namespace) + + response = Response( + await update_namespaces(original_namespace.id, {"name": "updated"}) + ) + response.assert_is("UpdateSuccess") + + namespace = await DB.get(Namespace, original_namespace.id) + assert namespace.updated_at > original_namespace.updated_at diff --git a/tests/api/test_page.py b/tests/api/test_page.py new file mode 100644 index 0000000..debd69a --- /dev/null +++ b/tests/api/test_page.py @@ -0,0 +1,39 @@ +from datetime import datetime, timezone + +import pytest +from conftest import DB +from hircine.api.types import Page +from hircine.db.models import Archive + + +@pytest.mark.anyio +async def test_page(gen_page): + pages = list(gen_page) + images = [p.image for p in pages] + + # persist images and pages in database by binding them to a throwaway + # archive + archive = await DB.add( + Archive( + hash="339e3a32e5648fdeb2597f05cb2e1ef6", + path="some/archive.zip", + size=78631597, + mtime=datetime(1999, 12, 27, tzinfo=timezone.utc), + cover=images[0], + pages=pages, + page_count=len(pages), + ) + ) + + assert len(archive.pages) == len(pages) + + page_iter = iter(pages) + image_iter = iter(images) + for page in [Page(p) for p in archive.pages]: + matching_page = next(page_iter) + matching_image = next(image_iter) + + assert page.id == matching_page.id + assert page.comic_id is None + assert page.image.id == matching_image.id + assert page.path == matching_page.path diff --git a/tests/api/test_pagination.py b/tests/api/test_pagination.py new file mode 100644 index 0000000..67854c3 --- /dev/null +++ b/tests/api/test_pagination.py @@ -0,0 +1,61 @@ +import pytest +from conftest import DB, Response + + +@pytest.fixture +def query_pagination(schema_execute): + query = """ + query artists($pagination: Pagination) { + artists(pagination: $pagination) { + __typename + count + edges { + id + } + } + } + """ + + async def _execute(pagination=None): + return await schema_execute( + query, {"pagination": pagination} if pagination else None + ) + + return _execute + + +@pytest.mark.parametrize( + "pagination,count,length", + [ + (None, 4, 4), + ({"items": 3, "page": 1}, 4, 3), + ({"items": 3, "page": 2}, 4, 1), + ({"items": 3, "page": 3}, 0, 0), + ({"items": 10, "page": 1}, 4, 4), + ({"items": 0, "page": 1}, 0, 0), + ({"items": 2, "page": 0}, 0, 0), + ({"items": -1, "page": 0}, 0, 0), + ({"items": 0, "page": -1}, 0, 0), + ], + ids=[ + "is missing and lists all", + "lists first page", + "lists last page", + "lists none (no more items)", + "lists all", + "lists none (zero items)", + "lists none (page zero)", + "lists none (negative items)", + "lists none (negative page)", + ], +) +@pytest.mark.anyio +async def test_pagination(query_pagination, gen_artist, pagination, count, length): + await DB.add_all(*gen_artist) + + response = Response(await query_pagination(pagination)) + response.assert_is("ArtistFilterResult") + + assert response.count == count + assert isinstance((response.edges), list) + assert len(response.edges) == length diff --git a/tests/api/test_scraper_api.py b/tests/api/test_scraper_api.py new file mode 100644 index 0000000..1edd74f --- /dev/null +++ b/tests/api/test_scraper_api.py @@ -0,0 +1,395 @@ +import hircine.enums as enums +import hircine.plugins +import hircine.scraper.types as scraped +import pytest +from conftest import DB, Response +from hircine.scraper import ScrapeError, Scraper, ScrapeWarning + + +@pytest.fixture +def query_comic_scrapers(schema_execute): + query = """ + query comicScrapers($id: Int!) { + comicScrapers(id: $id) { + __typename + id + name + } + } + """ + + async def _execute(id): + return await schema_execute(query, {"id": id}) + + return _execute + + +@pytest.fixture +def query_scrape_comic(schema_execute): + query = """ + query scrapeComic($id: Int!, $scraper: String!) { + scrapeComic(id: $id, scraper: $scraper) { + __typename + ... on ScrapeComicResult { + data { + title + originalTitle + url + artists + category + censorship + characters + circles + date + direction + language + layout + rating + tags + worlds + } + warnings + } + ... on Error { + message + } + ... on ScraperNotFoundError { + name + } + ... on ScraperNotAvailableError { + scraper + comicId + } + ... on IDNotFoundError { + id + } + } + } + """ + + async def _execute(id, scraper): + return await schema_execute(query, {"id": id, "scraper": scraper}) + + return _execute + + +@pytest.fixture +def scrapers(empty_plugins): + class GoodScraper(Scraper): + name = "Good Scraper" + is_available = True + source = "good" + + def scrape(self): + yield scraped.Title("Arid Savannah Adventures") + yield scraped.OriginalTitle("Arid Savannah Hijinx") + yield scraped.URL("file:///home/savannah/adventures") + yield scraped.Language(enums.Language.EN) + yield scraped.Date.from_iso("2010-07-05") + yield scraped.Direction(enums.Direction["LEFT_TO_RIGHT"]) + yield scraped.Layout(enums.Layout.SINGLE) + yield scraped.Rating(enums.Rating.SAFE) + yield scraped.Category(enums.Category.MANGA) + yield scraped.Censorship(enums.Censorship.NONE) + yield scraped.Tag.from_string("animal:small") + yield scraped.Tag.from_string("animal:medium") + yield scraped.Tag.from_string("animal:big") + yield scraped.Tag.from_string("animal:massive") + yield scraped.Artist("alan smithee") + yield scraped.Artist("david agnew") + yield scraped.Character("greta giraffe") + yield scraped.Character("bob bear") + yield scraped.Character("rico rhinoceros") + yield scraped.Character("ziggy zebra") + yield scraped.Circle("archimedes") + yield scraped.World("animal friends") + + class DuplicateScraper(Scraper): + name = "Duplicate Scraper" + is_available = True + source = "dupe" + + def gen(self): + yield scraped.Title("Arid Savannah Adventures") + yield scraped.OriginalTitle("Arid Savannah Hijinx") + yield scraped.URL("file:///home/savannah/adventures") + yield scraped.Language(enums.Language.EN) + yield scraped.Date.from_iso("2010-07-05") + yield scraped.Direction(enums.Direction["LEFT_TO_RIGHT"]) + yield scraped.Layout(enums.Layout.SINGLE) + yield scraped.Rating(enums.Rating.SAFE) + yield scraped.Category(enums.Category.MANGA) + yield scraped.Censorship(enums.Censorship.NONE) + yield scraped.Tag.from_string("animal:small") + yield scraped.Tag.from_string("animal:medium") + yield scraped.Tag.from_string("animal:big") + yield scraped.Tag.from_string("animal:massive") + yield scraped.Artist("alan smithee") + yield scraped.Artist("david agnew") + yield scraped.Character("greta giraffe") + yield scraped.Character("bob bear") + yield scraped.Character("rico rhinoceros") + yield scraped.Character("ziggy zebra") + yield scraped.Circle("archimedes") + yield scraped.World("animal friends") + + def scrape(self): + yield from self.gen() + yield from self.gen() + + class WarnScraper(Scraper): + name = "Warn Scraper" + is_available = True + source = "warn" + + def warn_on_purpose(self, item): + raise ScrapeWarning(f"Could not parse: {item}") + + def scrape(self): + yield scraped.Title("Arid Savannah Adventures") + yield lambda: self.warn_on_purpose("Arid Savannah Hijinx") + yield scraped.Language(enums.Language.EN) + + class FailScraper(Scraper): + name = "Fail Scraper" + is_available = True + source = "fail" + + def scrape(self): + yield scraped.Title("Arid Savannah Adventures") + raise ScrapeError("Could not continue") + yield scraped.Language(enums.Language.EN) + + class UnavailableScraper(Scraper): + name = "Unavailable Scraper" + is_available = False + source = "unavail" + + def scrape(self): + yield None + + hircine.plugins.register_scraper("good", GoodScraper) + hircine.plugins.register_scraper("dupe", DuplicateScraper) + hircine.plugins.register_scraper("warn", WarnScraper) + hircine.plugins.register_scraper("fail", FailScraper) + hircine.plugins.register_scraper("unavail", UnavailableScraper) + + return [ + ("good", GoodScraper), + ("dupe", DuplicateScraper), + ("warn", WarnScraper), + ("fail", FailScraper), + ("unavail", UnavailableScraper), + ] + + +@pytest.mark.anyio +async def test_comic_scrapers(gen_comic, query_comic_scrapers, scrapers): + comic = await DB.add(next(gen_comic)) + response = Response(await query_comic_scrapers(comic.id)) + + assert isinstance((response.data), list) + + available_scrapers = [] + for name, cls in sorted(scrapers, key=lambda s: s[1].name): + instance = cls(comic) + if instance.is_available: + available_scrapers.append((name, cls)) + + assert len(response.data) == len(available_scrapers) + + data = iter(response.data) + for id, scraper in available_scrapers: + field = next(data) + assert field["__typename"] == "ComicScraper" + assert field["id"] == id + assert field["name"] == scraper.name + + +@pytest.mark.anyio +async def test_comic_empty_for_missing_comic(gen_comic, query_comic_scrapers, scrapers): + response = Response(await query_comic_scrapers(1)) + + assert response.data == [] + + +@pytest.mark.anyio +async def test_scrape_comic(gen_comic, query_scrape_comic, scrapers): + comic = await DB.add(next(gen_comic)) + + response = Response(await query_scrape_comic(comic.id, "good")) + response.assert_is("ScrapeComicResult") + + assert response.warnings == [] + + scraped_comic = response.data["data"] + + assert scraped_comic["title"] == "Arid Savannah Adventures" + assert scraped_comic["originalTitle"] == "Arid Savannah Hijinx" + assert scraped_comic["url"] == "file:///home/savannah/adventures" + assert scraped_comic["language"] == "EN" + assert scraped_comic["date"] == "2010-07-05" + assert scraped_comic["rating"] == "SAFE" + assert scraped_comic["category"] == "MANGA" + assert scraped_comic["direction"] == "LEFT_TO_RIGHT" + assert scraped_comic["layout"] == "SINGLE" + assert scraped_comic["tags"] == [ + "animal:small", + "animal:medium", + "animal:big", + "animal:massive", + ] + assert scraped_comic["artists"] == ["alan smithee", "david agnew"] + assert scraped_comic["characters"] == [ + "greta giraffe", + "bob bear", + "rico rhinoceros", + "ziggy zebra", + ] + assert scraped_comic["circles"] == ["archimedes"] + assert scraped_comic["worlds"] == ["animal friends"] + + +@pytest.mark.anyio +async def test_scrape_comic_removes_duplicates(gen_comic, query_scrape_comic, scrapers): + comic = await DB.add(next(gen_comic)) + + response = Response(await query_scrape_comic(comic.id, "dupe")) + response.assert_is("ScrapeComicResult") + + assert response.warnings == [] + + scraped_comic = response.data["data"] + + assert scraped_comic["title"] == "Arid Savannah Adventures" + assert scraped_comic["originalTitle"] == "Arid Savannah Hijinx" + assert scraped_comic["url"] == "file:///home/savannah/adventures" + assert scraped_comic["language"] == "EN" + assert scraped_comic["date"] == "2010-07-05" + assert scraped_comic["rating"] == "SAFE" + assert scraped_comic["category"] == "MANGA" + assert scraped_comic["direction"] == "LEFT_TO_RIGHT" + assert scraped_comic["layout"] == "SINGLE" + assert scraped_comic["tags"] == [ + "animal:small", + "animal:medium", + "animal:big", + "animal:massive", + ] + assert scraped_comic["artists"] == ["alan smithee", "david agnew"] + assert scraped_comic["characters"] == [ + "greta giraffe", + "bob bear", + "rico rhinoceros", + "ziggy zebra", + ] + assert scraped_comic["circles"] == ["archimedes"] + assert scraped_comic["worlds"] == ["animal friends"] + + +@pytest.mark.anyio +async def test_scrape_comic_fails_comic_not_found(query_scrape_comic, scrapers): + response = Response(await query_scrape_comic(1, "good")) + response.assert_is("IDNotFoundError") + + assert response.id == 1 + assert response.message == "Comic ID not found: '1'" + + +@pytest.mark.anyio +async def test_scrape_comic_fails_scraper_not_found( + gen_comic, query_scrape_comic, scrapers +): + comic = await DB.add(next(gen_comic)) + + response = Response(await query_scrape_comic(comic.id, "missing")) + response.assert_is("ScraperNotFoundError") + + assert response.name == "missing" + assert response.message == "Scraper not found: 'missing'" + + +@pytest.mark.anyio +async def test_scrape_comic_fails_scraper_not_available( + gen_comic, query_scrape_comic, scrapers +): + comic = await DB.add(next(gen_comic)) + + response = Response(await query_scrape_comic(comic.id, "unavail")) + response.assert_is("ScraperNotAvailableError") + + assert response.scraper == "unavail" + assert response.comicId == comic.id + assert response.message == f"Scraper unavail not available for comic ID {comic.id}" + + +async def test_scrape_comic_with_transformer(gen_comic, query_scrape_comic, scrapers): + def keep(generator, info): + for item in generator: + match item: + case scraped.Title(): + yield item + + hircine.plugins.transformers = [keep] + + comic = await DB.add(next(gen_comic)) + + response = Response(await query_scrape_comic(comic.id, "good")) + response.assert_is("ScrapeComicResult") + + assert response.warnings == [] + + scraped_comic = response.data["data"] + + assert scraped_comic["title"] == "Arid Savannah Adventures" + assert scraped_comic["originalTitle"] is None + assert scraped_comic["url"] is None + assert scraped_comic["language"] is None + assert scraped_comic["date"] is None + assert scraped_comic["rating"] is None + assert scraped_comic["category"] is None + assert scraped_comic["censorship"] is None + assert scraped_comic["direction"] is None + assert scraped_comic["layout"] is None + assert scraped_comic["tags"] == [] + assert scraped_comic["artists"] == [] + assert scraped_comic["characters"] == [] + assert scraped_comic["circles"] == [] + assert scraped_comic["worlds"] == [] + + +@pytest.mark.anyio +async def test_scrape_comic_catches_warnings(gen_comic, query_scrape_comic, scrapers): + comic = await DB.add(next(gen_comic)) + + response = Response(await query_scrape_comic(comic.id, "warn")) + response.assert_is("ScrapeComicResult") + + assert response.warnings == ["Could not parse: Arid Savannah Hijinx"] + + scraped_comic = response.data["data"] + + assert scraped_comic["title"] == "Arid Savannah Adventures" + assert scraped_comic["originalTitle"] is None + assert scraped_comic["language"] == "EN" + assert scraped_comic["date"] is None + assert scraped_comic["rating"] is None + assert scraped_comic["category"] is None + assert scraped_comic["direction"] is None + assert scraped_comic["layout"] is None + assert scraped_comic["tags"] == [] + assert scraped_comic["artists"] == [] + assert scraped_comic["characters"] == [] + assert scraped_comic["circles"] == [] + assert scraped_comic["worlds"] == [] + + +@pytest.mark.anyio +async def test_scrape_comic_fails_with_scraper_error( + gen_comic, query_scrape_comic, scrapers +): + comic = await DB.add(next(gen_comic)) + + response = Response(await query_scrape_comic(comic.id, "fail")) + response.assert_is("ScraperError") + assert response.message == "Scraping failed: Could not continue" diff --git a/tests/api/test_sort.py b/tests/api/test_sort.py new file mode 100644 index 0000000..b3c8562 --- /dev/null +++ b/tests/api/test_sort.py @@ -0,0 +1,137 @@ +import pytest +from conftest import DB, Response +from hircine.db.models import Namespace + + +@pytest.fixture +def query_comic_sort(execute_sort): + query = """ + query comics($sort: ComicSortInput) { + comics(sort: $sort) { + __typename + count + edges { + id + title + } + } + } + """ + + return execute_sort(query) + + +@pytest.fixture +def query_namespace_sort(execute_sort): + query = """ + query namespaces($sort: NamespaceSortInput) { + namespaces(sort: $sort) { + __typename + count + edges { + id + name + } + } + } + """ + + return execute_sort(query) + + +@pytest.mark.parametrize( + "sort,reverse", + [ + ({"on": "DATE"}, False), + ({"on": "DATE", "direction": "DESCENDING"}, True), + ({"on": "DATE", "direction": "ASCENDING"}, False), + ], + ids=[ + "ascending (default)", + "descending", + "ascending", + ], +) +@pytest.mark.anyio +async def test_query_comics_sort_date(gen_comic, query_comic_sort, sort, reverse): + comics = await DB.add_all(*gen_comic) + ids = [c.id for c in sorted(comics, key=lambda c: c.date, reverse=reverse)] + + response = Response(await query_comic_sort(sort)) + response.assert_is("ComicFilterResult") + + assert ids == [edge["id"] for edge in response.edges] + + +@pytest.mark.parametrize( + "sort,reverse", + [ + ({"on": "TAG_COUNT"}, False), + ({"on": "TAG_COUNT", "direction": "DESCENDING"}, True), + ({"on": "TAG_COUNT", "direction": "ASCENDING"}, False), + ], + ids=[ + "ascending (default)", + "descending", + "ascending", + ], +) +@pytest.mark.anyio +async def test_query_comics_sort_tag_count(gen_comic, query_comic_sort, sort, reverse): + comics = await DB.add_all(*gen_comic) + ids = [c.id for c in sorted(comics, key=lambda c: len(c.tags), reverse=reverse)] + + response = Response(await query_comic_sort(sort)) + response.assert_is("ComicFilterResult") + + assert ids == [edge["id"] for edge in response.edges] + + +@pytest.mark.anyio +async def test_query_comics_sort_random(gen_comic, query_comic_sort): + comics = await DB.add_all(*gen_comic) + ids = set([c.id for c in comics]) + + response = Response(await query_comic_sort({"on": "RANDOM"})) + response.assert_is("ComicFilterResult") + + assert ids == set(edge["id"] for edge in response.edges) + + +@pytest.mark.anyio +async def test_query_comics_sort_random_seed_direction(gen_comic, query_comic_sort): + comics = await DB.add_all(*gen_comic) + ids = set([c.id for c in comics]) + + response = Response( + await query_comic_sort( + {"on": "RANDOM", "seed": 42069, "direction": "ASCENDING"} + ) + ) + response.assert_is("ComicFilterResult") + + ascending_ids = [edge["id"] for edge in response.edges] + + assert ids == set(ascending_ids) + + response = Response( + await query_comic_sort( + {"on": "RANDOM", "seed": 42069, "direction": "DESCENDING"} + ) + ) + response.assert_is("ComicFilterResult") + + descending_ids = [edge["id"] for edge in response.edges] + + assert ascending_ids == descending_ids[::-1] + + +@pytest.mark.anyio +async def test_query_namespace_sort_sort_name(query_namespace_sort): + await DB.add(Namespace(name="one", sort_name="2")) + await DB.add(Namespace(name="two", sort_name="1")) + + response = Response(await query_namespace_sort({"on": "SORT_NAME"})) + response.assert_is("NamespaceFilterResult") + + assert ["two", "one"] == [edge["name"] for edge in response.edges] diff --git a/tests/api/test_tag.py b/tests/api/test_tag.py new file mode 100644 index 0000000..c863a00 --- /dev/null +++ b/tests/api/test_tag.py @@ -0,0 +1,441 @@ +from datetime import datetime as dt +from datetime import timezone + +import pytest +from conftest import DB, Response +from hircine.db.models import Namespace, Tag + + +@pytest.fixture +def query_tag(execute_id): + query = """ + query tag($id: Int!) { + tag(id: $id) { + __typename + ... on FullTag { + id + name + description + namespaces { + __typename + id + } + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(query) + + +@pytest.fixture +def query_tags(execute): + query = """ + query tags { + tags { + __typename + count + edges { + id + name + description + } + } + } + """ + + return execute(query) + + +@pytest.fixture +def add_tag(execute_add): + mutation = """ + mutation addTag($input: AddTagInput!) { + addTag(input: $input) { + __typename + ... on AddSuccess { + id + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_add(mutation) + + +@pytest.fixture +def update_tags(execute_update): + mutation = """ + mutation updateTags($ids: [Int!]!, $input: UpdateTagInput!) { + updateTags(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +@pytest.fixture +def delete_tags(execute_delete): + mutation = """ + mutation deleteTags($ids: [Int!]!) { + deleteTags(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +@pytest.mark.anyio +async def test_query_tag(query_tag, gen_tag): + tag = await DB.add(next(gen_tag)) + + response = Response(await query_tag(tag.id)) + response.assert_is("FullTag") + + assert response.id == tag.id + assert response.name == tag.name + assert response.description == tag.description + assert set([n["id"] for n in response.namespaces]) == set( + [n.id for n in tag.namespaces] + ) + + +@pytest.mark.anyio +async def test_query_tag_fails_not_found(query_tag): + response = Response(await query_tag(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Tag ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_tags(query_tags, gen_tag): + tags = await DB.add_all(*gen_tag) + response = Response(await query_tags()) + response.assert_is("TagFilterResult") + + assert response.count == len(tags) + assert isinstance((response.edges), list) + assert len(response.edges) == len(tags) + + edges = iter(response.edges) + for tag in sorted(tags, key=lambda a: a.name): + edge = next(edges) + assert edge["id"] == tag.id + assert edge["name"] == tag.name + assert edge["description"] == tag.description + + +@pytest.mark.anyio +async def test_add_tag(add_tag): + response = Response( + await add_tag({"name": "added", "description": "it's been added!"}) + ) + response.assert_is("AddSuccess") + + tag = await DB.get(Tag, response.id) + assert tag is not None + assert tag.name == "added" + assert tag.description == "it's been added!" + + +@pytest.mark.anyio +async def test_add_tag_with_namespace(add_tag): + namespace = await DB.add(Namespace(id=1, name="new")) + + response = Response(await add_tag({"name": "added", "namespaces": {"ids": [1]}})) + response.assert_is("AddSuccess") + + tag = await DB.get(Tag, response.id, full=True) + assert tag is not None + assert tag.name == "added" + assert tag.namespaces[0].id == namespace.id + assert tag.namespaces[0].name == namespace.name + + +@pytest.mark.anyio +async def test_add_tag_fails_empty_parameter(add_tag): + response = Response(await add_tag({"name": ""})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_add_tag_fails_namespace_not_found(add_tag): + response = Response(await add_tag({"name": "added", "namespaces": {"ids": [1]}})) + response.assert_is("IDNotFoundError") + + assert response.id == 1 + assert response.message == "Namespace ID not found: '1'" + + +@pytest.mark.anyio +async def test_add_tag_fails_exists(add_tag, gen_tag): + tag = await DB.add(next(gen_tag)) + + response = Response(await add_tag({"name": tag.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Tag with this name exists" + + +@pytest.mark.anyio +async def test_delete_tag(delete_tags, gen_tag): + tag = await DB.add(next(gen_tag)) + id = tag.id + + response = Response(await delete_tags(id)) + response.assert_is("DeleteSuccess") + + tag = await DB.get(Tag, id) + assert tag is None + + +@pytest.mark.anyio +async def test_delete_tag_not_found(delete_tags): + response = Response(await delete_tags(1)) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Tag ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_tag(update_tags, gen_tag, gen_namespace): + tag = await DB.add(next(gen_tag)) + namespace = await DB.add(next(gen_namespace)) + + input = { + "name": "updated", + "description": "how different, how unique", + "namespaces": {"ids": [1]}, + } + response = Response(await update_tags(tag.id, input)) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, tag.id, full=True) + assert tag is not None + assert tag.name == "updated" + assert tag.description == "how different, how unique" + assert tag.namespaces[0].id == namespace.id + assert tag.namespaces[0].name == namespace.name + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "with None", + "with empty string", + ], +) +@pytest.mark.anyio +async def test_update_tag_clears_description(update_tags, gen_tag, empty): + tag = await DB.add(next(gen_tag)) + + input = { + "description": empty, + } + response = Response(await update_tags(tag.id, input)) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, tag.id) + assert tag is not None + assert tag.description is None + + +@pytest.mark.anyio +async def test_update_tag_fails_exists(update_tags, gen_tag): + first = await DB.add(next(gen_tag)) + second = await DB.add(next(gen_tag)) + + response = Response(await update_tags(second.id, {"name": first.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Tag with this name exists" + + +@pytest.mark.anyio +async def test_update_tag_fails_not_found(update_tags): + response = Response(await update_tags(1, {"name": "updated"})) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Tag ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_tags_cannot_bulk_edit_name(update_tags, gen_tag): + first = await DB.add(next(gen_tag)) + second = await DB.add(next(gen_tag)) + + response = Response(await update_tags([first.id, second.id], {"name": "unique"})) + response.assert_is("InvalidParameterError") + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "none", + "empty string", + ], +) +@pytest.mark.anyio +async def test_update_tag_fails_empty_parameter(update_tags, gen_tag, empty): + tag = await DB.add(next(gen_tag)) + response = Response(await update_tags(tag.id, {"name": empty})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_update_tag_fails_namespace_not_found(update_tags, gen_tag): + tag = await DB.add(next(gen_tag)) + response = Response(await update_tags(tag.id, {"namespaces": {"ids": [1]}})) + response.assert_is("IDNotFoundError") + + assert response.id == 1 + assert response.message == "Namespace ID not found: '1'" + + +@pytest.mark.parametrize( + "options", + [ + None, + {}, + {"mode": "REPLACE"}, + ], + ids=[ + "by default (none)", + "by default (empty record)", + "when defined explicitly", + ], +) +@pytest.mark.anyio +async def test_update_tag_replaces_assocs(update_tags, gen_tag, options): + original_tag = await DB.add(next(gen_tag)) + new_namespace = await DB.add(Namespace(name="new")) + + input = { + "namespaces": {"ids": [new_namespace.id]}, + } + response = Response(await update_tags(original_tag.id, input)) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, original_tag.id, full=True) + + assert set([o.id for o in tag.namespaces]) == set([o.id for o in [new_namespace]]) + + +@pytest.mark.anyio +async def test_update_tag_adds_assocs(update_tags, gen_tag): + original_tag = await DB.add(next(gen_tag)) + new_namespace = await DB.add(Namespace(name="new")) + added_namespaces = original_tag.namespaces + [new_namespace] + + input = { + "namespaces": {"ids": [new_namespace.id], "options": {"mode": "ADD"}}, + } + response = Response(await update_tags(original_tag.id, input)) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, original_tag.id, full=True) + + assert set([o.id for o in tag.namespaces]) == set([o.id for o in added_namespaces]) + + +@pytest.mark.anyio +async def test_update_tag_removes_assocs(update_tags): + removed_namespace = Namespace(id=1, name="new") + remaining_namespace = Namespace(id=2, name="newtwo") + original_tag = await DB.add( + Tag(id=1, name="tag", namespaces=[removed_namespace, remaining_namespace]) + ) + + input = { + "namespaces": {"ids": [removed_namespace.id], "options": {"mode": "REMOVE"}}, + } + response = Response(await update_tags(original_tag.id, input)) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, original_tag.id, full=True) + + assert set([o.id for o in tag.namespaces]) == set([remaining_namespace.id]) + + +@pytest.mark.anyio +async def test_update_tag_changes_updated_at(update_tags): + original_tag = Tag(name="tag") + original_tag.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_tag = await DB.add(original_tag) + + response = Response(await update_tags(original_tag.id, {"name": "updated"})) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, original_tag.id) + assert tag.updated_at > original_tag.updated_at + + +@pytest.mark.anyio +async def test_update_tag_assoc_changes_updated_at(update_tags): + original_tag = Tag(name="tag") + original_tag.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_tag = await DB.add(original_tag) + await DB.add(Namespace(id=1, name="namespace")) + + response = Response( + await update_tags(original_tag.id, {"namespaces": {"ids": [1]}}) + ) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, original_tag.id) + assert tag.updated_at > original_tag.updated_at diff --git a/tests/api/test_world.py b/tests/api/test_world.py new file mode 100644 index 0000000..a3926d1 --- /dev/null +++ b/tests/api/test_world.py @@ -0,0 +1,278 @@ +from datetime import datetime as dt +from datetime import timezone + +import pytest +from conftest import DB, Response +from hircine.db.models import World + + +@pytest.fixture +def query_world(execute_id): + query = """ + query world($id: Int!) { + world(id: $id) { + __typename + ... on World { + id + name + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(query) + + +@pytest.fixture +def query_worlds(execute): + query = """ + query worlds { + worlds { + __typename + count + edges { + id + name + } + } + } + """ + + return execute(query) + + +@pytest.fixture +def add_world(execute_add): + mutation = """ + mutation addWorld($input: AddWorldInput!) { + addWorld(input: $input) { + __typename + ... on AddSuccess { + id + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + } + } + """ + + return execute_add(mutation) + + +@pytest.fixture +def update_worlds(execute_update): + mutation = """ + mutation updateWorlds($ids: [Int!]!, $input: UpdateWorldInput!) { + updateWorlds(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +@pytest.fixture +def delete_worlds(execute_delete): + mutation = """ + mutation deleteWorlds($ids: [Int!]!) { + deleteWorlds(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +@pytest.mark.anyio +async def test_query_world(query_world, gen_world): + world = await DB.add(next(gen_world)) + + response = Response(await query_world(world.id)) + response.assert_is("World") + + assert response.id == world.id + assert response.name == world.name + + +@pytest.mark.anyio +async def test_query_world_fails_not_found(query_world): + response = Response(await query_world(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "World ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_worlds(query_worlds, gen_world): + worlds = await DB.add_all(*gen_world) + + response = Response(await query_worlds()) + response.assert_is("WorldFilterResult") + + assert response.count == len(worlds) + assert isinstance((response.edges), list) + assert len(response.edges) == len(worlds) + + edges = iter(response.edges) + for world in sorted(worlds, key=lambda a: a.name): + edge = next(edges) + assert edge["id"] == world.id + assert edge["name"] == world.name + + +@pytest.mark.anyio +async def test_add_world(add_world): + response = Response(await add_world({"name": "added world"})) + response.assert_is("AddSuccess") + + world = await DB.get(World, response.id) + assert world is not None + assert world.name == "added world" + + +@pytest.mark.anyio +async def test_add_world_fails_empty_parameter(add_world): + response = Response(await add_world({"name": ""})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_add_world_fails_exists(add_world, gen_world): + world = await DB.add(next(gen_world)) + + response = Response(await add_world({"name": world.name})) + response.assert_is("NameExistsError") + assert response.message == "Another World with this name exists" + + +@pytest.mark.anyio +async def test_delete_world(delete_worlds, gen_world): + world = await DB.add(next(gen_world)) + id = world.id + + response = Response(await delete_worlds(id)) + response.assert_is("DeleteSuccess") + + world = await DB.get(World, id) + assert world is None + + +@pytest.mark.anyio +async def test_delete_world_not_found(delete_worlds): + response = Response(await delete_worlds(1)) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "World ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_world(update_worlds, gen_world): + world = await DB.add(next(gen_world)) + + input = {"name": "updated world"} + response = Response(await update_worlds(world.id, input)) + response.assert_is("UpdateSuccess") + + world = await DB.get(World, world.id) + assert world is not None + assert world.name == "updated world" + + +@pytest.mark.anyio +async def test_update_world_fails_exists(update_worlds, gen_world): + first = await DB.add(next(gen_world)) + second = await DB.add(next(gen_world)) + + response = Response(await update_worlds(second.id, {"name": first.name})) + response.assert_is("NameExistsError") + assert response.message == "Another World with this name exists" + + +@pytest.mark.anyio +async def test_update_world_fails_not_found(update_worlds): + response = Response(await update_worlds(1, {"name": "updated world"})) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "World ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_worlds_cannot_bulk_edit_name(update_worlds, gen_world): + first = await DB.add(next(gen_world)) + second = await DB.add(next(gen_world)) + + response = Response(await update_worlds([first.id, second.id], {"name": "unique"})) + response.assert_is("InvalidParameterError") + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "none", + "empty string", + ], +) +@pytest.mark.anyio +async def test_update_world_fails_empty_parameter(update_worlds, gen_world, empty): + world = await DB.add(next(gen_world)) + + response = Response(await update_worlds(world.id, {"name": empty})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_update_world_changes_updated_at(update_worlds): + original_world = World(name="world") + original_world.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_world = await DB.add(original_world) + + response = Response(await update_worlds(original_world.id, {"name": "updated"})) + response.assert_is("UpdateSuccess") + + world = await DB.get(World, original_world.id) + assert world.updated_at > original_world.updated_at -- cgit v1.2.3-2-gb3c3