From d1d654ebac2d51e3841675faeb56480e440f622f Mon Sep 17 00:00:00 2001 From: Wolfgang Müller Date: Tue, 5 Mar 2024 18:08:09 +0100 Subject: Initial commit --- tests/api/test_comic.py | 1505 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1505 insertions(+) create mode 100644 tests/api/test_comic.py (limited to 'tests/api/test_comic.py') diff --git a/tests/api/test_comic.py b/tests/api/test_comic.py new file mode 100644 index 0000000..d3fa51e --- /dev/null +++ b/tests/api/test_comic.py @@ -0,0 +1,1505 @@ +from datetime import date, timezone +from datetime import datetime as dt + +import pytest +from conftest import DB, Response +from hircine.db.models import ( + Artist, + Circle, + Comic, + ComicArtist, + ComicTag, + Namespace, + Tag, + World, +) +from hircine.enums import Category, Censorship, Direction, Language, Layout, Rating + +full_comic_fragment = """ + fragment FullComic on FullComic { + id + title + category + censorship + createdAt + date + direction + language + layout + originalTitle + url + rating + pageCount + updatedAt + organized + bookmarked + archive { + __typename + id + } + artists { + __typename + id + } + characters { + __typename + id + } + circles { + __typename + id + } + cover { + __typename + id + } + pages { + __typename + id + } + tags { + __typename + id + name + } + worlds { + __typename + id + } + } +""" + +comic_fragment = """ + fragment Comic on Comic { + id + title + category + censorship + date + language + originalTitle + rating + pageCount + organized + bookmarked + artists { + __typename + id + } + characters { + __typename + id + } + circles { + __typename + id + } + cover { + __typename + id + } + tags { + __typename + id + name + } + worlds { + __typename + id + } + } +""" + + +@pytest.fixture +def query_comic(execute_id): + query = """ + query comic($id: Int!) { + comic(id: $id) { + __typename + ... on FullComic { + ...FullComic + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(full_comic_fragment + query) + + +@pytest.fixture +def query_comics(execute): + query = """ + query comics { + comics { + __typename + count + edges { + ...Comic + } + } + } + """ + + return execute(comic_fragment + query) + + +@pytest.fixture +def add_comic(execute_add): + mutation = """ + mutation addComic($input: AddComicInput!) { + addComic(input: $input) { + __typename + ... on AddComicSuccess { + id + archivePagesRemaining + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + ... on PageClaimedError { + comicId + id + } + ... on PageRemoteError { + archiveId + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ + + return execute_add(mutation) + + +@pytest.fixture +def delete_comics(execute_delete): + mutation = """ + mutation deleteComics($ids: [Int!]!) { + deleteComics(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +@pytest.fixture +def update_comics(execute_update): + mutation = """ + mutation updateComics($ids: [Int!]!, $input: UpdateComicInput!) { + updateComics(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on PageRemoteError { + id + archiveId + } + ... on PageClaimedError { + id + comicId + } + ... on IDNotFoundError { + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +@pytest.fixture +def upsert_comics(execute_update): + mutation = """ + mutation upsertComics($ids: [Int!]!, $input: UpsertComicInput!) { + upsertComics(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +def assert_association_matches(obj, model, typename): + assert obj["__typename"] == typename + assert obj["id"] == model.id + + +def assert_associations_match(objlist, modellist, typename, sortkey): + assert isinstance((objlist), list) + assert len(objlist) == len(modellist) + model = iter(sorted(modellist, key=sortkey)) + for obj in objlist: + assert_association_matches(obj, next(model), typename) + + +def assert_comic_item_matches(data, comic): + assert data["id"] == comic.id + assert data["title"] == comic.title + assert data["originalTitle"] == comic.original_title + assert date.fromisoformat(data["date"]) == comic.date + assert Rating[data["rating"]] == comic.rating + assert Language[data["language"]] == comic.language + assert data["pageCount"] == comic.page_count + assert data["organized"] == comic.organized + assert data["bookmarked"] == comic.bookmarked + + if data["category"]: + assert Category[data["category"]] == comic.category + else: + assert comic.category is None + + if data["censorship"]: + assert Censorship[data["censorship"]] == comic.censorship + else: + assert comic.censorship is None + + assert_association_matches(data["cover"], comic.cover, "Image") + assert_associations_match( + data["artists"], comic.artists, "Artist", lambda a: a.name + ) + assert_associations_match( + data["characters"], comic.characters, "Character", lambda c: c.name + ) + assert_associations_match( + data["circles"], comic.circles, "Circle", lambda c: c.name + ) + assert_associations_match(data["tags"], comic.tags, "ComicTag", lambda t: t.name) + assert_associations_match(data["worlds"], comic.worlds, "World", lambda w: w.name) + + +def assert_comic_matches(data, comic): + assert_comic_item_matches(data, comic) + assert dt.fromisoformat(data["createdAt"]) == comic.created_at + assert dt.fromisoformat(data["updatedAt"]) == comic.updated_at + assert Direction[data["direction"]] == comic.direction + assert Layout[data["layout"]] == comic.layout + assert data["url"] == comic.url + + assert_association_matches(data["archive"], comic.archive, "Archive") + assert_associations_match(data["pages"], comic.pages, "Page", lambda p: p.index) + + +@pytest.mark.anyio +async def test_query_comic(query_comic, gen_comic): + comic = await DB.add(next(gen_comic)) + + response = Response(await query_comic(comic.id)) + response.assert_is("FullComic") + + assert_comic_matches(response.data, comic) + + +@pytest.mark.anyio +async def test_query_comic_sorts_pages(query_comic, gen_jumbled_archive): + archive = next(gen_jumbled_archive) + + comic = await DB.add( + Comic( + id=1, + title="A Jumbled Mess", + archive=archive, + pages=archive.pages, + cover=archive.cover, + ) + ) + + response = Response(await query_comic(comic.id)) + response.assert_is("FullComic") + + assert_associations_match(response.pages, comic.pages, "Page", lambda p: p.index) + + +@pytest.mark.anyio +async def test_query_comic_fails_not_found(query_comic): + response = Response(await query_comic(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Comic ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_comics(query_comics, gen_comic): + comics = await DB.add_all(*gen_comic) + + response = Response(await query_comics()) + response.assert_is("ComicFilterResult") + + assert response.count == len(comics) + assert isinstance((response.edges), list) + assert len(response.edges) == len(comics) + + edge = iter(response.edges) + for comic in sorted(comics, key=lambda c: c.title): + assert_comic_item_matches(next(edge), comic) + + +@pytest.mark.anyio +async def test_add_comic(add_comic, gen_archive): + archive = next(gen_archive) + await DB.add(archive) + + before = dt.now(timezone.utc).replace(microsecond=0) + + response = Response( + await add_comic( + { + "title": "The Comically Bad Comic", + "archive": {"id": archive.id}, + "pages": {"ids": [p.id for p in archive.pages]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + response.assert_is("AddComicSuccess") + assert response.archivePagesRemaining is False + + after = dt.now(timezone.utc).replace(microsecond=0) + + comic = await DB.get(Comic, response.id, full=True) + assert comic is not None + assert comic.title == "The Comically Bad Comic" + + assert comic.archive.id == archive.id + assert comic.archive.organized is True + + assert set([page.id for page in comic.pages]) == set( + [page.id for page in archive.pages] + ) + + assert comic.cover.id == archive.cover.id + + assert comic.category is None + assert comic.censorship is None + assert comic.created_at >= before + assert comic.created_at <= after + assert comic.date is None + assert comic.language is None + assert comic.layout == Layout.SINGLE + assert comic.original_title is None + assert comic.url is None + assert comic.rating is None + + assert comic.artists == [] + assert comic.characters == [] + assert comic.circles == [] + assert comic.tags == [] + assert comic.worlds == [] + + +@pytest.mark.anyio +async def test_add_comic_pages_remaining(add_comic, gen_archive): + archive = next(gen_archive) + await DB.add(archive) + + response = Response( + await add_comic( + { + "title": "The Unfinished Comic", + "archive": {"id": archive.id}, + "pages": {"ids": [p.id for p in archive.pages][:2]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + response.assert_is("AddComicSuccess") + assert response.archivePagesRemaining is True + + comic = await DB.get(Comic, response.id, full=True) + assert comic.archive.organized is False + + +@pytest.mark.anyio +async def test_add_comic_fails_archive_not_found(add_comic, gen_archive): + archive = next(gen_archive) + await DB.add(archive) + + response = Response( + await add_comic( + { + "title": "Voidful Comic", + "archive": {"id": 10}, + "pages": {"ids": [p.id for p in archive.pages]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + response.assert_is("IDNotFoundError") + assert response.id == 10 + assert response.message == "Archive ID not found: '10'" + + +@pytest.mark.anyio +async def test_add_comic_fails_page_not_found(add_comic, gen_archive): + archive = next(gen_archive) + await DB.add(archive) + + response = Response( + await add_comic( + { + "title": "Pageless Comic", + "archive": {"id": archive.id}, + "pages": {"ids": [10]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + response.assert_is("IDNotFoundError") + assert response.id == 10 + assert response.message == "Page ID not found: '10'" + + +@pytest.mark.anyio +async def test_add_comic_fails_page_claimed(add_comic, gen_archive): + other_archive = next(gen_archive) + other_comic = await DB.add( + Comic( + title="Lawful Comic", + archive=other_archive, + cover=other_archive.cover, + pages=other_archive.pages, + ) + ) + + claimed_page = other_comic.pages[0] + + archive = next(gen_archive) + await DB.add(archive) + + response = Response( + await add_comic( + { + "title": "Comic of Attempted Burglary", + "archive": {"id": archive.id}, + "pages": {"ids": [claimed_page.id]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + + response.assert_is("PageClaimedError") + assert response.id == claimed_page.id + assert response.comicId == other_comic.id + assert ( + response.message + == f"Page ID {claimed_page.id} is already claimed by comic ID {other_comic.id}" + ) + + +@pytest.mark.anyio +async def test_add_comic_fails_empty_parameter(add_comic, gen_archive): + archive = next(gen_archive) + await DB.add(archive) + + response = Response( + await add_comic( + { + "title": "", + "archive": {"id": archive.id}, + "pages": {"ids": [p.id for p in archive.pages]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + response.assert_is("InvalidParameterError") + assert response.parameter == "title" + + +@pytest.mark.anyio +async def test_add_comic_fails_page_remote(add_comic, gen_archive): + other_archive = await DB.add(next(gen_archive)) + other_page = other_archive.pages[0] + + archive = await DB.add(next(gen_archive)) + + response = Response( + await add_comic( + { + "title": "Comic of Multiple Archives", + "archive": {"id": archive.id}, + "pages": {"ids": [other_page.id]}, + "cover": {"id": archive.pages[0].id}, + } + ) + ) + + response.assert_is("PageRemoteError") + assert response.id == other_page.id + assert response.archiveId == other_archive.id + assert ( + response.message + == f"Page ID {other_page.id} comes from remote archive ID {other_archive.id}" + ) + + +@pytest.mark.anyio +async def test_add_comic_fails_cover_remote(add_comic, gen_archive): + other_archive = await DB.add(next(gen_archive)) + other_page = other_archive.pages[0] + + archive = await DB.add(next(gen_archive)) + + response = Response( + await add_comic( + { + "title": "Comic of Multiple Archives", + "archive": {"id": archive.id}, + "pages": {"ids": [p.id for p in archive.pages]}, + "cover": {"id": other_page.id}, + } + ) + ) + + response.assert_is("PageRemoteError") + assert response.id == other_page.id + assert response.archiveId == other_archive.id + assert ( + response.message + == f"Page ID {other_page.id} comes from remote archive ID {other_archive.id}" + ) + + +@pytest.mark.anyio +async def test_delete_comic(delete_comics, gen_comic): + comic = await DB.add(next(gen_comic)) + + response = Response(await delete_comics(comic.id)) + response.assert_is("DeleteSuccess") + + comic = await DB.get(Comic, comic.id) + assert comic is None + + +@pytest.mark.anyio +async def test_delete_comic_not_found(delete_comics): + response = Response(await delete_comics(1)) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Comic ID not found: '1'" + + +def assert_assocs_match(assocs, collection, name_only=False): + assert set([o.name for o in assocs]) == set([o.name for o in collection]) + assert set([o.id for o in assocs]) == set([o.id for o in collection]) + + +@pytest.mark.anyio +async def test_update_comic(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + + artists = await DB.add_all(Artist(name="arty"), Artist(name="farty")) + circles = await DB.add_all(Circle(name="round"), Circle(name="oval")) + worlds = await DB.add_all(World(name="animal world"), World(name="no spiders")) + + namespace = await DB.add(Namespace(name="emus")) + tag = await DB.add(Tag(name="creepy")) + ct = ComicTag(namespace=namespace, tag=tag) + + new_tags = [ct] + original_comic.tags + new_pages = [p.id for p in original_comic.pages[:2]] + + input = { + "title": "Saucy Savannah Adventures (now in Italian)", + "url": "file:///home/savannah/avventura", + "originalTitle": original_comic.title, + "cover": {"id": original_comic.pages[1].id}, + "pages": {"ids": new_pages}, + "favourite": False, + "organized": True, + "bookmarked": True, + "artists": {"ids": [a.id for a in artists]}, + "circles": {"ids": [c.id for c in circles]}, + "worlds": {"ids": [w.id for w in worlds]}, + "tags": {"ids": [ct.id for ct in new_tags]}, + "date": "2010-07-06", + "direction": "LEFT_TO_RIGHT", + "language": "IT", + "layout": "DOUBLE_OFFSET", + "rating": "EXPLICIT", + "censorship": "BAR", + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + assert comic.title == "Saucy Savannah Adventures (now in Italian)" + assert comic.original_title == original_comic.title + assert comic.cover.id == original_comic.pages[1].image.id + assert comic.url == "file:///home/savannah/avventura" + assert comic.favourite is False + assert comic.organized is True + assert comic.bookmarked is True + + assert set([p.id for p in comic.pages]) == set(new_pages) + + assert_assocs_match(comic.artists, artists) + assert_assocs_match(comic.circles, circles) + assert_assocs_match(comic.characters, original_comic.characters) + assert_assocs_match(comic.worlds, worlds) + assert_assocs_match(comic.tags, new_tags) + + assert comic.date == date(2010, 7, 6) + assert comic.direction == Direction.LEFT_TO_RIGHT + assert comic.layout == Layout.DOUBLE_OFFSET + assert comic.rating == Rating.EXPLICIT + assert comic.language == Language.IT + assert comic.censorship == Censorship.BAR + + +@pytest.mark.anyio +async def test_update_comic_clears_associations(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + + empty = {"ids": []} + + input = { + "artists": empty, + "circles": empty, + "worlds": empty, + "tags": empty, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert comic.artists == [] + assert comic.circles == [] + assert comic.worlds == [] + assert comic.tags == [] + + +@pytest.mark.anyio +async def test_update_comic_clears_enums(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + + input = { + "category": None, + "censorship": None, + "rating": None, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert comic.rating is None + assert comic.category is None + assert comic.censorship is None + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "with None", + "with empty string", + ], +) +@pytest.mark.anyio +async def test_update_comic_clears_string_fields(update_comics, gen_comic, empty): + original_comic = await DB.add(next(gen_comic)) + + input = { + "originalTitle": empty, + "url": empty, + "date": None, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert comic.original_title is None + assert comic.date is None + assert comic.url is None + + +@pytest.mark.anyio +async def test_update_comic_fails_comic_not_found(update_comics): + response = Response(await update_comics(1, {"title": "This Will Not Happen"})) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Comic ID not found: '1'" + + +@pytest.mark.parametrize( + "parameter,empty", + [ + ("title", ""), + ("title", None), + ("direction", None), + ("layout", None), + ], + ids=[ + "title (empty string)", + "title (none)", + "direction", + "layout", + ], +) +@pytest.mark.anyio +async def test_update_comic_fails_empty_parameter( + update_comics, gen_archive, parameter, empty +): + archive = next(gen_archive) + comic = await DB.add( + Comic( + title="Dusty Old Comic", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + ) + + response = Response(await update_comics(comic.id, {parameter: empty})) + response.assert_is("InvalidParameterError") + assert response.parameter == parameter + assert response.message == f"Invalid parameter '{parameter}': cannot be empty" + + +@pytest.mark.anyio +async def test_update_comic_fails_namespace_not_found(update_comics, gen_archive): + archive = next(gen_archive) + comic = await DB.add( + Comic( + title="Dusty Old Comic", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + ) + + tag = await DB.add(Tag(name="shiny")) + + response = Response( + await update_comics(comic.id, {"tags": {"ids": [f"1:{tag.id}"]}}) + ) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Namespace ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_comic_fails_tag_not_found(update_comics, gen_archive): + archive = next(gen_archive) + comic = await DB.add( + Comic( + title="Dusty Old Comic", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + ) + + namespace = await DB.add(Namespace(name="height")) + + response = Response( + await update_comics(comic.id, {"tags": {"ids": [f"{namespace.id}:1"]}}) + ) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Tag ID not found: '1'" + + +@pytest.mark.parametrize( + "input", + [ + "", + ":1", + "1:", + "a:b", + "tag", + ], + ids=[ + "empty", + "namespacing missing", + "tag missing", + "no numeric ids", + "wrong format", + ], +) +@pytest.mark.anyio +async def test_update_comic_fails_invalid_tag(update_comics, gen_archive, input): + archive = next(gen_archive) + comic = await DB.add( + Comic( + title="Dusty Old Comic", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + ) + + response = Response(await update_comics(comic.id, {"tags": {"ids": [input]}})) + response.assert_is("InvalidParameterError") + assert response.parameter == "id" + + msg = "Invalid parameter 'id': ComicTag ID must be specified as :" # noqa: E501 + assert response.message == msg + + +@pytest.mark.parametrize( + "name,key,id", + [ + ("Artist", "artists", 1), + ("Character", "characters", 1), + ("Circle", "circles", 1), + ("World", "worlds", 1), + ], + ids=[ + "artist", + "character", + "circle", + "world", + ], +) +@pytest.mark.anyio +async def test_update_comic_fails_assoc_not_found( + update_comics, gen_archive, name, key, id +): + archive = next(gen_archive) + comic = await DB.add( + Comic( + title="Dusty Old Comic", + archive=archive, + cover=archive.cover, + pages=archive.pages, + ) + ) + + response = Response(await update_comics(comic.id, {key: {"ids": [id]}})) + response.assert_is("IDNotFoundError") + assert response.id == id + assert response.message == f"{name} ID not found: '{id}'" + + +@pytest.mark.parametrize( + "option", + [ + None, + {}, + {"onMissing": "IGNORE"}, + ], + ids=[ + "default option", + "empty option", + "explicit option", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_ignores_with(upsert_comics, gen_comic, option): + original_comic = await DB.add(next(gen_comic)) + + new_artists = await DB.add_all(Artist(name="arty"), Artist(name="farty")) + + input = { + "artists": { + "names": [a.name for a in new_artists] + ["newy"], + "options": option, + }, + } + response = Response(await upsert_comics(original_comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert_assocs_match(comic.artists, original_comic.artists + list(new_artists)) + + +@pytest.mark.parametrize( + "option", + [ + None, + {}, + {"onMissing": "IGNORE"}, + ], + ids=[ + "default option", + "empty option", + "explicit option", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_ignores_missing_tags_with(upsert_comics, gen_comic, option): + original_comic = await DB.add(next(gen_comic)) + + tags = await DB.add_all( + ComicTag( + comic_id=original_comic.id, + namespace=Namespace(name="foo"), + tag=Tag(name="bar"), + ) + ) + + input = { + "tags": {"names": ["foo:bar", "baz:qux"], "options": option}, + } + response = Response(await upsert_comics(original_comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert_assocs_match(comic.tags, original_comic.tags + list(tags)) + + +@pytest.mark.parametrize( + "option", + [ + None, + {}, + {"onMissing": "IGNORE"}, + {"onMissing": "CREATE"}, + ], + ids=[ + "default option", + "empty option", + "IGNORE", + "CREATE", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_skips_existing_tags(upsert_comics, gen_comic, option): + comic = await DB.add(next(gen_comic)) + + ctag = comic.tags[0] + names = [f"{ctag.namespace.name}:{ctag.tag.name}"] + + input = { + "tags": {"names": names, "options": option}, + } + response = Response(await upsert_comics(comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, comic.id, full=True) + assert comic is not None + + assert_assocs_match(comic.tags, comic.tags) + + +@pytest.mark.parametrize( + "valid", + [ + True, + False, + ], + ids=[ + "valid combination", + "invalid combination", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_ignore_missing_handles_resident( + upsert_comics, gen_comic, valid +): + original_comic = await DB.add(next(gen_comic)) + + namespace = await DB.add(Namespace(name="foo")) + tag = Tag(name="bar") + if valid: + tag.namespaces = [namespace] + + tag = await DB.add(tag) + ctag = ComicTag(namespace=namespace, tag=tag) + + expected_tags = original_comic.tags + + if valid: + expected_tags.append(ctag) + + input = { + "tags": {"names": ["foo:bar"], "options": {"onMissing": "IGNORE"}}, + } + response = Response(await upsert_comics(original_comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert_assocs_match(comic.tags, expected_tags) + + +@pytest.mark.anyio +async def test_upsert_comic_tags_uses_existing(upsert_comics, empty_comic): + original_comic = await DB.add(empty_comic) + + await DB.add_all(Namespace(name="foo")) + await DB.add_all(Tag(name="bar")) + + tag_names = ["foo:bar"] + + response = Response( + await upsert_comics( + original_comic.id, + {"tags": {"names": tag_names, "options": {"onMissing": "CREATE"}}}, + ) + ) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert set(tag_names) == set( + [f"{t.namespace.name}:{t.tag.name}" for t in comic.tags] + ) + + +@pytest.mark.parametrize( + "key,list", + [ + ("artists", ["arty", "farty"]), + ("tags", ["alien:medium", "human:tiny"]), + ("artists", ["arty", "arty"]), + ("tags", ["foo:good", "bar:good"]), + ("tags", ["foo:good", "foo:bad"]), + ("artists", []), + ], + ids=[ + "artists", + "tags", + "artists (duplicate)", + "tags (duplicate)", + "namespace (duplicate)", + "artists (empty)", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_creates(upsert_comics, empty_comic, key, list): + original_comic = await DB.add(empty_comic) + + input = { + key: {"names": list, "options": {"onMissing": "CREATE"}}, + } + response = Response(await upsert_comics(original_comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert comic is not None + + assert set(list) == set([o.name for o in getattr(comic, key)]) + + +@pytest.mark.anyio +async def test_upsert_comic_fails_creating_empty_assoc_name(upsert_comics, gen_comic): + comic = await DB.add(next(gen_comic)) + + input = { + "artists": {"names": ""}, + } + response = Response(await upsert_comics(comic.id, input)) + response.assert_is("InvalidParameterError") + assert response.parameter == "Artist.name" + + +@pytest.mark.anyio +async def test_upsert_comic_does_not_replace(upsert_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + original_artists = set([a.name for a in original_comic.artists]) + + input = { + "artists": {"names": []}, + } + response = Response(await upsert_comics(original_comic.id, input)) + response.assert_is("UpsertSuccess") + + comic = await DB.get(Comic, original_comic.id) + artists = set([a.name for a in comic.artists]) + + assert artists == original_artists + + +@pytest.mark.parametrize( + "input", + [ + "", + ":tiny", + "human:", + "medium", + ], + ids=[ + "empty", + "namespace missing", + "tag missing", + "wrong format", + ], +) +@pytest.mark.anyio +async def test_upsert_comic_fails_creating_invalid_tag(upsert_comics, gen_comic, input): + comic = await DB.add(next(gen_comic)) + + input = { + "tags": {"names": [input]}, + } + response = Response(await upsert_comics(comic.id, input)) + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + msg = "Invalid parameter 'name': ComicTag name must be specified as :" # noqa: E501 + assert response.message == msg + + +@pytest.mark.parametrize( + "options", + [ + None, + {}, + {"mode": "REPLACE"}, + ], + ids=[ + "by default (none)", + "by default (empty record)", + "when defined explicitly", + ], +) +@pytest.mark.anyio +async def test_update_comic_replaces_assocs(update_comics, gen_comic, options): + original_comic = await DB.add(next(gen_comic)) + new_artist = await DB.add(Artist(name="max")) + + input = { + "artists": {"ids": [new_artist.id]}, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + + assert_assocs_match(comic.artists, [new_artist]) + + +@pytest.mark.anyio +async def test_update_comic_adds_assocs(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + new_artist = await DB.add(Artist(name="max")) + added_artists = original_comic.artists + [new_artist] + + input = { + "artists": {"ids": [new_artist.id], "options": {"mode": "ADD"}}, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + + assert_assocs_match(comic.artists, added_artists) + + +@pytest.mark.anyio +async def test_update_comic_adds_existing_assocs(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + artists = original_comic.artists + + input = { + "artists": { + "ids": [artist.id for artist in artists], + "options": {"mode": "ADD"}, + }, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + + assert_assocs_match(comic.artists, artists) + + +@pytest.mark.anyio +async def test_update_comic_adds_tags(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + new_namespace = await DB.add(Namespace(name="new")) + new_tag = await DB.add(Tag(name="new")) + added_tags = original_comic.tags + [ + ComicTag(comic_id=original_comic.id, tag=new_tag, namespace=new_namespace) + ] + + input = { + "tags": { + "ids": [f"{new_namespace.id}:{new_tag.id}"], + "options": {"mode": "ADD"}, + }, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert_assocs_match(comic.tags, added_tags) + + +@pytest.mark.anyio +async def test_update_comic_adds_existing_tags(update_comics, gen_comic): + original_comic = await DB.add(next(gen_comic)) + tags = original_comic.tags + + input = { + "tags": { + "ids": [f"{tag.namespace.id}:{tag.tag.id}" for tag in tags], + "options": {"mode": "ADD"}, + }, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + assert_assocs_match(comic.tags, tags) + + +@pytest.mark.anyio +async def test_update_comic_removes_assocs(update_comics, empty_comic): + original_comic = empty_comic + removed_artist = Artist(id=1, name="sam") + remaining_artist = Artist(id=2, name="max") + original_comic.artists = [removed_artist, remaining_artist] + original_comic = await DB.add(original_comic) + + input = { + "artists": {"ids": [removed_artist.id], "options": {"mode": "REMOVE"}}, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + + assert_assocs_match(comic.artists, [remaining_artist]) + + +@pytest.mark.anyio +async def test_update_comic_removes_tags(update_comics, empty_comic): + original_comic = empty_comic + removed_tag = ComicTag( + comic_id=original_comic.id, + tag=Tag(id=1, name="gone"), + namespace=Namespace(id=1, name="all"), + ) + remaining_tag = ComicTag( + comic_id=original_comic.id, + tag=Tag(id=2, name="there"), + namespace=Namespace(id=2, name="still"), + ) + original_comic.tags = [removed_tag, remaining_tag] + original_comic = await DB.add(original_comic) + + input = { + "tags": {"ids": ["1:1"], "options": {"mode": "REMOVE"}}, + } + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id, full=True) + + assert_assocs_match(comic.tags, [remaining_tag]) + + +@pytest.mark.parametrize( + "rows,input", + [ + ([], {"title": "Updated Comic"}), + ( + [Artist(id=1, name="artist")], + {"artists": {"ids": [1]}}, + ), + ( + [Artist(id=1, name="artist"), ComicArtist(artist_id=1, comic_id=100)], + {"title": "Updated Comic", "artists": {"ids": [1]}}, + ), + ( + [ + Namespace(id=1, name="ns"), + Tag(id=1, name="artist"), + ], + {"tags": {"ids": ["1:1"]}}, + ), + ( + [ + Namespace(id=1, name="ns"), + Tag(id=1, name="artist"), + ComicTag(namespace_id=1, tag_id=1, comic_id=100), + ], + {"title": "Updated Comic", "tags": {"ids": ["1:1"]}}, + ), + ], + ids=[ + "with scalar", + "with assoc", + "with scalar and existing assoc", + "with tag", + "with scalar and existing tag", + ], +) +@pytest.mark.anyio +async def test_update_comic_changes_updated_at(update_comics, empty_comic, rows, input): + original_comic = empty_comic + original_comic.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_comic = await DB.add(original_comic) + + await DB.add_all(*rows) + + response = Response(await update_comics(original_comic.id, input)) + response.assert_is("UpdateSuccess") + + comic = await DB.get(Comic, original_comic.id) + assert comic.updated_at > original_comic.updated_at + + +@pytest.mark.anyio +async def test_update_comic_cover_fails_page_not_found(update_comics, gen_comic): + comic = await DB.add(next(gen_comic)) + + response = Response(await update_comics(comic.id, {"cover": {"id": 100}})) + response.assert_is("IDNotFoundError") + assert response.id == 100 + assert response.message == "Page ID not found: '100'" + + +@pytest.mark.anyio +async def test_update_comic_cover_fails_page_remote( + update_comics, gen_comic, gen_archive +): + comic = await DB.add(next(gen_comic)) + other_archive = await DB.add(next(gen_archive)) + remote_id = other_archive.pages[0].id + + response = Response(await update_comics(comic.id, {"cover": {"id": remote_id}})) + response.assert_is("PageRemoteError") + assert response.id == remote_id + assert response.archiveId == other_archive.id + assert ( + response.message + == f"Page ID {remote_id} comes from remote archive ID {other_archive.id}" + ) + + +@pytest.mark.anyio +async def test_update_comic_pages_fails_page_not_found(update_comics, gen_comic): + comic = await DB.add(next(gen_comic)) + + response = Response(await update_comics(comic.id, {"pages": {"ids": 100}})) + response.assert_is("IDNotFoundError") + assert response.id == 100 + assert response.message == "Page ID not found: '100'" + + +@pytest.mark.anyio +async def test_update_comic_pages_fails_page_remote( + update_comics, gen_comic, gen_archive +): + comic = await DB.add(next(gen_comic)) + other_archive = await DB.add(next(gen_archive)) + remote_id = other_archive.pages[0].id + + response = Response(await update_comics(comic.id, {"pages": {"ids": [remote_id]}})) + response.assert_is("PageRemoteError") + assert response.id == remote_id + assert response.archiveId == other_archive.id + assert ( + response.message + == f"Page ID {remote_id} comes from remote archive ID {other_archive.id}" + ) + + +@pytest.mark.anyio +async def test_update_comic_pages_fails_page_claimed(update_comics, gen_archive): + archive = await DB.add(next(gen_archive)) + + comic = await DB.add( + Comic( + id=1, + title="A Very Good Comic", + archive=archive, + cover=archive.pages[0].image, + pages=[archive.pages[0], archive.pages[1]], + ) + ) + + claiming = await DB.add( + Comic( + id=2, + title="A Very Claiming Comic", + archive=archive, + cover=archive.pages[2].image, + pages=[archive.pages[2], archive.pages[3]], + ) + ) + + claimed_id = claiming.pages[0].id + + response = Response(await update_comics(comic.id, {"pages": {"ids": [claimed_id]}})) + response.assert_is("PageClaimedError") + assert response.id == claimed_id + assert response.comicId == claiming.id + assert ( + response.message + == f"Page ID {claimed_id} is already claimed by comic ID {claiming.id}" + ) + + +@pytest.mark.parametrize( + "mode", + [ + ("REPLACE"), + ("REMOVE"), + ], +) +@pytest.mark.anyio +async def test_update_comic_pages_fails_empty(update_comics, gen_comic, mode): + comic = await DB.add(next(gen_comic)) + + ids = [] if mode == "REPLACE" else [p.id for p in comic.pages] + + response = Response( + await update_comics( + comic.id, {"pages": {"ids": ids, "options": {"mode": mode}}} + ) + ) + response.assert_is("InvalidParameterError") + assert response.parameter == "pages" + assert response.message == "Invalid parameter 'pages': cannot be empty" -- cgit v1.2.3-2-gb3c3