summaryrefslogtreecommitdiffstatshomepage
path: root/tests/api/test_comic.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--tests/api/test_comic.py1505
1 files changed, 1505 insertions, 0 deletions
diff --git a/tests/api/test_comic.py b/tests/api/test_comic.py
new file mode 100644
index 0000000..d3fa51e
--- /dev/null
+++ b/tests/api/test_comic.py
@@ -0,0 +1,1505 @@
+from datetime import date, timezone
+from datetime import datetime as dt
+
+import pytest
+from conftest import DB, Response
+from hircine.db.models import (
+ Artist,
+ Circle,
+ Comic,
+ ComicArtist,
+ ComicTag,
+ Namespace,
+ Tag,
+ World,
+)
+from hircine.enums import Category, Censorship, Direction, Language, Layout, Rating
+
+full_comic_fragment = """
+ fragment FullComic on FullComic {
+ id
+ title
+ category
+ censorship
+ createdAt
+ date
+ direction
+ language
+ layout
+ originalTitle
+ url
+ rating
+ pageCount
+ updatedAt
+ organized
+ bookmarked
+ archive {
+ __typename
+ id
+ }
+ artists {
+ __typename
+ id
+ }
+ characters {
+ __typename
+ id
+ }
+ circles {
+ __typename
+ id
+ }
+ cover {
+ __typename
+ id
+ }
+ pages {
+ __typename
+ id
+ }
+ tags {
+ __typename
+ id
+ name
+ }
+ worlds {
+ __typename
+ id
+ }
+ }
+"""
+
+comic_fragment = """
+ fragment Comic on Comic {
+ id
+ title
+ category
+ censorship
+ date
+ language
+ originalTitle
+ rating
+ pageCount
+ organized
+ bookmarked
+ artists {
+ __typename
+ id
+ }
+ characters {
+ __typename
+ id
+ }
+ circles {
+ __typename
+ id
+ }
+ cover {
+ __typename
+ id
+ }
+ tags {
+ __typename
+ id
+ name
+ }
+ worlds {
+ __typename
+ id
+ }
+ }
+"""
+
+
+@pytest.fixture
+def query_comic(execute_id):
+ query = """
+ query comic($id: Int!) {
+ comic(id: $id) {
+ __typename
+ ... on FullComic {
+ ...FullComic
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_id(full_comic_fragment + query)
+
+
+@pytest.fixture
+def query_comics(execute):
+ query = """
+ query comics {
+ comics {
+ __typename
+ count
+ edges {
+ ...Comic
+ }
+ }
+ }
+ """
+
+ return execute(comic_fragment + query)
+
+
+@pytest.fixture
+def add_comic(execute_add):
+ mutation = """
+ mutation addComic($input: AddComicInput!) {
+ addComic(input: $input) {
+ __typename
+ ... on AddComicSuccess {
+ id
+ archivePagesRemaining
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ ... on PageClaimedError {
+ comicId
+ id
+ }
+ ... on PageRemoteError {
+ archiveId
+ id
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """
+
+ return execute_add(mutation)
+
+
+@pytest.fixture
+def delete_comics(execute_delete):
+ mutation = """
+ mutation deleteComics($ids: [Int!]!) {
+ deleteComics(ids: $ids) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_delete(mutation)
+
+
+@pytest.fixture
+def update_comics(execute_update):
+ mutation = """
+ mutation updateComics($ids: [Int!]!, $input: UpdateComicInput!) {
+ updateComics(ids: $ids, input: $input) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on PageRemoteError {
+ id
+ archiveId
+ }
+ ... on PageClaimedError {
+ id
+ comicId
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """ # noqa: E501
+
+ return execute_update(mutation)
+
+
+@pytest.fixture
+def upsert_comics(execute_update):
+ mutation = """
+ mutation upsertComics($ids: [Int!]!, $input: UpsertComicInput!) {
+ upsertComics(ids: $ids, input: $input) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """ # noqa: E501
+
+ return execute_update(mutation)
+
+
+def assert_association_matches(obj, model, typename):
+ assert obj["__typename"] == typename
+ assert obj["id"] == model.id
+
+
+def assert_associations_match(objlist, modellist, typename, sortkey):
+ assert isinstance((objlist), list)
+ assert len(objlist) == len(modellist)
+ model = iter(sorted(modellist, key=sortkey))
+ for obj in objlist:
+ assert_association_matches(obj, next(model), typename)
+
+
+def assert_comic_item_matches(data, comic):
+ assert data["id"] == comic.id
+ assert data["title"] == comic.title
+ assert data["originalTitle"] == comic.original_title
+ assert date.fromisoformat(data["date"]) == comic.date
+ assert Rating[data["rating"]] == comic.rating
+ assert Language[data["language"]] == comic.language
+ assert data["pageCount"] == comic.page_count
+ assert data["organized"] == comic.organized
+ assert data["bookmarked"] == comic.bookmarked
+
+ if data["category"]:
+ assert Category[data["category"]] == comic.category
+ else:
+ assert comic.category is None
+
+ if data["censorship"]:
+ assert Censorship[data["censorship"]] == comic.censorship
+ else:
+ assert comic.censorship is None
+
+ assert_association_matches(data["cover"], comic.cover, "Image")
+ assert_associations_match(
+ data["artists"], comic.artists, "Artist", lambda a: a.name
+ )
+ assert_associations_match(
+ data["characters"], comic.characters, "Character", lambda c: c.name
+ )
+ assert_associations_match(
+ data["circles"], comic.circles, "Circle", lambda c: c.name
+ )
+ assert_associations_match(data["tags"], comic.tags, "ComicTag", lambda t: t.name)
+ assert_associations_match(data["worlds"], comic.worlds, "World", lambda w: w.name)
+
+
+def assert_comic_matches(data, comic):
+ assert_comic_item_matches(data, comic)
+ assert dt.fromisoformat(data["createdAt"]) == comic.created_at
+ assert dt.fromisoformat(data["updatedAt"]) == comic.updated_at
+ assert Direction[data["direction"]] == comic.direction
+ assert Layout[data["layout"]] == comic.layout
+ assert data["url"] == comic.url
+
+ assert_association_matches(data["archive"], comic.archive, "Archive")
+ assert_associations_match(data["pages"], comic.pages, "Page", lambda p: p.index)
+
+
+@pytest.mark.anyio
+async def test_query_comic(query_comic, gen_comic):
+ comic = await DB.add(next(gen_comic))
+
+ response = Response(await query_comic(comic.id))
+ response.assert_is("FullComic")
+
+ assert_comic_matches(response.data, comic)
+
+
+@pytest.mark.anyio
+async def test_query_comic_sorts_pages(query_comic, gen_jumbled_archive):
+ archive = next(gen_jumbled_archive)
+
+ comic = await DB.add(
+ Comic(
+ id=1,
+ title="A Jumbled Mess",
+ archive=archive,
+ pages=archive.pages,
+ cover=archive.cover,
+ )
+ )
+
+ response = Response(await query_comic(comic.id))
+ response.assert_is("FullComic")
+
+ assert_associations_match(response.pages, comic.pages, "Page", lambda p: p.index)
+
+
+@pytest.mark.anyio
+async def test_query_comic_fails_not_found(query_comic):
+ response = Response(await query_comic(1))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Comic ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_query_comics(query_comics, gen_comic):
+ comics = await DB.add_all(*gen_comic)
+
+ response = Response(await query_comics())
+ response.assert_is("ComicFilterResult")
+
+ assert response.count == len(comics)
+ assert isinstance((response.edges), list)
+ assert len(response.edges) == len(comics)
+
+ edge = iter(response.edges)
+ for comic in sorted(comics, key=lambda c: c.title):
+ assert_comic_item_matches(next(edge), comic)
+
+
+@pytest.mark.anyio
+async def test_add_comic(add_comic, gen_archive):
+ archive = next(gen_archive)
+ await DB.add(archive)
+
+ before = dt.now(timezone.utc).replace(microsecond=0)
+
+ response = Response(
+ await add_comic(
+ {
+ "title": "The Comically Bad Comic",
+ "archive": {"id": archive.id},
+ "pages": {"ids": [p.id for p in archive.pages]},
+ "cover": {"id": archive.pages[0].id},
+ }
+ )
+ )
+ response.assert_is("AddComicSuccess")
+ assert response.archivePagesRemaining is False
+
+ after = dt.now(timezone.utc).replace(microsecond=0)
+
+ comic = await DB.get(Comic, response.id, full=True)
+ assert comic is not None
+ assert comic.title == "The Comically Bad Comic"
+
+ assert comic.archive.id == archive.id
+ assert comic.archive.organized is True
+
+ assert set([page.id for page in comic.pages]) == set(
+ [page.id for page in archive.pages]
+ )
+
+ assert comic.cover.id == archive.cover.id
+
+ assert comic.category is None
+ assert comic.censorship is None
+ assert comic.created_at >= before
+ assert comic.created_at <= after
+ assert comic.date is None
+ assert comic.language is None
+ assert comic.layout == Layout.SINGLE
+ assert comic.original_title is None
+ assert comic.url is None
+ assert comic.rating is None
+
+ assert comic.artists == []
+ assert comic.characters == []
+ assert comic.circles == []
+ assert comic.tags == []
+ assert comic.worlds == []
+
+
+@pytest.mark.anyio
+async def test_add_comic_pages_remaining(add_comic, gen_archive):
+ archive = next(gen_archive)
+ await DB.add(archive)
+
+ response = Response(
+ await add_comic(
+ {
+ "title": "The Unfinished Comic",
+ "archive": {"id": archive.id},
+ "pages": {"ids": [p.id for p in archive.pages][:2]},
+ "cover": {"id": archive.pages[0].id},
+ }
+ )
+ )
+ response.assert_is("AddComicSuccess")
+ assert response.archivePagesRemaining is True
+
+ comic = await DB.get(Comic, response.id, full=True)
+ assert comic.archive.organized is False
+
+
+@pytest.mark.anyio
+async def test_add_comic_fails_archive_not_found(add_comic, gen_archive):
+ archive = next(gen_archive)
+ await DB.add(archive)
+
+ response = Response(
+ await add_comic(
+ {
+ "title": "Voidful Comic",
+ "archive": {"id": 10},
+ "pages": {"ids": [p.id for p in archive.pages]},
+ "cover": {"id": archive.pages[0].id},
+ }
+ )
+ )
+ response.assert_is("IDNotFoundError")
+ assert response.id == 10
+ assert response.message == "Archive ID not found: '10'"
+
+
+@pytest.mark.anyio
+async def test_add_comic_fails_page_not_found(add_comic, gen_archive):
+ archive = next(gen_archive)
+ await DB.add(archive)
+
+ response = Response(
+ await add_comic(
+ {
+ "title": "Pageless Comic",
+ "archive": {"id": archive.id},
+ "pages": {"ids": [10]},
+ "cover": {"id": archive.pages[0].id},
+ }
+ )
+ )
+ response.assert_is("IDNotFoundError")
+ assert response.id == 10
+ assert response.message == "Page ID not found: '10'"
+
+
+@pytest.mark.anyio
+async def test_add_comic_fails_page_claimed(add_comic, gen_archive):
+ other_archive = next(gen_archive)
+ other_comic = await DB.add(
+ Comic(
+ title="Lawful Comic",
+ archive=other_archive,
+ cover=other_archive.cover,
+ pages=other_archive.pages,
+ )
+ )
+
+ claimed_page = other_comic.pages[0]
+
+ archive = next(gen_archive)
+ await DB.add(archive)
+
+ response = Response(
+ await add_comic(
+ {
+ "title": "Comic of Attempted Burglary",
+ "archive": {"id": archive.id},
+ "pages": {"ids": [claimed_page.id]},
+ "cover": {"id": archive.pages[0].id},
+ }
+ )
+ )
+
+ response.assert_is("PageClaimedError")
+ assert response.id == claimed_page.id
+ assert response.comicId == other_comic.id
+ assert (
+ response.message
+ == f"Page ID {claimed_page.id} is already claimed by comic ID {other_comic.id}"
+ )
+
+
+@pytest.mark.anyio
+async def test_add_comic_fails_empty_parameter(add_comic, gen_archive):
+ archive = next(gen_archive)
+ await DB.add(archive)
+
+ response = Response(
+ await add_comic(
+ {
+ "title": "",
+ "archive": {"id": archive.id},
+ "pages": {"ids": [p.id for p in archive.pages]},
+ "cover": {"id": archive.pages[0].id},
+ }
+ )
+ )
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "title"
+
+
+@pytest.mark.anyio
+async def test_add_comic_fails_page_remote(add_comic, gen_archive):
+ other_archive = await DB.add(next(gen_archive))
+ other_page = other_archive.pages[0]
+
+ archive = await DB.add(next(gen_archive))
+
+ response = Response(
+ await add_comic(
+ {
+ "title": "Comic of Multiple Archives",
+ "archive": {"id": archive.id},
+ "pages": {"ids": [other_page.id]},
+ "cover": {"id": archive.pages[0].id},
+ }
+ )
+ )
+
+ response.assert_is("PageRemoteError")
+ assert response.id == other_page.id
+ assert response.archiveId == other_archive.id
+ assert (
+ response.message
+ == f"Page ID {other_page.id} comes from remote archive ID {other_archive.id}"
+ )
+
+
+@pytest.mark.anyio
+async def test_add_comic_fails_cover_remote(add_comic, gen_archive):
+ other_archive = await DB.add(next(gen_archive))
+ other_page = other_archive.pages[0]
+
+ archive = await DB.add(next(gen_archive))
+
+ response = Response(
+ await add_comic(
+ {
+ "title": "Comic of Multiple Archives",
+ "archive": {"id": archive.id},
+ "pages": {"ids": [p.id for p in archive.pages]},
+ "cover": {"id": other_page.id},
+ }
+ )
+ )
+
+ response.assert_is("PageRemoteError")
+ assert response.id == other_page.id
+ assert response.archiveId == other_archive.id
+ assert (
+ response.message
+ == f"Page ID {other_page.id} comes from remote archive ID {other_archive.id}"
+ )
+
+
+@pytest.mark.anyio
+async def test_delete_comic(delete_comics, gen_comic):
+ comic = await DB.add(next(gen_comic))
+
+ response = Response(await delete_comics(comic.id))
+ response.assert_is("DeleteSuccess")
+
+ comic = await DB.get(Comic, comic.id)
+ assert comic is None
+
+
+@pytest.mark.anyio
+async def test_delete_comic_not_found(delete_comics):
+ response = Response(await delete_comics(1))
+
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Comic ID not found: '1'"
+
+
+def assert_assocs_match(assocs, collection, name_only=False):
+ assert set([o.name for o in assocs]) == set([o.name for o in collection])
+ assert set([o.id for o in assocs]) == set([o.id for o in collection])
+
+
+@pytest.mark.anyio
+async def test_update_comic(update_comics, gen_comic):
+ original_comic = await DB.add(next(gen_comic))
+
+ artists = await DB.add_all(Artist(name="arty"), Artist(name="farty"))
+ circles = await DB.add_all(Circle(name="round"), Circle(name="oval"))
+ worlds = await DB.add_all(World(name="animal world"), World(name="no spiders"))
+
+ namespace = await DB.add(Namespace(name="emus"))
+ tag = await DB.add(Tag(name="creepy"))
+ ct = ComicTag(namespace=namespace, tag=tag)
+
+ new_tags = [ct] + original_comic.tags
+ new_pages = [p.id for p in original_comic.pages[:2]]
+
+ input = {
+ "title": "Saucy Savannah Adventures (now in Italian)",
+ "url": "file:///home/savannah/avventura",
+ "originalTitle": original_comic.title,
+ "cover": {"id": original_comic.pages[1].id},
+ "pages": {"ids": new_pages},
+ "favourite": False,
+ "organized": True,
+ "bookmarked": True,
+ "artists": {"ids": [a.id for a in artists]},
+ "circles": {"ids": [c.id for c in circles]},
+ "worlds": {"ids": [w.id for w in worlds]},
+ "tags": {"ids": [ct.id for ct in new_tags]},
+ "date": "2010-07-06",
+ "direction": "LEFT_TO_RIGHT",
+ "language": "IT",
+ "layout": "DOUBLE_OFFSET",
+ "rating": "EXPLICIT",
+ "censorship": "BAR",
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+ assert comic.title == "Saucy Savannah Adventures (now in Italian)"
+ assert comic.original_title == original_comic.title
+ assert comic.cover.id == original_comic.pages[1].image.id
+ assert comic.url == "file:///home/savannah/avventura"
+ assert comic.favourite is False
+ assert comic.organized is True
+ assert comic.bookmarked is True
+
+ assert set([p.id for p in comic.pages]) == set(new_pages)
+
+ assert_assocs_match(comic.artists, artists)
+ assert_assocs_match(comic.circles, circles)
+ assert_assocs_match(comic.characters, original_comic.characters)
+ assert_assocs_match(comic.worlds, worlds)
+ assert_assocs_match(comic.tags, new_tags)
+
+ assert comic.date == date(2010, 7, 6)
+ assert comic.direction == Direction.LEFT_TO_RIGHT
+ assert comic.layout == Layout.DOUBLE_OFFSET
+ assert comic.rating == Rating.EXPLICIT
+ assert comic.language == Language.IT
+ assert comic.censorship == Censorship.BAR
+
+
+@pytest.mark.anyio
+async def test_update_comic_clears_associations(update_comics, gen_comic):
+ original_comic = await DB.add(next(gen_comic))
+
+ empty = {"ids": []}
+
+ input = {
+ "artists": empty,
+ "circles": empty,
+ "worlds": empty,
+ "tags": empty,
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+
+ assert comic.artists == []
+ assert comic.circles == []
+ assert comic.worlds == []
+ assert comic.tags == []
+
+
+@pytest.mark.anyio
+async def test_update_comic_clears_enums(update_comics, gen_comic):
+ original_comic = await DB.add(next(gen_comic))
+
+ input = {
+ "category": None,
+ "censorship": None,
+ "rating": None,
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+
+ assert comic.rating is None
+ assert comic.category is None
+ assert comic.censorship is None
+
+
+@pytest.mark.parametrize(
+ "empty",
+ [
+ None,
+ "",
+ ],
+ ids=[
+ "with None",
+ "with empty string",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_comic_clears_string_fields(update_comics, gen_comic, empty):
+ original_comic = await DB.add(next(gen_comic))
+
+ input = {
+ "originalTitle": empty,
+ "url": empty,
+ "date": None,
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+
+ assert comic.original_title is None
+ assert comic.date is None
+ assert comic.url is None
+
+
+@pytest.mark.anyio
+async def test_update_comic_fails_comic_not_found(update_comics):
+ response = Response(await update_comics(1, {"title": "This Will Not Happen"}))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Comic ID not found: '1'"
+
+
+@pytest.mark.parametrize(
+ "parameter,empty",
+ [
+ ("title", ""),
+ ("title", None),
+ ("direction", None),
+ ("layout", None),
+ ],
+ ids=[
+ "title (empty string)",
+ "title (none)",
+ "direction",
+ "layout",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_comic_fails_empty_parameter(
+ update_comics, gen_archive, parameter, empty
+):
+ archive = next(gen_archive)
+ comic = await DB.add(
+ Comic(
+ title="Dusty Old Comic",
+ archive=archive,
+ cover=archive.cover,
+ pages=archive.pages,
+ )
+ )
+
+ response = Response(await update_comics(comic.id, {parameter: empty}))
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == parameter
+ assert response.message == f"Invalid parameter '{parameter}': cannot be empty"
+
+
+@pytest.mark.anyio
+async def test_update_comic_fails_namespace_not_found(update_comics, gen_archive):
+ archive = next(gen_archive)
+ comic = await DB.add(
+ Comic(
+ title="Dusty Old Comic",
+ archive=archive,
+ cover=archive.cover,
+ pages=archive.pages,
+ )
+ )
+
+ tag = await DB.add(Tag(name="shiny"))
+
+ response = Response(
+ await update_comics(comic.id, {"tags": {"ids": [f"1:{tag.id}"]}})
+ )
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Namespace ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_update_comic_fails_tag_not_found(update_comics, gen_archive):
+ archive = next(gen_archive)
+ comic = await DB.add(
+ Comic(
+ title="Dusty Old Comic",
+ archive=archive,
+ cover=archive.cover,
+ pages=archive.pages,
+ )
+ )
+
+ namespace = await DB.add(Namespace(name="height"))
+
+ response = Response(
+ await update_comics(comic.id, {"tags": {"ids": [f"{namespace.id}:1"]}})
+ )
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Tag ID not found: '1'"
+
+
+@pytest.mark.parametrize(
+ "input",
+ [
+ "",
+ ":1",
+ "1:",
+ "a:b",
+ "tag",
+ ],
+ ids=[
+ "empty",
+ "namespacing missing",
+ "tag missing",
+ "no numeric ids",
+ "wrong format",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_comic_fails_invalid_tag(update_comics, gen_archive, input):
+ archive = next(gen_archive)
+ comic = await DB.add(
+ Comic(
+ title="Dusty Old Comic",
+ archive=archive,
+ cover=archive.cover,
+ pages=archive.pages,
+ )
+ )
+
+ response = Response(await update_comics(comic.id, {"tags": {"ids": [input]}}))
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "id"
+
+ msg = "Invalid parameter 'id': ComicTag ID must be specified as <namespace_id>:<tag_id>" # noqa: E501
+ assert response.message == msg
+
+
+@pytest.mark.parametrize(
+ "name,key,id",
+ [
+ ("Artist", "artists", 1),
+ ("Character", "characters", 1),
+ ("Circle", "circles", 1),
+ ("World", "worlds", 1),
+ ],
+ ids=[
+ "artist",
+ "character",
+ "circle",
+ "world",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_comic_fails_assoc_not_found(
+ update_comics, gen_archive, name, key, id
+):
+ archive = next(gen_archive)
+ comic = await DB.add(
+ Comic(
+ title="Dusty Old Comic",
+ archive=archive,
+ cover=archive.cover,
+ pages=archive.pages,
+ )
+ )
+
+ response = Response(await update_comics(comic.id, {key: {"ids": [id]}}))
+ response.assert_is("IDNotFoundError")
+ assert response.id == id
+ assert response.message == f"{name} ID not found: '{id}'"
+
+
+@pytest.mark.parametrize(
+ "option",
+ [
+ None,
+ {},
+ {"onMissing": "IGNORE"},
+ ],
+ ids=[
+ "default option",
+ "empty option",
+ "explicit option",
+ ],
+)
+@pytest.mark.anyio
+async def test_upsert_comic_ignores_with(upsert_comics, gen_comic, option):
+ original_comic = await DB.add(next(gen_comic))
+
+ new_artists = await DB.add_all(Artist(name="arty"), Artist(name="farty"))
+
+ input = {
+ "artists": {
+ "names": [a.name for a in new_artists] + ["newy"],
+ "options": option,
+ },
+ }
+ response = Response(await upsert_comics(original_comic.id, input))
+ response.assert_is("UpsertSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+
+ assert_assocs_match(comic.artists, original_comic.artists + list(new_artists))
+
+
+@pytest.mark.parametrize(
+ "option",
+ [
+ None,
+ {},
+ {"onMissing": "IGNORE"},
+ ],
+ ids=[
+ "default option",
+ "empty option",
+ "explicit option",
+ ],
+)
+@pytest.mark.anyio
+async def test_upsert_comic_ignores_missing_tags_with(upsert_comics, gen_comic, option):
+ original_comic = await DB.add(next(gen_comic))
+
+ tags = await DB.add_all(
+ ComicTag(
+ comic_id=original_comic.id,
+ namespace=Namespace(name="foo"),
+ tag=Tag(name="bar"),
+ )
+ )
+
+ input = {
+ "tags": {"names": ["foo:bar", "baz:qux"], "options": option},
+ }
+ response = Response(await upsert_comics(original_comic.id, input))
+ response.assert_is("UpsertSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+
+ assert_assocs_match(comic.tags, original_comic.tags + list(tags))
+
+
+@pytest.mark.parametrize(
+ "option",
+ [
+ None,
+ {},
+ {"onMissing": "IGNORE"},
+ {"onMissing": "CREATE"},
+ ],
+ ids=[
+ "default option",
+ "empty option",
+ "IGNORE",
+ "CREATE",
+ ],
+)
+@pytest.mark.anyio
+async def test_upsert_comic_skips_existing_tags(upsert_comics, gen_comic, option):
+ comic = await DB.add(next(gen_comic))
+
+ ctag = comic.tags[0]
+ names = [f"{ctag.namespace.name}:{ctag.tag.name}"]
+
+ input = {
+ "tags": {"names": names, "options": option},
+ }
+ response = Response(await upsert_comics(comic.id, input))
+ response.assert_is("UpsertSuccess")
+
+ comic = await DB.get(Comic, comic.id, full=True)
+ assert comic is not None
+
+ assert_assocs_match(comic.tags, comic.tags)
+
+
+@pytest.mark.parametrize(
+ "valid",
+ [
+ True,
+ False,
+ ],
+ ids=[
+ "valid combination",
+ "invalid combination",
+ ],
+)
+@pytest.mark.anyio
+async def test_upsert_comic_ignore_missing_handles_resident(
+ upsert_comics, gen_comic, valid
+):
+ original_comic = await DB.add(next(gen_comic))
+
+ namespace = await DB.add(Namespace(name="foo"))
+ tag = Tag(name="bar")
+ if valid:
+ tag.namespaces = [namespace]
+
+ tag = await DB.add(tag)
+ ctag = ComicTag(namespace=namespace, tag=tag)
+
+ expected_tags = original_comic.tags
+
+ if valid:
+ expected_tags.append(ctag)
+
+ input = {
+ "tags": {"names": ["foo:bar"], "options": {"onMissing": "IGNORE"}},
+ }
+ response = Response(await upsert_comics(original_comic.id, input))
+ response.assert_is("UpsertSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+
+ assert_assocs_match(comic.tags, expected_tags)
+
+
+@pytest.mark.anyio
+async def test_upsert_comic_tags_uses_existing(upsert_comics, empty_comic):
+ original_comic = await DB.add(empty_comic)
+
+ await DB.add_all(Namespace(name="foo"))
+ await DB.add_all(Tag(name="bar"))
+
+ tag_names = ["foo:bar"]
+
+ response = Response(
+ await upsert_comics(
+ original_comic.id,
+ {"tags": {"names": tag_names, "options": {"onMissing": "CREATE"}}},
+ )
+ )
+ response.assert_is("UpsertSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+
+ assert set(tag_names) == set(
+ [f"{t.namespace.name}:{t.tag.name}" for t in comic.tags]
+ )
+
+
+@pytest.mark.parametrize(
+ "key,list",
+ [
+ ("artists", ["arty", "farty"]),
+ ("tags", ["alien:medium", "human:tiny"]),
+ ("artists", ["arty", "arty"]),
+ ("tags", ["foo:good", "bar:good"]),
+ ("tags", ["foo:good", "foo:bad"]),
+ ("artists", []),
+ ],
+ ids=[
+ "artists",
+ "tags",
+ "artists (duplicate)",
+ "tags (duplicate)",
+ "namespace (duplicate)",
+ "artists (empty)",
+ ],
+)
+@pytest.mark.anyio
+async def test_upsert_comic_creates(upsert_comics, empty_comic, key, list):
+ original_comic = await DB.add(empty_comic)
+
+ input = {
+ key: {"names": list, "options": {"onMissing": "CREATE"}},
+ }
+ response = Response(await upsert_comics(original_comic.id, input))
+ response.assert_is("UpsertSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+
+ assert set(list) == set([o.name for o in getattr(comic, key)])
+
+
+@pytest.mark.anyio
+async def test_upsert_comic_fails_creating_empty_assoc_name(upsert_comics, gen_comic):
+ comic = await DB.add(next(gen_comic))
+
+ input = {
+ "artists": {"names": ""},
+ }
+ response = Response(await upsert_comics(comic.id, input))
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "Artist.name"
+
+
+@pytest.mark.anyio
+async def test_upsert_comic_does_not_replace(upsert_comics, gen_comic):
+ original_comic = await DB.add(next(gen_comic))
+ original_artists = set([a.name for a in original_comic.artists])
+
+ input = {
+ "artists": {"names": []},
+ }
+ response = Response(await upsert_comics(original_comic.id, input))
+ response.assert_is("UpsertSuccess")
+
+ comic = await DB.get(Comic, original_comic.id)
+ artists = set([a.name for a in comic.artists])
+
+ assert artists == original_artists
+
+
+@pytest.mark.parametrize(
+ "input",
+ [
+ "",
+ ":tiny",
+ "human:",
+ "medium",
+ ],
+ ids=[
+ "empty",
+ "namespace missing",
+ "tag missing",
+ "wrong format",
+ ],
+)
+@pytest.mark.anyio
+async def test_upsert_comic_fails_creating_invalid_tag(upsert_comics, gen_comic, input):
+ comic = await DB.add(next(gen_comic))
+
+ input = {
+ "tags": {"names": [input]},
+ }
+ response = Response(await upsert_comics(comic.id, input))
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "name"
+ msg = "Invalid parameter 'name': ComicTag name must be specified as <namespace>:<tag>" # noqa: E501
+ assert response.message == msg
+
+
+@pytest.mark.parametrize(
+ "options",
+ [
+ None,
+ {},
+ {"mode": "REPLACE"},
+ ],
+ ids=[
+ "by default (none)",
+ "by default (empty record)",
+ "when defined explicitly",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_comic_replaces_assocs(update_comics, gen_comic, options):
+ original_comic = await DB.add(next(gen_comic))
+ new_artist = await DB.add(Artist(name="max"))
+
+ input = {
+ "artists": {"ids": [new_artist.id]},
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+
+ assert_assocs_match(comic.artists, [new_artist])
+
+
+@pytest.mark.anyio
+async def test_update_comic_adds_assocs(update_comics, gen_comic):
+ original_comic = await DB.add(next(gen_comic))
+ new_artist = await DB.add(Artist(name="max"))
+ added_artists = original_comic.artists + [new_artist]
+
+ input = {
+ "artists": {"ids": [new_artist.id], "options": {"mode": "ADD"}},
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+
+ assert_assocs_match(comic.artists, added_artists)
+
+
+@pytest.mark.anyio
+async def test_update_comic_adds_existing_assocs(update_comics, gen_comic):
+ original_comic = await DB.add(next(gen_comic))
+ artists = original_comic.artists
+
+ input = {
+ "artists": {
+ "ids": [artist.id for artist in artists],
+ "options": {"mode": "ADD"},
+ },
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+
+ assert_assocs_match(comic.artists, artists)
+
+
+@pytest.mark.anyio
+async def test_update_comic_adds_tags(update_comics, gen_comic):
+ original_comic = await DB.add(next(gen_comic))
+ new_namespace = await DB.add(Namespace(name="new"))
+ new_tag = await DB.add(Tag(name="new"))
+ added_tags = original_comic.tags + [
+ ComicTag(comic_id=original_comic.id, tag=new_tag, namespace=new_namespace)
+ ]
+
+ input = {
+ "tags": {
+ "ids": [f"{new_namespace.id}:{new_tag.id}"],
+ "options": {"mode": "ADD"},
+ },
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert_assocs_match(comic.tags, added_tags)
+
+
+@pytest.mark.anyio
+async def test_update_comic_adds_existing_tags(update_comics, gen_comic):
+ original_comic = await DB.add(next(gen_comic))
+ tags = original_comic.tags
+
+ input = {
+ "tags": {
+ "ids": [f"{tag.namespace.id}:{tag.tag.id}" for tag in tags],
+ "options": {"mode": "ADD"},
+ },
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert_assocs_match(comic.tags, tags)
+
+
+@pytest.mark.anyio
+async def test_update_comic_removes_assocs(update_comics, empty_comic):
+ original_comic = empty_comic
+ removed_artist = Artist(id=1, name="sam")
+ remaining_artist = Artist(id=2, name="max")
+ original_comic.artists = [removed_artist, remaining_artist]
+ original_comic = await DB.add(original_comic)
+
+ input = {
+ "artists": {"ids": [removed_artist.id], "options": {"mode": "REMOVE"}},
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+
+ assert_assocs_match(comic.artists, [remaining_artist])
+
+
+@pytest.mark.anyio
+async def test_update_comic_removes_tags(update_comics, empty_comic):
+ original_comic = empty_comic
+ removed_tag = ComicTag(
+ comic_id=original_comic.id,
+ tag=Tag(id=1, name="gone"),
+ namespace=Namespace(id=1, name="all"),
+ )
+ remaining_tag = ComicTag(
+ comic_id=original_comic.id,
+ tag=Tag(id=2, name="there"),
+ namespace=Namespace(id=2, name="still"),
+ )
+ original_comic.tags = [removed_tag, remaining_tag]
+ original_comic = await DB.add(original_comic)
+
+ input = {
+ "tags": {"ids": ["1:1"], "options": {"mode": "REMOVE"}},
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+
+ assert_assocs_match(comic.tags, [remaining_tag])
+
+
+@pytest.mark.parametrize(
+ "rows,input",
+ [
+ ([], {"title": "Updated Comic"}),
+ (
+ [Artist(id=1, name="artist")],
+ {"artists": {"ids": [1]}},
+ ),
+ (
+ [Artist(id=1, name="artist"), ComicArtist(artist_id=1, comic_id=100)],
+ {"title": "Updated Comic", "artists": {"ids": [1]}},
+ ),
+ (
+ [
+ Namespace(id=1, name="ns"),
+ Tag(id=1, name="artist"),
+ ],
+ {"tags": {"ids": ["1:1"]}},
+ ),
+ (
+ [
+ Namespace(id=1, name="ns"),
+ Tag(id=1, name="artist"),
+ ComicTag(namespace_id=1, tag_id=1, comic_id=100),
+ ],
+ {"title": "Updated Comic", "tags": {"ids": ["1:1"]}},
+ ),
+ ],
+ ids=[
+ "with scalar",
+ "with assoc",
+ "with scalar and existing assoc",
+ "with tag",
+ "with scalar and existing tag",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_comic_changes_updated_at(update_comics, empty_comic, rows, input):
+ original_comic = empty_comic
+ original_comic.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc)
+ original_comic = await DB.add(original_comic)
+
+ await DB.add_all(*rows)
+
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id)
+ assert comic.updated_at > original_comic.updated_at
+
+
+@pytest.mark.anyio
+async def test_update_comic_cover_fails_page_not_found(update_comics, gen_comic):
+ comic = await DB.add(next(gen_comic))
+
+ response = Response(await update_comics(comic.id, {"cover": {"id": 100}}))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 100
+ assert response.message == "Page ID not found: '100'"
+
+
+@pytest.mark.anyio
+async def test_update_comic_cover_fails_page_remote(
+ update_comics, gen_comic, gen_archive
+):
+ comic = await DB.add(next(gen_comic))
+ other_archive = await DB.add(next(gen_archive))
+ remote_id = other_archive.pages[0].id
+
+ response = Response(await update_comics(comic.id, {"cover": {"id": remote_id}}))
+ response.assert_is("PageRemoteError")
+ assert response.id == remote_id
+ assert response.archiveId == other_archive.id
+ assert (
+ response.message
+ == f"Page ID {remote_id} comes from remote archive ID {other_archive.id}"
+ )
+
+
+@pytest.mark.anyio
+async def test_update_comic_pages_fails_page_not_found(update_comics, gen_comic):
+ comic = await DB.add(next(gen_comic))
+
+ response = Response(await update_comics(comic.id, {"pages": {"ids": 100}}))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 100
+ assert response.message == "Page ID not found: '100'"
+
+
+@pytest.mark.anyio
+async def test_update_comic_pages_fails_page_remote(
+ update_comics, gen_comic, gen_archive
+):
+ comic = await DB.add(next(gen_comic))
+ other_archive = await DB.add(next(gen_archive))
+ remote_id = other_archive.pages[0].id
+
+ response = Response(await update_comics(comic.id, {"pages": {"ids": [remote_id]}}))
+ response.assert_is("PageRemoteError")
+ assert response.id == remote_id
+ assert response.archiveId == other_archive.id
+ assert (
+ response.message
+ == f"Page ID {remote_id} comes from remote archive ID {other_archive.id}"
+ )
+
+
+@pytest.mark.anyio
+async def test_update_comic_pages_fails_page_claimed(update_comics, gen_archive):
+ archive = await DB.add(next(gen_archive))
+
+ comic = await DB.add(
+ Comic(
+ id=1,
+ title="A Very Good Comic",
+ archive=archive,
+ cover=archive.pages[0].image,
+ pages=[archive.pages[0], archive.pages[1]],
+ )
+ )
+
+ claiming = await DB.add(
+ Comic(
+ id=2,
+ title="A Very Claiming Comic",
+ archive=archive,
+ cover=archive.pages[2].image,
+ pages=[archive.pages[2], archive.pages[3]],
+ )
+ )
+
+ claimed_id = claiming.pages[0].id
+
+ response = Response(await update_comics(comic.id, {"pages": {"ids": [claimed_id]}}))
+ response.assert_is("PageClaimedError")
+ assert response.id == claimed_id
+ assert response.comicId == claiming.id
+ assert (
+ response.message
+ == f"Page ID {claimed_id} is already claimed by comic ID {claiming.id}"
+ )
+
+
+@pytest.mark.parametrize(
+ "mode",
+ [
+ ("REPLACE"),
+ ("REMOVE"),
+ ],
+)
+@pytest.mark.anyio
+async def test_update_comic_pages_fails_empty(update_comics, gen_comic, mode):
+ comic = await DB.add(next(gen_comic))
+
+ ids = [] if mode == "REPLACE" else [p.id for p in comic.pages]
+
+ response = Response(
+ await update_comics(
+ comic.id, {"pages": {"ids": ids, "options": {"mode": mode}}}
+ )
+ )
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "pages"
+ assert response.message == "Invalid parameter 'pages': cannot be empty"