summaryrefslogtreecommitdiffstatshomepage
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--tests/api/test_archive.py388
-rw-r--r--tests/api/test_artist.py278
-rw-r--r--tests/api/test_character.py285
-rw-r--r--tests/api/test_circle.py278
-rw-r--r--tests/api/test_collection.py0
-rw-r--r--tests/api/test_comic.py1505
-rw-r--r--tests/api/test_comic_tag.py134
-rw-r--r--tests/api/test_db.py324
-rw-r--r--tests/api/test_filter.py521
-rw-r--r--tests/api/test_image.py16
-rw-r--r--tests/api/test_namespace.py291
-rw-r--r--tests/api/test_page.py39
-rw-r--r--tests/api/test_pagination.py61
-rw-r--r--tests/api/test_scraper_api.py395
-rw-r--r--tests/api/test_sort.py137
-rw-r--r--tests/api/test_tag.py441
-rw-r--r--tests/api/test_world.py278
-rw-r--r--tests/config/data/config.toml3
-rw-r--r--tests/conftest.py594
-rw-r--r--tests/plugins/test_plugins.py9
-rw-r--r--tests/scanner/data/contents/archive.zipbin0 -> 1284 bytes
-rw-r--r--tests/scanner/test_scanner.py311
-rw-r--r--tests/scrapers/test_scraper.py55
-rw-r--r--tests/scrapers/test_scraper_utils.py28
-rw-r--r--tests/scrapers/test_types.py131
-rw-r--r--tests/thumbnailer/data/example_palette.pngbin0 -> 703 bytes
-rw-r--r--tests/thumbnailer/data/example_rgb.pngbin0 -> 14362 bytes
-rw-r--r--tests/thumbnailer/test_thumbnailer.py74
28 files changed, 6576 insertions, 0 deletions
diff --git a/tests/api/test_archive.py b/tests/api/test_archive.py
new file mode 100644
index 0000000..0ef3425
--- /dev/null
+++ b/tests/api/test_archive.py
@@ -0,0 +1,388 @@
+import os
+from datetime import datetime as dt
+from pathlib import Path
+
+import hircine.config
+import hircine.db as database
+import hircine.thumbnailer as thumb
+import pytest
+from conftest import DB, Response
+from hircine.db.models import Archive, Comic, Image, Page
+from sqlalchemy import select
+
+
+@pytest.fixture
+def query_archive(execute_id):
+ query = """
+ query archive($id: Int!) {
+ archive(id: $id) {
+ __typename
+ ... on FullArchive {
+ id
+ name
+ createdAt
+ mtime
+ size
+ path
+ pageCount
+ organized
+ comics {
+ __typename
+ id
+ }
+ cover {
+ __typename
+ id
+ }
+ pages {
+ __typename
+ id
+ image {
+ __typename
+ id
+ }
+ }
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_id(query)
+
+
+@pytest.fixture
+def query_archives(execute):
+ query = """
+ query archives {
+ archives {
+ __typename
+ count
+ edges {
+ id
+ name
+ size
+ pageCount
+ organized
+ cover {
+ __typename
+ id
+ }
+ }
+ }
+ }
+ """
+
+ return execute(query)
+
+
+@pytest.fixture
+def update_archives(execute_update):
+ mutation = """
+ mutation updateArchives($ids: [Int!]!, $input: UpdateArchiveInput!) {
+ updateArchives(ids: $ids, input: $input) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on PageRemoteError {
+ id
+ archiveId
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_update(mutation)
+
+
+@pytest.fixture
+def delete_archives(execute_delete):
+ mutation = """
+ mutation deleteArchives($ids: [Int!]!) {
+ deleteArchives(ids: $ids) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_delete(mutation)
+
+
+def assert_image_matches(obj, model):
+ assert obj["__typename"] == "Image"
+ assert obj["id"] == model.id
+
+
+def assert_page_matches(obj, model):
+ assert obj["__typename"] == "Page"
+ assert obj["id"] == model.id
+
+
+@pytest.mark.anyio
+async def test_query_archive(query_archive, gen_archive):
+ archive = next(gen_archive)
+ pages = archive.pages
+
+ await DB.add(archive)
+
+ response = Response(await query_archive(archive.id))
+ response.assert_is("FullArchive")
+
+ assert response.id == archive.id
+ assert response.name == archive.name
+ assert dt.fromisoformat(response.createdAt) == archive.created_at
+ assert dt.fromisoformat(response.mtime) == archive.mtime
+ assert response.size == archive.size
+ assert response.path == archive.path
+ assert response.comics == []
+ assert response.pageCount == archive.page_count
+ assert response.organized == archive.organized
+ assert_image_matches(response.cover, pages[0].image)
+
+ assert len(response.pages) == len(pages)
+
+ page_iter = iter(sorted(pages, key=lambda page: page.index))
+ for page in response.pages:
+ matching_page = next(page_iter)
+ assert_page_matches(page, matching_page)
+ assert_image_matches(page["image"], matching_page.image)
+
+
+@pytest.mark.anyio
+async def test_query_archive_sorts_pages(query_archive, gen_jumbled_archive):
+ archive = await DB.add(next(gen_jumbled_archive))
+
+ response = Response(await query_archive(archive.id))
+ response.assert_is("FullArchive")
+
+ page_iter = iter(sorted(archive.pages, key=lambda page: page.index))
+ for page in response.pages:
+ matching_page = next(page_iter)
+ assert_page_matches(page, matching_page)
+ assert_image_matches(page["image"], matching_page.image)
+
+
+@pytest.mark.anyio
+async def test_query_archive_fails_not_found(query_archive):
+ response = Response(await query_archive(1))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Archive ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_query_archives(query_archives, gen_archive):
+ archives = await DB.add_all(*gen_archive)
+
+ response = Response(await query_archives())
+ response.assert_is("ArchiveFilterResult")
+
+ assert response.count == len(archives)
+ assert isinstance((response.edges), list)
+ assert len(response.edges) == len(archives)
+
+ edges = iter(response.edges)
+ for archive in sorted(archives, key=lambda a: a.name):
+ edge = next(edges)
+ assert edge["id"] == archive.id
+ assert edge["name"] == archive.name
+ assert edge["size"] == archive.size
+ assert edge["pageCount"] == archive.page_count
+ assert_image_matches(edge["cover"], archive.cover)
+
+
+@pytest.fixture
+def gen_archive_with_files(tmpdir, monkeypatch, gen_archive):
+ content_dir = os.path.join(tmpdir, "content/")
+ object_dir = os.path.join(tmpdir, "objects/")
+ os.mkdir(content_dir)
+ os.mkdir(object_dir)
+
+ dirs = hircine.config.DirectoryStructure(scan=content_dir, objects=object_dir)
+ monkeypatch.setattr(hircine.config, "dir_structure", dirs)
+
+ archive = next(gen_archive)
+
+ archive_path = Path(os.path.join(content_dir, "archive.zip"))
+ archive_path.touch()
+ archive.path = str(archive_path)
+
+ img_paths = []
+ for page in archive.pages:
+ for suffix in ["full", "thumb"]:
+ img_path = Path(thumb.object_path(object_dir, page.image.hash, suffix))
+ os.makedirs(os.path.dirname(img_path), exist_ok=True)
+ img_path.touch()
+
+ img_paths.append(img_path)
+
+ yield archive, content_dir, object_dir, img_paths
+
+
+@pytest.mark.anyio
+async def test_delete_archive(delete_archives, gen_archive_with_files):
+ archive, content_dir, object_dir, img_paths = gen_archive_with_files
+ archive_path = archive.path
+
+ archive = await DB.add(archive)
+ page_ids = [page.id for page in archive.pages]
+ image_ids = [page.image.id for page in archive.pages]
+
+ response = Response(await delete_archives(archive.id))
+ response.assert_is("DeleteSuccess")
+
+ archive = await DB.get(Archive, archive.id)
+ assert archive is None
+
+ async with database.session() as s:
+ db_pages = (await s.scalars(select(Page).where(Page.id.in_(page_ids)))).all()
+ db_images = (
+ await s.scalars(select(Image).where(Image.id.in_(image_ids)))
+ ).all()
+
+ assert db_pages == []
+ assert db_images == []
+
+ assert os.path.exists(archive_path) is False
+ for img_path in img_paths:
+ assert os.path.exists(img_path) is False
+
+
+@pytest.mark.anyio
+async def test_delete_archive_deletes_images_only_when_necessary(
+ delete_archives, gen_archive_with_files, gen_archive
+):
+ archive, content_dir, object_dir, img_paths = gen_archive_with_files
+ archive_path = archive.path
+
+ archive = await DB.add(archive)
+ page_ids = [page.id for page in archive.pages]
+ image_ids = [page.image.id for page in archive.pages]
+
+ another = next(gen_archive)
+ another.pages = [
+ Page(path="foo", index=1, image_id=id, archive=another) for id in image_ids
+ ]
+ another.cover = archive.cover
+ await DB.add(another)
+
+ response = Response(await delete_archives(archive.id))
+ response.assert_is("DeleteSuccess")
+
+ archive = await DB.get(Archive, archive.id)
+ assert archive is None
+
+ async with database.session() as s:
+ db_pages = (await s.scalars(select(Page).where(Page.id.in_(page_ids)))).all()
+ db_images = (
+ await s.scalars(select(Image.id).where(Image.id.in_(image_ids)))
+ ).all()
+
+ assert db_pages == []
+ assert db_images == image_ids
+
+ assert os.path.exists(archive_path) is False
+ for img_path in img_paths:
+ assert os.path.exists(img_path) is True
+
+
+@pytest.mark.anyio
+async def test_delete_archive_cascades_on_comic(
+ delete_archives, gen_archive_with_files
+):
+ archive, *_ = gen_archive_with_files
+ comic = Comic(
+ id=1,
+ title="Hic Sunt Dracones",
+ archive=archive,
+ cover=archive.cover,
+ pages=archive.pages,
+ )
+
+ comic = await DB.add(comic)
+
+ response = Response(await delete_archives(comic.archive.id))
+ response.assert_is("DeleteSuccess")
+
+ archive = await DB.get(Archive, archive.id)
+ assert archive is None
+
+ comic = await DB.get(Comic, comic.id)
+ assert comic is None
+
+
+@pytest.mark.anyio
+async def test_update_archives(update_archives, gen_archive):
+ old_archive = await DB.add(next(gen_archive))
+
+ response = Response(
+ await update_archives(
+ old_archive.id,
+ {"cover": {"id": old_archive.pages[1].id}, "organized": True},
+ )
+ )
+ response.assert_is("UpdateSuccess")
+
+ archive = await DB.get(Archive, old_archive.id)
+
+ assert archive.cover_id == old_archive.pages[1].image.id
+ assert archive.organized is True
+
+
+@pytest.mark.anyio
+async def test_update_archive_fails_archive_not_found(update_archives, gen_archive):
+ archive = await DB.add(next(gen_archive))
+
+ response = Response(
+ await update_archives(100, {"cover": {"id": archive.pages[1].id}})
+ )
+ response.assert_is("IDNotFoundError")
+ assert response.id == 100
+ assert response.message == "Archive ID not found: '100'"
+
+
+@pytest.mark.anyio
+async def test_update_archive_cover_fails_page_not_found(update_archives, gen_archive):
+ archive = await DB.add(next(gen_archive))
+
+ response = Response(await update_archives(archive.id, {"cover": {"id": 100}}))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 100
+ assert response.message == "Page ID not found: '100'"
+
+
+@pytest.mark.anyio
+async def test_update_archive_cover_fails_page_remote(update_archives, gen_archive):
+ archive = await DB.add(next(gen_archive))
+ another = await DB.add(next(gen_archive))
+ remote_id = another.pages[0].id
+
+ response = Response(await update_archives(archive.id, {"cover": {"id": remote_id}}))
+ response.assert_is("PageRemoteError")
+ assert response.id == remote_id
+ assert response.archiveId == another.id
+ assert (
+ response.message
+ == f"Page ID {remote_id} comes from remote archive ID {another.id}"
+ )
diff --git a/tests/api/test_artist.py b/tests/api/test_artist.py
new file mode 100644
index 0000000..8cb2f1a
--- /dev/null
+++ b/tests/api/test_artist.py
@@ -0,0 +1,278 @@
+from datetime import datetime as dt
+from datetime import timezone
+
+import pytest
+from conftest import DB, Response
+from hircine.db.models import Artist
+
+
+@pytest.fixture
+def query_artist(execute_id):
+ query = """
+ query artist($id: Int!) {
+ artist(id: $id) {
+ __typename
+ ... on Artist {
+ id
+ name
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_id(query)
+
+
+@pytest.fixture
+def query_artists(execute):
+ query = """
+ query artists {
+ artists {
+ __typename
+ count
+ edges {
+ id
+ name
+ }
+ }
+ }
+ """
+
+ return execute(query)
+
+
+@pytest.fixture
+def add_artist(execute_add):
+ mutation = """
+ mutation addArtist($input: AddArtistInput!) {
+ addArtist(input: $input) {
+ __typename
+ ... on AddSuccess {
+ id
+ }
+ ... on Error {
+ message
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """
+
+ return execute_add(mutation)
+
+
+@pytest.fixture
+def update_artists(execute_update):
+ mutation = """
+ mutation updateArtists($ids: [Int!]!, $input: UpdateArtistInput!) {
+ updateArtists(ids: $ids, input: $input) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """ # noqa: E501
+
+ return execute_update(mutation)
+
+
+@pytest.fixture
+def delete_artists(execute_delete):
+ mutation = """
+ mutation deleteArtists($ids: [Int!]!) {
+ deleteArtists(ids: $ids) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_delete(mutation)
+
+
+@pytest.mark.anyio
+async def test_query_artist(query_artist, gen_artist):
+ artist = await DB.add(next(gen_artist))
+
+ response = Response(await query_artist(artist.id))
+ response.assert_is("Artist")
+
+ assert response.id == artist.id
+ assert response.name == artist.name
+
+
+@pytest.mark.anyio
+async def test_query_artist_fails_not_found(query_artist):
+ response = Response(await query_artist(1))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Artist ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_query_artists(query_artists, gen_artist):
+ artists = await DB.add_all(*gen_artist)
+
+ response = Response(await query_artists())
+ response.assert_is("ArtistFilterResult")
+
+ assert response.count == len(artists)
+ assert isinstance((response.edges), list)
+ assert len(response.edges) == len(artists)
+
+ edges = iter(response.edges)
+ for artist in sorted(artists, key=lambda a: a.name):
+ edge = next(edges)
+ assert edge["id"] == artist.id
+ assert edge["name"] == artist.name
+
+
+@pytest.mark.anyio
+async def test_add_artist(add_artist):
+ response = Response(await add_artist({"name": "added artist"}))
+ response.assert_is("AddSuccess")
+
+ artist = await DB.get(Artist, response.id)
+ assert artist is not None
+ assert artist.name == "added artist"
+
+
+@pytest.mark.anyio
+async def test_add_artist_fails_empty_parameter(add_artist):
+ response = Response(await add_artist({"name": ""}))
+
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "name"
+ assert response.message == "Invalid parameter 'name': cannot be empty"
+
+
+@pytest.mark.anyio
+async def test_add_artist_fails_exists(add_artist, gen_artist):
+ artist = await DB.add(next(gen_artist))
+
+ response = Response(await add_artist({"name": artist.name}))
+ response.assert_is("NameExistsError")
+ assert response.message == "Another Artist with this name exists"
+
+
+@pytest.mark.anyio
+async def test_delete_artist(delete_artists, gen_artist):
+ artist = await DB.add(next(gen_artist))
+ id = artist.id
+
+ response = Response(await delete_artists(id))
+ response.assert_is("DeleteSuccess")
+
+ artist = await DB.get(Artist, id)
+ assert artist is None
+
+
+@pytest.mark.anyio
+async def test_delete_artist_not_found(delete_artists):
+ response = Response(await delete_artists(1))
+
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Artist ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_update_artist(update_artists, gen_artist):
+ artist = await DB.add(next(gen_artist))
+
+ input = {"name": "updated artist"}
+ response = Response(await update_artists(artist.id, input))
+ response.assert_is("UpdateSuccess")
+
+ artist = await DB.get(Artist, artist.id)
+ assert artist is not None
+ assert artist.name == "updated artist"
+
+
+@pytest.mark.anyio
+async def test_update_artist_fails_exists(update_artists, gen_artist):
+ first = await DB.add(next(gen_artist))
+ second = await DB.add(next(gen_artist))
+
+ response = Response(await update_artists(second.id, {"name": first.name}))
+ response.assert_is("NameExistsError")
+ assert response.message == "Another Artist with this name exists"
+
+
+@pytest.mark.anyio
+async def test_update_artist_fails_not_found(update_artists):
+ response = Response(await update_artists(1, {"name": "updated artist"}))
+
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Artist ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_update_artists_cannot_bulk_edit_name(update_artists, gen_artist):
+ first = await DB.add(next(gen_artist))
+ second = await DB.add(next(gen_artist))
+
+ response = Response(await update_artists([first.id, second.id], {"name": "unique"}))
+ response.assert_is("InvalidParameterError")
+
+
+@pytest.mark.parametrize(
+ "empty",
+ [
+ None,
+ "",
+ ],
+ ids=[
+ "none",
+ "empty string",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_artist_fails_empty_parameter(update_artists, gen_artist, empty):
+ artist = await DB.add(next(gen_artist))
+
+ response = Response(await update_artists(artist.id, {"name": empty}))
+
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "name"
+ assert response.message == "Invalid parameter 'name': cannot be empty"
+
+
+@pytest.mark.anyio
+async def test_update_artist_changes_updated_at(update_artists):
+ original_artist = Artist(name="artist")
+ original_artist.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc)
+ original_artist = await DB.add(original_artist)
+
+ response = Response(await update_artists(original_artist.id, {"name": "updated"}))
+ response.assert_is("UpdateSuccess")
+
+ artist = await DB.get(Artist, original_artist.id)
+ assert artist.updated_at > original_artist.updated_at
diff --git a/tests/api/test_character.py b/tests/api/test_character.py
new file mode 100644
index 0000000..567d2a4
--- /dev/null
+++ b/tests/api/test_character.py
@@ -0,0 +1,285 @@
+from datetime import datetime as dt
+from datetime import timezone
+
+import pytest
+from conftest import DB, Response
+from hircine.db.models import Character
+
+
+@pytest.fixture
+def query_character(execute_id):
+ query = """
+ query character($id: Int!) {
+ character(id: $id) {
+ __typename
+ ... on Character {
+ id
+ name
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_id(query)
+
+
+@pytest.fixture
+def query_characters(execute):
+ query = """
+ query characters {
+ characters {
+ __typename
+ count
+ edges {
+ id
+ name
+ }
+ }
+ }
+ """
+
+ return execute(query)
+
+
+@pytest.fixture
+def add_character(execute_add):
+ mutation = """
+ mutation addCharacter($input: AddCharacterInput!) {
+ addCharacter(input: $input) {
+ __typename
+ ... on AddSuccess {
+ id
+ }
+ ... on Error {
+ message
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """
+
+ return execute_add(mutation)
+
+
+@pytest.fixture
+def update_characters(execute_update):
+ mutation = """
+ mutation updateCharacters($ids: [Int!]!, $input: UpdateCharacterInput!) {
+ updateCharacters(ids: $ids, input: $input) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """ # noqa: E501
+
+ return execute_update(mutation)
+
+
+@pytest.fixture
+def delete_characters(execute_delete):
+ mutation = """
+ mutation deleteCharacters($ids: [Int!]!) {
+ deleteCharacters(ids: $ids) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_delete(mutation)
+
+
+@pytest.mark.anyio
+async def test_query_character(query_character, gen_character):
+ character = await DB.add(next(gen_character))
+
+ response = Response(await query_character(character.id))
+ response.assert_is("Character")
+
+ assert response.id == character.id
+ assert response.name == character.name
+
+
+@pytest.mark.anyio
+async def test_query_character_fails_not_found(query_character):
+ response = Response(await query_character(1))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Character ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_query_characters(query_characters, gen_character):
+ characters = await DB.add_all(*gen_character)
+
+ response = Response(await query_characters())
+ response.assert_is("CharacterFilterResult")
+
+ assert response.count == len(characters)
+ assert isinstance((response.edges), list)
+ assert len(response.edges) == len(characters)
+
+ edges = iter(response.edges)
+ for character in sorted(characters, key=lambda a: a.name):
+ edge = next(edges)
+ assert edge["id"] == character.id
+ assert edge["name"] == character.name
+
+
+@pytest.mark.anyio
+async def test_add_character(add_character):
+ response = Response(await add_character({"name": "added character"}))
+ response.assert_is("AddSuccess")
+
+ character = await DB.get(Character, response.id)
+ assert character is not None
+ assert character.name == "added character"
+
+
+@pytest.mark.anyio
+async def test_add_character_fails_empty_parameter(add_character):
+ response = Response(await add_character({"name": ""}))
+
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "name"
+ assert response.message == "Invalid parameter 'name': cannot be empty"
+
+
+@pytest.mark.anyio
+async def test_add_character_fails_exists(add_character, gen_character):
+ character = await DB.add(next(gen_character))
+
+ response = Response(await add_character({"name": character.name}))
+ response.assert_is("NameExistsError")
+ assert response.message == "Another Character with this name exists"
+
+
+@pytest.mark.anyio
+async def test_delete_character(delete_characters, gen_character):
+ character = await DB.add(next(gen_character))
+ id = character.id
+
+ response = Response(await delete_characters(id))
+ response.assert_is("DeleteSuccess")
+
+ character = await DB.get(Character, id)
+ assert character is None
+
+
+@pytest.mark.anyio
+async def test_delete_character_not_found(delete_characters):
+ response = Response(await delete_characters(1))
+
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Character ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_update_character(update_characters, gen_character):
+ character = await DB.add(next(gen_character))
+
+ input = {"name": "updated character"}
+ response = Response(await update_characters(character.id, input))
+ response.assert_is("UpdateSuccess")
+
+ character = await DB.get(Character, character.id)
+ assert character is not None
+ assert character.name == "updated character"
+
+
+@pytest.mark.anyio
+async def test_update_character_fails_exists(update_characters, gen_character):
+ first = await DB.add(next(gen_character))
+ second = await DB.add(next(gen_character))
+
+ response = Response(await update_characters(second.id, {"name": first.name}))
+ response.assert_is("NameExistsError")
+ assert response.message == "Another Character with this name exists"
+
+
+@pytest.mark.anyio
+async def test_update_character_fails_not_found(update_characters):
+ response = Response(await update_characters(1, {"name": "updated_character"}))
+
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Character ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_update_characters_cannot_bulk_edit_name(
+ update_characters, gen_character
+):
+ first = await DB.add(next(gen_character))
+ second = await DB.add(next(gen_character))
+
+ response = Response(
+ await update_characters([first.id, second.id], {"name": "unique"})
+ )
+ response.assert_is("InvalidParameterError")
+
+
+@pytest.mark.parametrize(
+ "empty",
+ [
+ None,
+ "",
+ ],
+ ids=[
+ "none",
+ "empty string",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_character_fails_empty_parameter(
+ update_characters, gen_character, empty
+):
+ character = await DB.add(next(gen_character))
+ response = Response(await update_characters(character.id, {"name": empty}))
+
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "name"
+ assert response.message == "Invalid parameter 'name': cannot be empty"
+
+
+@pytest.mark.anyio
+async def test_update_character_changes_updated_at(update_characters):
+ original_character = Character(name="character")
+ original_character.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc)
+ original_character = await DB.add(original_character)
+
+ response = Response(
+ await update_characters(original_character.id, {"name": "updated"})
+ )
+ response.assert_is("UpdateSuccess")
+
+ character = await DB.get(Character, original_character.id)
+ assert character.updated_at > original_character.updated_at
diff --git a/tests/api/test_circle.py b/tests/api/test_circle.py
new file mode 100644
index 0000000..a03ba89
--- /dev/null
+++ b/tests/api/test_circle.py
@@ -0,0 +1,278 @@
+from datetime import datetime as dt
+from datetime import timezone
+
+import pytest
+from conftest import DB, Response
+from hircine.db.models import Circle
+
+
+@pytest.fixture
+def query_circle(execute_id):
+ query = """
+ query circle($id: Int!) {
+ circle(id: $id) {
+ __typename
+ ... on Circle {
+ id
+ name
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_id(query)
+
+
+@pytest.fixture
+def query_circles(execute):
+ query = """
+ query circles {
+ circles {
+ __typename
+ count
+ edges {
+ id
+ name
+ }
+ }
+ }
+ """
+
+ return execute(query)
+
+
+@pytest.fixture
+def add_circle(execute_add):
+ mutation = """
+ mutation addCircle($input: AddCircleInput!) {
+ addCircle(input: $input) {
+ __typename
+ ... on AddSuccess {
+ id
+ }
+ ... on Error {
+ message
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """
+
+ return execute_add(mutation)
+
+
+@pytest.fixture
+def update_circles(execute_update):
+ mutation = """
+ mutation updateCircles($ids: [Int!]!, $input: UpdateCircleInput!) {
+ updateCircles(ids: $ids, input: $input) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ ... on Error {
+ message
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """ # noqa: E501
+
+ return execute_update(mutation)
+
+
+@pytest.fixture
+def delete_circles(execute_delete):
+ mutation = """
+ mutation deleteCircles($ids: [Int!]!) {
+ deleteCircles(ids: $ids) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_delete(mutation)
+
+
+@pytest.mark.anyio
+async def test_query_circle(query_circle, gen_circle):
+ circle = await DB.add(next(gen_circle))
+
+ response = Response(await query_circle(circle.id))
+ response.assert_is("Circle")
+
+ assert response.id == circle.id
+ assert response.name == circle.name
+
+
+@pytest.mark.anyio
+async def test_query_circle_fails_not_found(query_circle):
+ response = Response(await query_circle(1))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Circle ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_query_circles(query_circles, gen_circle):
+ circles = await DB.add_all(*gen_circle)
+
+ response = Response(await query_circles())
+ response.assert_is("CircleFilterResult")
+
+ assert response.count == len(circles)
+ assert isinstance((response.edges), list)
+ assert len(response.edges) == len(circles)
+
+ edges = iter(response.edges)
+ for circle in sorted(circles, key=lambda a: a.name):
+ edge = next(edges)
+ assert edge["id"] == circle.id
+ assert edge["name"] == circle.name
+
+
+@pytest.mark.anyio
+async def test_add_circle(add_circle):
+ response = Response(await add_circle({"name": "added circle"}))
+ response.assert_is("AddSuccess")
+
+ circle = await DB.get(Circle, response.id)
+ assert circle is not None
+ assert circle.name == "added circle"
+
+
+@pytest.mark.anyio
+async def test_add_circle_fails_empty_parameter(add_circle):
+ response = Response(await add_circle({"name": ""}))
+
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "name"
+ assert response.message == "Invalid parameter 'name': cannot be empty"
+
+
+@pytest.mark.anyio
+async def test_add_circle_fails_exists(add_circle, gen_circle):
+ circle = await DB.add(next(gen_circle))
+
+ response = Response(await add_circle({"name": circle.name}))
+ response.assert_is("NameExistsError")
+ assert response.message == "Another Circle with this name exists"
+
+
+@pytest.mark.anyio
+async def test_delete_circle(delete_circles, gen_circle):
+ circle = await DB.add(next(gen_circle))
+ id = circle.id
+
+ response = Response(await delete_circles(id))
+ response.assert_is("DeleteSuccess")
+
+ circle = await DB.get(Circle, id)
+ assert circle is None
+
+
+@pytest.mark.anyio
+async def test_delete_circle_not_found(delete_circles):
+ response = Response(await delete_circles(1))
+
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Circle ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_update_circle(update_circles, gen_circle):
+ circle = await DB.add(next(gen_circle))
+
+ input = {"name": "updated circle"}
+ response = Response(await update_circles(circle.id, input))
+ response.assert_is("UpdateSuccess")
+
+ circle = await DB.get(Circle, circle.id)
+ assert circle is not None
+ assert circle.name == "updated circle"
+
+
+@pytest.mark.anyio
+async def test_update_circle_fails_exists(update_circles, gen_circle):
+ first = await DB.add(next(gen_circle))
+ second = await DB.add(next(gen_circle))
+
+ response = Response(await update_circles(second.id, {"name": first.name}))
+ response.assert_is("NameExistsError")
+ assert response.message == "Another Circle with this name exists"
+
+
+@pytest.mark.anyio
+async def test_update_circle_fails_not_found(update_circles):
+ response = Response(await update_circles(1, {"name": "updated circle"}))
+
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Circle ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_update_circles_cannot_bulk_edit_name(update_circles, gen_circle):
+ first = await DB.add(next(gen_circle))
+ second = await DB.add(next(gen_circle))
+
+ response = Response(await update_circles([first.id, second.id], {"name": "unique"}))
+ response.assert_is("InvalidParameterError")
+
+
+@pytest.mark.parametrize(
+ "empty",
+ [
+ None,
+ "",
+ ],
+ ids=[
+ "none",
+ "empty string",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_circle_fails_empty_parameter(update_circles, gen_circle, empty):
+ circle = await DB.add(next(gen_circle))
+
+ response = Response(await update_circles(circle.id, {"name": empty}))
+
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "name"
+ assert response.message == "Invalid parameter 'name': cannot be empty"
+
+
+@pytest.mark.anyio
+async def test_update_circle_changes_updated_at(update_circles):
+ original_circle = Circle(name="circle")
+ original_circle.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc)
+ original_circle = await DB.add(original_circle)
+
+ response = Response(await update_circles(original_circle.id, {"name": "updated"}))
+ response.assert_is("UpdateSuccess")
+
+ circle = await DB.get(Circle, original_circle.id)
+ assert circle.updated_at > original_circle.updated_at
diff --git a/tests/api/test_collection.py b/tests/api/test_collection.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/api/test_collection.py
diff --git a/tests/api/test_comic.py b/tests/api/test_comic.py
new file mode 100644
index 0000000..d3fa51e
--- /dev/null
+++ b/tests/api/test_comic.py
@@ -0,0 +1,1505 @@
+from datetime import date, timezone
+from datetime import datetime as dt
+
+import pytest
+from conftest import DB, Response
+from hircine.db.models import (
+ Artist,
+ Circle,
+ Comic,
+ ComicArtist,
+ ComicTag,
+ Namespace,
+ Tag,
+ World,
+)
+from hircine.enums import Category, Censorship, Direction, Language, Layout, Rating
+
+full_comic_fragment = """
+ fragment FullComic on FullComic {
+ id
+ title
+ category
+ censorship
+ createdAt
+ date
+ direction
+ language
+ layout
+ originalTitle
+ url
+ rating
+ pageCount
+ updatedAt
+ organized
+ bookmarked
+ archive {
+ __typename
+ id
+ }
+ artists {
+ __typename
+ id
+ }
+ characters {
+ __typename
+ id
+ }
+ circles {
+ __typename
+ id
+ }
+ cover {
+ __typename
+ id
+ }
+ pages {
+ __typename
+ id
+ }
+ tags {
+ __typename
+ id
+ name
+ }
+ worlds {
+ __typename
+ id
+ }
+ }
+"""
+
+comic_fragment = """
+ fragment Comic on Comic {
+ id
+ title
+ category
+ censorship
+ date
+ language
+ originalTitle
+ rating
+ pageCount
+ organized
+ bookmarked
+ artists {
+ __typename
+ id
+ }
+ characters {
+ __typename
+ id
+ }
+ circles {
+ __typename
+ id
+ }
+ cover {
+ __typename
+ id
+ }
+ tags {
+ __typename
+ id
+ name
+ }
+ worlds {
+ __typename
+ id
+ }
+ }
+"""
+
+
+@pytest.fixture
+def query_comic(execute_id):
+ query = """
+ query comic($id: Int!) {
+ comic(id: $id) {
+ __typename
+ ... on FullComic {
+ ...FullComic
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_id(full_comic_fragment + query)
+
+
+@pytest.fixture
+def query_comics(execute):
+ query = """
+ query comics {
+ comics {
+ __typename
+ count
+ edges {
+ ...Comic
+ }
+ }
+ }
+ """
+
+ return execute(comic_fragment + query)
+
+
+@pytest.fixture
+def add_comic(execute_add):
+ mutation = """
+ mutation addComic($input: AddComicInput!) {
+ addComic(input: $input) {
+ __typename
+ ... on AddComicSuccess {
+ id
+ archivePagesRemaining
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ ... on PageClaimedError {
+ comicId
+ id
+ }
+ ... on PageRemoteError {
+ archiveId
+ id
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """
+
+ return execute_add(mutation)
+
+
+@pytest.fixture
+def delete_comics(execute_delete):
+ mutation = """
+ mutation deleteComics($ids: [Int!]!) {
+ deleteComics(ids: $ids) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_delete(mutation)
+
+
+@pytest.fixture
+def update_comics(execute_update):
+ mutation = """
+ mutation updateComics($ids: [Int!]!, $input: UpdateComicInput!) {
+ updateComics(ids: $ids, input: $input) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on PageRemoteError {
+ id
+ archiveId
+ }
+ ... on PageClaimedError {
+ id
+ comicId
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """ # noqa: E501
+
+ return execute_update(mutation)
+
+
+@pytest.fixture
+def upsert_comics(execute_update):
+ mutation = """
+ mutation upsertComics($ids: [Int!]!, $input: UpsertComicInput!) {
+ upsertComics(ids: $ids, input: $input) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """ # noqa: E501
+
+ return execute_update(mutation)
+
+
+def assert_association_matches(obj, model, typename):
+ assert obj["__typename"] == typename
+ assert obj["id"] == model.id
+
+
+def assert_associations_match(objlist, modellist, typename, sortkey):
+ assert isinstance((objlist), list)
+ assert len(objlist) == len(modellist)
+ model = iter(sorted(modellist, key=sortkey))
+ for obj in objlist:
+ assert_association_matches(obj, next(model), typename)
+
+
+def assert_comic_item_matches(data, comic):
+ assert data["id"] == comic.id
+ assert data["title"] == comic.title
+ assert data["originalTitle"] == comic.original_title
+ assert date.fromisoformat(data["date"]) == comic.date
+ assert Rating[data["rating"]] == comic.rating
+ assert Language[data["language"]] == comic.language
+ assert data["pageCount"] == comic.page_count
+ assert data["organized"] == comic.organized
+ assert data["bookmarked"] == comic.bookmarked
+
+ if data["category"]:
+ assert Category[data["category"]] == comic.category
+ else:
+ assert comic.category is None
+
+ if data["censorship"]:
+ assert Censorship[data["censorship"]] == comic.censorship
+ else:
+ assert comic.censorship is None
+
+ assert_association_matches(data["cover"], comic.cover, "Image")
+ assert_associations_match(
+ data["artists"], comic.artists, "Artist", lambda a: a.name
+ )
+ assert_associations_match(
+ data["characters"], comic.characters, "Character", lambda c: c.name
+ )
+ assert_associations_match(
+ data["circles"], comic.circles, "Circle", lambda c: c.name
+ )
+ assert_associations_match(data["tags"], comic.tags, "ComicTag", lambda t: t.name)
+ assert_associations_match(data["worlds"], comic.worlds, "World", lambda w: w.name)
+
+
+def assert_comic_matches(data, comic):
+ assert_comic_item_matches(data, comic)
+ assert dt.fromisoformat(data["createdAt"]) == comic.created_at
+ assert dt.fromisoformat(data["updatedAt"]) == comic.updated_at
+ assert Direction[data["direction"]] == comic.direction
+ assert Layout[data["layout"]] == comic.layout
+ assert data["url"] == comic.url
+
+ assert_association_matches(data["archive"], comic.archive, "Archive")
+ assert_associations_match(data["pages"], comic.pages, "Page", lambda p: p.index)
+
+
+@pytest.mark.anyio
+async def test_query_comic(query_comic, gen_comic):
+ comic = await DB.add(next(gen_comic))
+
+ response = Response(await query_comic(comic.id))
+ response.assert_is("FullComic")
+
+ assert_comic_matches(response.data, comic)
+
+
+@pytest.mark.anyio
+async def test_query_comic_sorts_pages(query_comic, gen_jumbled_archive):
+ archive = next(gen_jumbled_archive)
+
+ comic = await DB.add(
+ Comic(
+ id=1,
+ title="A Jumbled Mess",
+ archive=archive,
+ pages=archive.pages,
+ cover=archive.cover,
+ )
+ )
+
+ response = Response(await query_comic(comic.id))
+ response.assert_is("FullComic")
+
+ assert_associations_match(response.pages, comic.pages, "Page", lambda p: p.index)
+
+
+@pytest.mark.anyio
+async def test_query_comic_fails_not_found(query_comic):
+ response = Response(await query_comic(1))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Comic ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_query_comics(query_comics, gen_comic):
+ comics = await DB.add_all(*gen_comic)
+
+ response = Response(await query_comics())
+ response.assert_is("ComicFilterResult")
+
+ assert response.count == len(comics)
+ assert isinstance((response.edges), list)
+ assert len(response.edges) == len(comics)
+
+ edge = iter(response.edges)
+ for comic in sorted(comics, key=lambda c: c.title):
+ assert_comic_item_matches(next(edge), comic)
+
+
+@pytest.mark.anyio
+async def test_add_comic(add_comic, gen_archive):
+ archive = next(gen_archive)
+ await DB.add(archive)
+
+ before = dt.now(timezone.utc).replace(microsecond=0)
+
+ response = Response(
+ await add_comic(
+ {
+ "title": "The Comically Bad Comic",
+ "archive": {"id": archive.id},
+ "pages": {"ids": [p.id for p in archive.pages]},
+ "cover": {"id": archive.pages[0].id},
+ }
+ )
+ )
+ response.assert_is("AddComicSuccess")
+ assert response.archivePagesRemaining is False
+
+ after = dt.now(timezone.utc).replace(microsecond=0)
+
+ comic = await DB.get(Comic, response.id, full=True)
+ assert comic is not None
+ assert comic.title == "The Comically Bad Comic"
+
+ assert comic.archive.id == archive.id
+ assert comic.archive.organized is True
+
+ assert set([page.id for page in comic.pages]) == set(
+ [page.id for page in archive.pages]
+ )
+
+ assert comic.cover.id == archive.cover.id
+
+ assert comic.category is None
+ assert comic.censorship is None
+ assert comic.created_at >= before
+ assert comic.created_at <= after
+ assert comic.date is None
+ assert comic.language is None
+ assert comic.layout == Layout.SINGLE
+ assert comic.original_title is None
+ assert comic.url is None
+ assert comic.rating is None
+
+ assert comic.artists == []
+ assert comic.characters == []
+ assert comic.circles == []
+ assert comic.tags == []
+ assert comic.worlds == []
+
+
+@pytest.mark.anyio
+async def test_add_comic_pages_remaining(add_comic, gen_archive):
+ archive = next(gen_archive)
+ await DB.add(archive)
+
+ response = Response(
+ await add_comic(
+ {
+ "title": "The Unfinished Comic",
+ "archive": {"id": archive.id},
+ "pages": {"ids": [p.id for p in archive.pages][:2]},
+ "cover": {"id": archive.pages[0].id},
+ }
+ )
+ )
+ response.assert_is("AddComicSuccess")
+ assert response.archivePagesRemaining is True
+
+ comic = await DB.get(Comic, response.id, full=True)
+ assert comic.archive.organized is False
+
+
+@pytest.mark.anyio
+async def test_add_comic_fails_archive_not_found(add_comic, gen_archive):
+ archive = next(gen_archive)
+ await DB.add(archive)
+
+ response = Response(
+ await add_comic(
+ {
+ "title": "Voidful Comic",
+ "archive": {"id": 10},
+ "pages": {"ids": [p.id for p in archive.pages]},
+ "cover": {"id": archive.pages[0].id},
+ }
+ )
+ )
+ response.assert_is("IDNotFoundError")
+ assert response.id == 10
+ assert response.message == "Archive ID not found: '10'"
+
+
+@pytest.mark.anyio
+async def test_add_comic_fails_page_not_found(add_comic, gen_archive):
+ archive = next(gen_archive)
+ await DB.add(archive)
+
+ response = Response(
+ await add_comic(
+ {
+ "title": "Pageless Comic",
+ "archive": {"id": archive.id},
+ "pages": {"ids": [10]},
+ "cover": {"id": archive.pages[0].id},
+ }
+ )
+ )
+ response.assert_is("IDNotFoundError")
+ assert response.id == 10
+ assert response.message == "Page ID not found: '10'"
+
+
+@pytest.mark.anyio
+async def test_add_comic_fails_page_claimed(add_comic, gen_archive):
+ other_archive = next(gen_archive)
+ other_comic = await DB.add(
+ Comic(
+ title="Lawful Comic",
+ archive=other_archive,
+ cover=other_archive.cover,
+ pages=other_archive.pages,
+ )
+ )
+
+ claimed_page = other_comic.pages[0]
+
+ archive = next(gen_archive)
+ await DB.add(archive)
+
+ response = Response(
+ await add_comic(
+ {
+ "title": "Comic of Attempted Burglary",
+ "archive": {"id": archive.id},
+ "pages": {"ids": [claimed_page.id]},
+ "cover": {"id": archive.pages[0].id},
+ }
+ )
+ )
+
+ response.assert_is("PageClaimedError")
+ assert response.id == claimed_page.id
+ assert response.comicId == other_comic.id
+ assert (
+ response.message
+ == f"Page ID {claimed_page.id} is already claimed by comic ID {other_comic.id}"
+ )
+
+
+@pytest.mark.anyio
+async def test_add_comic_fails_empty_parameter(add_comic, gen_archive):
+ archive = next(gen_archive)
+ await DB.add(archive)
+
+ response = Response(
+ await add_comic(
+ {
+ "title": "",
+ "archive": {"id": archive.id},
+ "pages": {"ids": [p.id for p in archive.pages]},
+ "cover": {"id": archive.pages[0].id},
+ }
+ )
+ )
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "title"
+
+
+@pytest.mark.anyio
+async def test_add_comic_fails_page_remote(add_comic, gen_archive):
+ other_archive = await DB.add(next(gen_archive))
+ other_page = other_archive.pages[0]
+
+ archive = await DB.add(next(gen_archive))
+
+ response = Response(
+ await add_comic(
+ {
+ "title": "Comic of Multiple Archives",
+ "archive": {"id": archive.id},
+ "pages": {"ids": [other_page.id]},
+ "cover": {"id": archive.pages[0].id},
+ }
+ )
+ )
+
+ response.assert_is("PageRemoteError")
+ assert response.id == other_page.id
+ assert response.archiveId == other_archive.id
+ assert (
+ response.message
+ == f"Page ID {other_page.id} comes from remote archive ID {other_archive.id}"
+ )
+
+
+@pytest.mark.anyio
+async def test_add_comic_fails_cover_remote(add_comic, gen_archive):
+ other_archive = await DB.add(next(gen_archive))
+ other_page = other_archive.pages[0]
+
+ archive = await DB.add(next(gen_archive))
+
+ response = Response(
+ await add_comic(
+ {
+ "title": "Comic of Multiple Archives",
+ "archive": {"id": archive.id},
+ "pages": {"ids": [p.id for p in archive.pages]},
+ "cover": {"id": other_page.id},
+ }
+ )
+ )
+
+ response.assert_is("PageRemoteError")
+ assert response.id == other_page.id
+ assert response.archiveId == other_archive.id
+ assert (
+ response.message
+ == f"Page ID {other_page.id} comes from remote archive ID {other_archive.id}"
+ )
+
+
+@pytest.mark.anyio
+async def test_delete_comic(delete_comics, gen_comic):
+ comic = await DB.add(next(gen_comic))
+
+ response = Response(await delete_comics(comic.id))
+ response.assert_is("DeleteSuccess")
+
+ comic = await DB.get(Comic, comic.id)
+ assert comic is None
+
+
+@pytest.mark.anyio
+async def test_delete_comic_not_found(delete_comics):
+ response = Response(await delete_comics(1))
+
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Comic ID not found: '1'"
+
+
+def assert_assocs_match(assocs, collection, name_only=False):
+ assert set([o.name for o in assocs]) == set([o.name for o in collection])
+ assert set([o.id for o in assocs]) == set([o.id for o in collection])
+
+
+@pytest.mark.anyio
+async def test_update_comic(update_comics, gen_comic):
+ original_comic = await DB.add(next(gen_comic))
+
+ artists = await DB.add_all(Artist(name="arty"), Artist(name="farty"))
+ circles = await DB.add_all(Circle(name="round"), Circle(name="oval"))
+ worlds = await DB.add_all(World(name="animal world"), World(name="no spiders"))
+
+ namespace = await DB.add(Namespace(name="emus"))
+ tag = await DB.add(Tag(name="creepy"))
+ ct = ComicTag(namespace=namespace, tag=tag)
+
+ new_tags = [ct] + original_comic.tags
+ new_pages = [p.id for p in original_comic.pages[:2]]
+
+ input = {
+ "title": "Saucy Savannah Adventures (now in Italian)",
+ "url": "file:///home/savannah/avventura",
+ "originalTitle": original_comic.title,
+ "cover": {"id": original_comic.pages[1].id},
+ "pages": {"ids": new_pages},
+ "favourite": False,
+ "organized": True,
+ "bookmarked": True,
+ "artists": {"ids": [a.id for a in artists]},
+ "circles": {"ids": [c.id for c in circles]},
+ "worlds": {"ids": [w.id for w in worlds]},
+ "tags": {"ids": [ct.id for ct in new_tags]},
+ "date": "2010-07-06",
+ "direction": "LEFT_TO_RIGHT",
+ "language": "IT",
+ "layout": "DOUBLE_OFFSET",
+ "rating": "EXPLICIT",
+ "censorship": "BAR",
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+ assert comic.title == "Saucy Savannah Adventures (now in Italian)"
+ assert comic.original_title == original_comic.title
+ assert comic.cover.id == original_comic.pages[1].image.id
+ assert comic.url == "file:///home/savannah/avventura"
+ assert comic.favourite is False
+ assert comic.organized is True
+ assert comic.bookmarked is True
+
+ assert set([p.id for p in comic.pages]) == set(new_pages)
+
+ assert_assocs_match(comic.artists, artists)
+ assert_assocs_match(comic.circles, circles)
+ assert_assocs_match(comic.characters, original_comic.characters)
+ assert_assocs_match(comic.worlds, worlds)
+ assert_assocs_match(comic.tags, new_tags)
+
+ assert comic.date == date(2010, 7, 6)
+ assert comic.direction == Direction.LEFT_TO_RIGHT
+ assert comic.layout == Layout.DOUBLE_OFFSET
+ assert comic.rating == Rating.EXPLICIT
+ assert comic.language == Language.IT
+ assert comic.censorship == Censorship.BAR
+
+
+@pytest.mark.anyio
+async def test_update_comic_clears_associations(update_comics, gen_comic):
+ original_comic = await DB.add(next(gen_comic))
+
+ empty = {"ids": []}
+
+ input = {
+ "artists": empty,
+ "circles": empty,
+ "worlds": empty,
+ "tags": empty,
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+
+ assert comic.artists == []
+ assert comic.circles == []
+ assert comic.worlds == []
+ assert comic.tags == []
+
+
+@pytest.mark.anyio
+async def test_update_comic_clears_enums(update_comics, gen_comic):
+ original_comic = await DB.add(next(gen_comic))
+
+ input = {
+ "category": None,
+ "censorship": None,
+ "rating": None,
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+
+ assert comic.rating is None
+ assert comic.category is None
+ assert comic.censorship is None
+
+
+@pytest.mark.parametrize(
+ "empty",
+ [
+ None,
+ "",
+ ],
+ ids=[
+ "with None",
+ "with empty string",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_comic_clears_string_fields(update_comics, gen_comic, empty):
+ original_comic = await DB.add(next(gen_comic))
+
+ input = {
+ "originalTitle": empty,
+ "url": empty,
+ "date": None,
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+
+ assert comic.original_title is None
+ assert comic.date is None
+ assert comic.url is None
+
+
+@pytest.mark.anyio
+async def test_update_comic_fails_comic_not_found(update_comics):
+ response = Response(await update_comics(1, {"title": "This Will Not Happen"}))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Comic ID not found: '1'"
+
+
+@pytest.mark.parametrize(
+ "parameter,empty",
+ [
+ ("title", ""),
+ ("title", None),
+ ("direction", None),
+ ("layout", None),
+ ],
+ ids=[
+ "title (empty string)",
+ "title (none)",
+ "direction",
+ "layout",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_comic_fails_empty_parameter(
+ update_comics, gen_archive, parameter, empty
+):
+ archive = next(gen_archive)
+ comic = await DB.add(
+ Comic(
+ title="Dusty Old Comic",
+ archive=archive,
+ cover=archive.cover,
+ pages=archive.pages,
+ )
+ )
+
+ response = Response(await update_comics(comic.id, {parameter: empty}))
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == parameter
+ assert response.message == f"Invalid parameter '{parameter}': cannot be empty"
+
+
+@pytest.mark.anyio
+async def test_update_comic_fails_namespace_not_found(update_comics, gen_archive):
+ archive = next(gen_archive)
+ comic = await DB.add(
+ Comic(
+ title="Dusty Old Comic",
+ archive=archive,
+ cover=archive.cover,
+ pages=archive.pages,
+ )
+ )
+
+ tag = await DB.add(Tag(name="shiny"))
+
+ response = Response(
+ await update_comics(comic.id, {"tags": {"ids": [f"1:{tag.id}"]}})
+ )
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Namespace ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_update_comic_fails_tag_not_found(update_comics, gen_archive):
+ archive = next(gen_archive)
+ comic = await DB.add(
+ Comic(
+ title="Dusty Old Comic",
+ archive=archive,
+ cover=archive.cover,
+ pages=archive.pages,
+ )
+ )
+
+ namespace = await DB.add(Namespace(name="height"))
+
+ response = Response(
+ await update_comics(comic.id, {"tags": {"ids": [f"{namespace.id}:1"]}})
+ )
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Tag ID not found: '1'"
+
+
+@pytest.mark.parametrize(
+ "input",
+ [
+ "",
+ ":1",
+ "1:",
+ "a:b",
+ "tag",
+ ],
+ ids=[
+ "empty",
+ "namespacing missing",
+ "tag missing",
+ "no numeric ids",
+ "wrong format",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_comic_fails_invalid_tag(update_comics, gen_archive, input):
+ archive = next(gen_archive)
+ comic = await DB.add(
+ Comic(
+ title="Dusty Old Comic",
+ archive=archive,
+ cover=archive.cover,
+ pages=archive.pages,
+ )
+ )
+
+ response = Response(await update_comics(comic.id, {"tags": {"ids": [input]}}))
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "id"
+
+ msg = "Invalid parameter 'id': ComicTag ID must be specified as <namespace_id>:<tag_id>" # noqa: E501
+ assert response.message == msg
+
+
+@pytest.mark.parametrize(
+ "name,key,id",
+ [
+ ("Artist", "artists", 1),
+ ("Character", "characters", 1),
+ ("Circle", "circles", 1),
+ ("World", "worlds", 1),
+ ],
+ ids=[
+ "artist",
+ "character",
+ "circle",
+ "world",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_comic_fails_assoc_not_found(
+ update_comics, gen_archive, name, key, id
+):
+ archive = next(gen_archive)
+ comic = await DB.add(
+ Comic(
+ title="Dusty Old Comic",
+ archive=archive,
+ cover=archive.cover,
+ pages=archive.pages,
+ )
+ )
+
+ response = Response(await update_comics(comic.id, {key: {"ids": [id]}}))
+ response.assert_is("IDNotFoundError")
+ assert response.id == id
+ assert response.message == f"{name} ID not found: '{id}'"
+
+
+@pytest.mark.parametrize(
+ "option",
+ [
+ None,
+ {},
+ {"onMissing": "IGNORE"},
+ ],
+ ids=[
+ "default option",
+ "empty option",
+ "explicit option",
+ ],
+)
+@pytest.mark.anyio
+async def test_upsert_comic_ignores_with(upsert_comics, gen_comic, option):
+ original_comic = await DB.add(next(gen_comic))
+
+ new_artists = await DB.add_all(Artist(name="arty"), Artist(name="farty"))
+
+ input = {
+ "artists": {
+ "names": [a.name for a in new_artists] + ["newy"],
+ "options": option,
+ },
+ }
+ response = Response(await upsert_comics(original_comic.id, input))
+ response.assert_is("UpsertSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+
+ assert_assocs_match(comic.artists, original_comic.artists + list(new_artists))
+
+
+@pytest.mark.parametrize(
+ "option",
+ [
+ None,
+ {},
+ {"onMissing": "IGNORE"},
+ ],
+ ids=[
+ "default option",
+ "empty option",
+ "explicit option",
+ ],
+)
+@pytest.mark.anyio
+async def test_upsert_comic_ignores_missing_tags_with(upsert_comics, gen_comic, option):
+ original_comic = await DB.add(next(gen_comic))
+
+ tags = await DB.add_all(
+ ComicTag(
+ comic_id=original_comic.id,
+ namespace=Namespace(name="foo"),
+ tag=Tag(name="bar"),
+ )
+ )
+
+ input = {
+ "tags": {"names": ["foo:bar", "baz:qux"], "options": option},
+ }
+ response = Response(await upsert_comics(original_comic.id, input))
+ response.assert_is("UpsertSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+
+ assert_assocs_match(comic.tags, original_comic.tags + list(tags))
+
+
+@pytest.mark.parametrize(
+ "option",
+ [
+ None,
+ {},
+ {"onMissing": "IGNORE"},
+ {"onMissing": "CREATE"},
+ ],
+ ids=[
+ "default option",
+ "empty option",
+ "IGNORE",
+ "CREATE",
+ ],
+)
+@pytest.mark.anyio
+async def test_upsert_comic_skips_existing_tags(upsert_comics, gen_comic, option):
+ comic = await DB.add(next(gen_comic))
+
+ ctag = comic.tags[0]
+ names = [f"{ctag.namespace.name}:{ctag.tag.name}"]
+
+ input = {
+ "tags": {"names": names, "options": option},
+ }
+ response = Response(await upsert_comics(comic.id, input))
+ response.assert_is("UpsertSuccess")
+
+ comic = await DB.get(Comic, comic.id, full=True)
+ assert comic is not None
+
+ assert_assocs_match(comic.tags, comic.tags)
+
+
+@pytest.mark.parametrize(
+ "valid",
+ [
+ True,
+ False,
+ ],
+ ids=[
+ "valid combination",
+ "invalid combination",
+ ],
+)
+@pytest.mark.anyio
+async def test_upsert_comic_ignore_missing_handles_resident(
+ upsert_comics, gen_comic, valid
+):
+ original_comic = await DB.add(next(gen_comic))
+
+ namespace = await DB.add(Namespace(name="foo"))
+ tag = Tag(name="bar")
+ if valid:
+ tag.namespaces = [namespace]
+
+ tag = await DB.add(tag)
+ ctag = ComicTag(namespace=namespace, tag=tag)
+
+ expected_tags = original_comic.tags
+
+ if valid:
+ expected_tags.append(ctag)
+
+ input = {
+ "tags": {"names": ["foo:bar"], "options": {"onMissing": "IGNORE"}},
+ }
+ response = Response(await upsert_comics(original_comic.id, input))
+ response.assert_is("UpsertSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+
+ assert_assocs_match(comic.tags, expected_tags)
+
+
+@pytest.mark.anyio
+async def test_upsert_comic_tags_uses_existing(upsert_comics, empty_comic):
+ original_comic = await DB.add(empty_comic)
+
+ await DB.add_all(Namespace(name="foo"))
+ await DB.add_all(Tag(name="bar"))
+
+ tag_names = ["foo:bar"]
+
+ response = Response(
+ await upsert_comics(
+ original_comic.id,
+ {"tags": {"names": tag_names, "options": {"onMissing": "CREATE"}}},
+ )
+ )
+ response.assert_is("UpsertSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+
+ assert set(tag_names) == set(
+ [f"{t.namespace.name}:{t.tag.name}" for t in comic.tags]
+ )
+
+
+@pytest.mark.parametrize(
+ "key,list",
+ [
+ ("artists", ["arty", "farty"]),
+ ("tags", ["alien:medium", "human:tiny"]),
+ ("artists", ["arty", "arty"]),
+ ("tags", ["foo:good", "bar:good"]),
+ ("tags", ["foo:good", "foo:bad"]),
+ ("artists", []),
+ ],
+ ids=[
+ "artists",
+ "tags",
+ "artists (duplicate)",
+ "tags (duplicate)",
+ "namespace (duplicate)",
+ "artists (empty)",
+ ],
+)
+@pytest.mark.anyio
+async def test_upsert_comic_creates(upsert_comics, empty_comic, key, list):
+ original_comic = await DB.add(empty_comic)
+
+ input = {
+ key: {"names": list, "options": {"onMissing": "CREATE"}},
+ }
+ response = Response(await upsert_comics(original_comic.id, input))
+ response.assert_is("UpsertSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert comic is not None
+
+ assert set(list) == set([o.name for o in getattr(comic, key)])
+
+
+@pytest.mark.anyio
+async def test_upsert_comic_fails_creating_empty_assoc_name(upsert_comics, gen_comic):
+ comic = await DB.add(next(gen_comic))
+
+ input = {
+ "artists": {"names": ""},
+ }
+ response = Response(await upsert_comics(comic.id, input))
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "Artist.name"
+
+
+@pytest.mark.anyio
+async def test_upsert_comic_does_not_replace(upsert_comics, gen_comic):
+ original_comic = await DB.add(next(gen_comic))
+ original_artists = set([a.name for a in original_comic.artists])
+
+ input = {
+ "artists": {"names": []},
+ }
+ response = Response(await upsert_comics(original_comic.id, input))
+ response.assert_is("UpsertSuccess")
+
+ comic = await DB.get(Comic, original_comic.id)
+ artists = set([a.name for a in comic.artists])
+
+ assert artists == original_artists
+
+
+@pytest.mark.parametrize(
+ "input",
+ [
+ "",
+ ":tiny",
+ "human:",
+ "medium",
+ ],
+ ids=[
+ "empty",
+ "namespace missing",
+ "tag missing",
+ "wrong format",
+ ],
+)
+@pytest.mark.anyio
+async def test_upsert_comic_fails_creating_invalid_tag(upsert_comics, gen_comic, input):
+ comic = await DB.add(next(gen_comic))
+
+ input = {
+ "tags": {"names": [input]},
+ }
+ response = Response(await upsert_comics(comic.id, input))
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "name"
+ msg = "Invalid parameter 'name': ComicTag name must be specified as <namespace>:<tag>" # noqa: E501
+ assert response.message == msg
+
+
+@pytest.mark.parametrize(
+ "options",
+ [
+ None,
+ {},
+ {"mode": "REPLACE"},
+ ],
+ ids=[
+ "by default (none)",
+ "by default (empty record)",
+ "when defined explicitly",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_comic_replaces_assocs(update_comics, gen_comic, options):
+ original_comic = await DB.add(next(gen_comic))
+ new_artist = await DB.add(Artist(name="max"))
+
+ input = {
+ "artists": {"ids": [new_artist.id]},
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+
+ assert_assocs_match(comic.artists, [new_artist])
+
+
+@pytest.mark.anyio
+async def test_update_comic_adds_assocs(update_comics, gen_comic):
+ original_comic = await DB.add(next(gen_comic))
+ new_artist = await DB.add(Artist(name="max"))
+ added_artists = original_comic.artists + [new_artist]
+
+ input = {
+ "artists": {"ids": [new_artist.id], "options": {"mode": "ADD"}},
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+
+ assert_assocs_match(comic.artists, added_artists)
+
+
+@pytest.mark.anyio
+async def test_update_comic_adds_existing_assocs(update_comics, gen_comic):
+ original_comic = await DB.add(next(gen_comic))
+ artists = original_comic.artists
+
+ input = {
+ "artists": {
+ "ids": [artist.id for artist in artists],
+ "options": {"mode": "ADD"},
+ },
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+
+ assert_assocs_match(comic.artists, artists)
+
+
+@pytest.mark.anyio
+async def test_update_comic_adds_tags(update_comics, gen_comic):
+ original_comic = await DB.add(next(gen_comic))
+ new_namespace = await DB.add(Namespace(name="new"))
+ new_tag = await DB.add(Tag(name="new"))
+ added_tags = original_comic.tags + [
+ ComicTag(comic_id=original_comic.id, tag=new_tag, namespace=new_namespace)
+ ]
+
+ input = {
+ "tags": {
+ "ids": [f"{new_namespace.id}:{new_tag.id}"],
+ "options": {"mode": "ADD"},
+ },
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert_assocs_match(comic.tags, added_tags)
+
+
+@pytest.mark.anyio
+async def test_update_comic_adds_existing_tags(update_comics, gen_comic):
+ original_comic = await DB.add(next(gen_comic))
+ tags = original_comic.tags
+
+ input = {
+ "tags": {
+ "ids": [f"{tag.namespace.id}:{tag.tag.id}" for tag in tags],
+ "options": {"mode": "ADD"},
+ },
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+ assert_assocs_match(comic.tags, tags)
+
+
+@pytest.mark.anyio
+async def test_update_comic_removes_assocs(update_comics, empty_comic):
+ original_comic = empty_comic
+ removed_artist = Artist(id=1, name="sam")
+ remaining_artist = Artist(id=2, name="max")
+ original_comic.artists = [removed_artist, remaining_artist]
+ original_comic = await DB.add(original_comic)
+
+ input = {
+ "artists": {"ids": [removed_artist.id], "options": {"mode": "REMOVE"}},
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+
+ assert_assocs_match(comic.artists, [remaining_artist])
+
+
+@pytest.mark.anyio
+async def test_update_comic_removes_tags(update_comics, empty_comic):
+ original_comic = empty_comic
+ removed_tag = ComicTag(
+ comic_id=original_comic.id,
+ tag=Tag(id=1, name="gone"),
+ namespace=Namespace(id=1, name="all"),
+ )
+ remaining_tag = ComicTag(
+ comic_id=original_comic.id,
+ tag=Tag(id=2, name="there"),
+ namespace=Namespace(id=2, name="still"),
+ )
+ original_comic.tags = [removed_tag, remaining_tag]
+ original_comic = await DB.add(original_comic)
+
+ input = {
+ "tags": {"ids": ["1:1"], "options": {"mode": "REMOVE"}},
+ }
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id, full=True)
+
+ assert_assocs_match(comic.tags, [remaining_tag])
+
+
+@pytest.mark.parametrize(
+ "rows,input",
+ [
+ ([], {"title": "Updated Comic"}),
+ (
+ [Artist(id=1, name="artist")],
+ {"artists": {"ids": [1]}},
+ ),
+ (
+ [Artist(id=1, name="artist"), ComicArtist(artist_id=1, comic_id=100)],
+ {"title": "Updated Comic", "artists": {"ids": [1]}},
+ ),
+ (
+ [
+ Namespace(id=1, name="ns"),
+ Tag(id=1, name="artist"),
+ ],
+ {"tags": {"ids": ["1:1"]}},
+ ),
+ (
+ [
+ Namespace(id=1, name="ns"),
+ Tag(id=1, name="artist"),
+ ComicTag(namespace_id=1, tag_id=1, comic_id=100),
+ ],
+ {"title": "Updated Comic", "tags": {"ids": ["1:1"]}},
+ ),
+ ],
+ ids=[
+ "with scalar",
+ "with assoc",
+ "with scalar and existing assoc",
+ "with tag",
+ "with scalar and existing tag",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_comic_changes_updated_at(update_comics, empty_comic, rows, input):
+ original_comic = empty_comic
+ original_comic.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc)
+ original_comic = await DB.add(original_comic)
+
+ await DB.add_all(*rows)
+
+ response = Response(await update_comics(original_comic.id, input))
+ response.assert_is("UpdateSuccess")
+
+ comic = await DB.get(Comic, original_comic.id)
+ assert comic.updated_at > original_comic.updated_at
+
+
+@pytest.mark.anyio
+async def test_update_comic_cover_fails_page_not_found(update_comics, gen_comic):
+ comic = await DB.add(next(gen_comic))
+
+ response = Response(await update_comics(comic.id, {"cover": {"id": 100}}))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 100
+ assert response.message == "Page ID not found: '100'"
+
+
+@pytest.mark.anyio
+async def test_update_comic_cover_fails_page_remote(
+ update_comics, gen_comic, gen_archive
+):
+ comic = await DB.add(next(gen_comic))
+ other_archive = await DB.add(next(gen_archive))
+ remote_id = other_archive.pages[0].id
+
+ response = Response(await update_comics(comic.id, {"cover": {"id": remote_id}}))
+ response.assert_is("PageRemoteError")
+ assert response.id == remote_id
+ assert response.archiveId == other_archive.id
+ assert (
+ response.message
+ == f"Page ID {remote_id} comes from remote archive ID {other_archive.id}"
+ )
+
+
+@pytest.mark.anyio
+async def test_update_comic_pages_fails_page_not_found(update_comics, gen_comic):
+ comic = await DB.add(next(gen_comic))
+
+ response = Response(await update_comics(comic.id, {"pages": {"ids": 100}}))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 100
+ assert response.message == "Page ID not found: '100'"
+
+
+@pytest.mark.anyio
+async def test_update_comic_pages_fails_page_remote(
+ update_comics, gen_comic, gen_archive
+):
+ comic = await DB.add(next(gen_comic))
+ other_archive = await DB.add(next(gen_archive))
+ remote_id = other_archive.pages[0].id
+
+ response = Response(await update_comics(comic.id, {"pages": {"ids": [remote_id]}}))
+ response.assert_is("PageRemoteError")
+ assert response.id == remote_id
+ assert response.archiveId == other_archive.id
+ assert (
+ response.message
+ == f"Page ID {remote_id} comes from remote archive ID {other_archive.id}"
+ )
+
+
+@pytest.mark.anyio
+async def test_update_comic_pages_fails_page_claimed(update_comics, gen_archive):
+ archive = await DB.add(next(gen_archive))
+
+ comic = await DB.add(
+ Comic(
+ id=1,
+ title="A Very Good Comic",
+ archive=archive,
+ cover=archive.pages[0].image,
+ pages=[archive.pages[0], archive.pages[1]],
+ )
+ )
+
+ claiming = await DB.add(
+ Comic(
+ id=2,
+ title="A Very Claiming Comic",
+ archive=archive,
+ cover=archive.pages[2].image,
+ pages=[archive.pages[2], archive.pages[3]],
+ )
+ )
+
+ claimed_id = claiming.pages[0].id
+
+ response = Response(await update_comics(comic.id, {"pages": {"ids": [claimed_id]}}))
+ response.assert_is("PageClaimedError")
+ assert response.id == claimed_id
+ assert response.comicId == claiming.id
+ assert (
+ response.message
+ == f"Page ID {claimed_id} is already claimed by comic ID {claiming.id}"
+ )
+
+
+@pytest.mark.parametrize(
+ "mode",
+ [
+ ("REPLACE"),
+ ("REMOVE"),
+ ],
+)
+@pytest.mark.anyio
+async def test_update_comic_pages_fails_empty(update_comics, gen_comic, mode):
+ comic = await DB.add(next(gen_comic))
+
+ ids = [] if mode == "REPLACE" else [p.id for p in comic.pages]
+
+ response = Response(
+ await update_comics(
+ comic.id, {"pages": {"ids": ids, "options": {"mode": mode}}}
+ )
+ )
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "pages"
+ assert response.message == "Invalid parameter 'pages': cannot be empty"
diff --git a/tests/api/test_comic_tag.py b/tests/api/test_comic_tag.py
new file mode 100644
index 0000000..f536b79
--- /dev/null
+++ b/tests/api/test_comic_tag.py
@@ -0,0 +1,134 @@
+from functools import partial
+
+import pytest
+from conftest import DB, Response
+from hircine.db.models import Namespace, Tag
+
+
+@pytest.fixture
+def query_comic_tags(schema_execute):
+ query = """
+ query comicTags($forFilter: Boolean) {
+ comicTags(forFilter: $forFilter) {
+ edges {
+ __typename
+ id
+ name
+ }
+ count
+ }
+ }
+ """
+
+ def wrapper(q):
+ async def _execute(for_filter=False):
+ return await schema_execute(q, {"forFilter": for_filter})
+
+ return _execute
+
+ return wrapper(query)
+
+
+def build_item(namespace, tag):
+ nid, tid = "", ""
+ nname, tname = "", ""
+
+ if namespace:
+ nid, nname = namespace.id, namespace.name
+
+ if tag:
+ tid, tname = tag.id, tag.name
+
+ item = {
+ "__typename": "ComicTag",
+ "id": f"{nid}:{tid}",
+ "name": f"{nname}:{tname}",
+ }
+
+ return item
+
+
+@pytest.mark.anyio
+async def test_query_comic_tags_cross(query_comic_tags):
+ ns_foo = Namespace(id=1, name="foo")
+ ns_bar = Namespace(id=2, name="bar")
+ tag_qoo = Tag(id=1, name="qoo", namespaces=[ns_foo, ns_bar])
+ tag_qar = Tag(id=2, name="qar", namespaces=[ns_foo, ns_bar])
+
+ await DB.add_all(ns_foo, ns_bar)
+ await DB.add_all(tag_qoo, tag_qar)
+
+ builder = partial(build_item)
+
+ response = Response(await query_comic_tags())
+ assert response.data["edges"] == [
+ builder(ns_bar, tag_qar),
+ builder(ns_bar, tag_qoo),
+ builder(ns_foo, tag_qar),
+ builder(ns_foo, tag_qoo),
+ ]
+
+
+@pytest.mark.anyio
+async def test_query_comic_tags_restricted_namespace(query_comic_tags):
+ ns_foo = Namespace(id=1, name="foo")
+ ns_bar = Namespace(id=2, name="bar")
+ tag_qoo = Tag(id=1, name="qoo", namespaces=[ns_bar])
+ tag_qar = Tag(id=2, name="qar", namespaces=[ns_foo])
+
+ await DB.add_all(ns_foo, ns_bar)
+ await DB.add_all(tag_qoo, tag_qar)
+
+ builder = partial(build_item)
+
+ response = Response(await query_comic_tags())
+ assert response.data["edges"] == [
+ builder(ns_bar, tag_qoo),
+ builder(ns_foo, tag_qar),
+ ]
+
+
+@pytest.mark.anyio
+async def test_query_comic_tag_matchers_cross(query_comic_tags):
+ ns_foo = Namespace(id=1, name="foo")
+ ns_bar = Namespace(id=2, name="bar")
+ tag_qoo = Tag(id=1, name="qoo", namespaces=[ns_foo, ns_bar])
+ tag_qar = Tag(id=2, name="qar", namespaces=[ns_foo, ns_bar])
+
+ await DB.add_all(ns_foo, ns_bar, tag_qoo, tag_qar)
+
+ builder = partial(build_item)
+
+ response = Response(await query_comic_tags(for_filter=True))
+ assert response.data["edges"] == [
+ builder(ns_bar, None),
+ builder(ns_foo, None),
+ builder(None, tag_qar),
+ builder(None, tag_qoo),
+ builder(ns_bar, tag_qar),
+ builder(ns_bar, tag_qoo),
+ builder(ns_foo, tag_qar),
+ builder(ns_foo, tag_qoo),
+ ]
+
+
+@pytest.mark.anyio
+async def test_query_comic_tag_matchers_restricted_namespace(query_comic_tags):
+ ns_foo = Namespace(id=1, name="foo")
+ ns_bar = Namespace(id=2, name="bar")
+ tag_qoo = Tag(id=1, name="qoo", namespaces=[ns_bar])
+ tag_qar = Tag(id=2, name="qar", namespaces=[ns_foo])
+
+ await DB.add_all(ns_foo, ns_bar, tag_qoo, tag_qar)
+
+ builder = partial(build_item)
+
+ response = Response(await query_comic_tags(for_filter=True))
+ assert response.data["edges"] == [
+ builder(ns_bar, None),
+ builder(ns_foo, None),
+ builder(None, tag_qar),
+ builder(None, tag_qoo),
+ builder(ns_bar, tag_qoo),
+ builder(ns_foo, tag_qar),
+ ]
diff --git a/tests/api/test_db.py b/tests/api/test_db.py
new file mode 100644
index 0000000..f53b90f
--- /dev/null
+++ b/tests/api/test_db.py
@@ -0,0 +1,324 @@
+from datetime import datetime, timedelta, timezone
+
+import hircine.db as database
+import hircine.db.models as models
+import hircine.db.ops as ops
+import pytest
+from conftest import DB
+from hircine.db.models import (
+ Artist,
+ Base,
+ Comic,
+ ComicTag,
+ DateTimeUTC,
+ MixinID,
+ Namespace,
+ Tag,
+ TagNamespaces,
+)
+from sqlalchemy.exc import StatementError
+from sqlalchemy.orm import (
+ Mapped,
+ mapped_column,
+)
+
+
+class Date(MixinID, Base):
+ date: Mapped[datetime] = mapped_column(DateTimeUTC)
+
+
+@pytest.mark.anyio
+async def test_db_requires_tzinfo():
+ with pytest.raises(StatementError, match="tzinfo is required"):
+ await DB.add(Date(date=datetime(2019, 4, 22)))
+
+
+@pytest.mark.anyio
+async def test_db_converts_date_input_to_utc():
+ date = datetime(2019, 4, 22, tzinfo=timezone(timedelta(hours=-4)))
+ await DB.add(Date(date=date))
+
+ item = await DB.get(Date, 1)
+
+ assert item.date.tzinfo == timezone.utc
+ assert item.date == date
+
+
+@pytest.mark.parametrize(
+ "modelcls,assoccls",
+ [
+ (models.Artist, models.ComicArtist),
+ (models.Circle, models.ComicCircle),
+ (models.Character, models.ComicCharacter),
+ (models.World, models.ComicWorld),
+ ],
+ ids=["artists", "circles", "characters", "worlds"],
+)
+@pytest.mark.anyio
+async def test_models_retained_when_clearing_association(
+ empty_comic, modelcls, assoccls
+):
+ model = modelcls(id=1, name="foo")
+ key = f"{modelcls.__name__.lower()}s"
+
+ comic = empty_comic
+ setattr(comic, key, [model])
+ comic = await DB.add(comic)
+
+ async with database.session() as s:
+ object = await s.get(Comic, comic.id)
+ setattr(object, key, [])
+ await s.commit()
+
+ assert await DB.get(assoccls, (comic.id, model.id)) is None
+ assert await DB.get(Comic, comic.id) is not None
+ assert await DB.get(modelcls, model.id) is not None
+
+
+@pytest.mark.anyio
+async def test_models_retained_when_clearing_comictag(empty_comic):
+ comic = await DB.add(empty_comic)
+
+ namespace = Namespace(id=1, name="foo")
+ tag = Tag(id=1, name="bar")
+ ct = ComicTag(comic_id=comic.id, namespace=namespace, tag=tag)
+
+ await DB.add(ct)
+
+ async with database.session() as s:
+ object = await s.get(Comic, comic.id)
+ object.tags = []
+ await s.commit()
+
+ assert await DB.get(ComicTag, (comic.id, ct.namespace_id, ct.tag_id)) is None
+ assert await DB.get(Namespace, namespace.id) is not None
+ assert await DB.get(Tag, tag.id) is not None
+ assert await DB.get(Comic, comic.id) is not None
+
+
+@pytest.mark.parametrize(
+ "modelcls,assoccls",
+ [
+ (models.Artist, models.ComicArtist),
+ (models.Circle, models.ComicCircle),
+ (models.Character, models.ComicCharacter),
+ (models.World, models.ComicWorld),
+ ],
+ ids=["artists", "circles", "characters", "worlds"],
+)
+@pytest.mark.anyio
+async def test_only_association_cleared_when_deleting(empty_comic, modelcls, assoccls):
+ model = modelcls(id=1, name="foo")
+
+ comic = empty_comic
+ setattr(comic, f"{modelcls.__name__.lower()}s", [model])
+ comic = await DB.add(comic)
+
+ await DB.delete(modelcls, model.id)
+ assert await DB.get(assoccls, (comic.id, model.id)) is None
+ assert await DB.get(Comic, comic.id) is not None
+
+
+@pytest.mark.parametrize(
+ "deleted",
+ [
+ "namespace",
+ "tag",
+ ],
+)
+@pytest.mark.anyio
+async def test_only_comictag_association_cleared_when_deleting(empty_comic, deleted):
+ comic = await DB.add(empty_comic)
+
+ namespace = Namespace(id=1, name="foo")
+ tag = Tag(id=1, name="bar")
+
+ await DB.add(ComicTag(comic_id=comic.id, namespace=namespace, tag=tag))
+
+ if deleted == "namespace":
+ await DB.delete(Namespace, namespace.id)
+ elif deleted == "tag":
+ await DB.delete(Tag, tag.id)
+
+ assert await DB.get(ComicTag, (comic.id, namespace.id, tag.id)) is None
+ if deleted == "namespace":
+ assert await DB.get(Tag, tag.id) is not None
+ elif deleted == "tag":
+ assert await DB.get(Namespace, namespace.id) is not None
+ assert await DB.get(Comic, comic.id) is not None
+
+
+@pytest.mark.parametrize(
+ "modelcls,assoccls",
+ [
+ (models.Artist, models.ComicArtist),
+ (models.Circle, models.ComicCircle),
+ (models.Character, models.ComicCharacter),
+ (models.World, models.ComicWorld),
+ ],
+ ids=["artists", "circles", "characters", "worlds"],
+)
+@pytest.mark.anyio
+async def test_deleting_comic_only_clears_association(empty_comic, modelcls, assoccls):
+ model = modelcls(id=1, name="foo")
+
+ comic = empty_comic
+ setattr(comic, f"{modelcls.__name__.lower()}s", [model])
+ comic = await DB.add(comic)
+
+ await DB.delete(Comic, comic.id)
+ assert await DB.get(assoccls, (comic.id, model.id)) is None
+ assert await DB.get(modelcls, model.id) is not None
+
+
+@pytest.mark.anyio
+async def test_deleting_comic_only_clears_comictag(empty_comic):
+ comic = await DB.add(empty_comic)
+
+ namespace = Namespace(id=1, name="foo")
+ tag = Tag(id=1, name="bar")
+
+ await DB.add(ComicTag(comic_id=comic.id, namespace=namespace, tag=tag))
+ await DB.delete(Comic, comic.id)
+
+ assert await DB.get(ComicTag, (comic.id, namespace.id, tag.id)) is None
+ assert await DB.get(Tag, tag.id) is not None
+ assert await DB.get(Namespace, namespace.id) is not None
+
+
+@pytest.mark.anyio
+async def test_models_retained_when_clearing_tagnamespace():
+ namespace = Namespace(id=1, name="foo")
+ tag = Tag(id=1, name="foo", namespaces=[namespace])
+
+ tag = await DB.add(tag)
+
+ async with database.session() as s:
+ db_tag = await s.get(Tag, tag.id, options=Tag.load_full())
+ db_tag.namespaces = []
+ await s.commit()
+
+ assert await DB.get(TagNamespaces, (namespace.id, tag.id)) is None
+ assert await DB.get(Namespace, namespace.id) is not None
+ assert await DB.get(Tag, tag.id) is not None
+
+
+@pytest.mark.anyio
+async def test_only_tagnamespace_cleared_when_deleting_tag():
+ namespace = Namespace(id=1, name="foo")
+ tag = Tag(id=1, name="foo", namespaces=[namespace])
+
+ tag = await DB.add(tag)
+
+ await DB.delete(Tag, tag.id)
+
+ assert await DB.get(TagNamespaces, (namespace.id, tag.id)) is None
+ assert await DB.get(Namespace, namespace.id) is not None
+ assert await DB.get(Tag, tag.id) is None
+
+
+@pytest.mark.anyio
+async def test_only_tagnamespace_cleared_when_deleting_namespace():
+ namespace = Namespace(id=1, name="foo")
+ tag = Tag(id=1, name="foo", namespaces=[namespace])
+
+ tag = await DB.add(tag)
+
+ await DB.delete(Namespace, namespace.id)
+
+ assert await DB.get(TagNamespaces, (namespace.id, tag.id)) is None
+ assert await DB.get(Namespace, namespace.id) is None
+ assert await DB.get(Tag, tag.id) is not None
+
+
+@pytest.mark.parametrize(
+ "use_identity_map",
+ [False, True],
+ ids=["without identity lookup", "with identity lookup"],
+)
+@pytest.mark.anyio
+async def test_ops_get_all(gen_artist, use_identity_map):
+ artist = await DB.add(next(gen_artist))
+ have = list(await DB.add_all(*gen_artist))
+ have.append(artist)
+
+ missing_ids = [10, 20]
+
+ async with database.session() as s:
+ if use_identity_map:
+ s.add(artist)
+
+ artists, missing = await ops.get_all(
+ s,
+ Artist,
+ [a.id for a in have] + missing_ids,
+ use_identity_map=use_identity_map,
+ )
+
+ assert set([a.id for a in artists]) == set([a.id for a in have])
+ assert missing == set(missing_ids)
+
+
+@pytest.mark.anyio
+async def test_ops_get_all_names(gen_artist):
+ have = await DB.add_all(*gen_artist)
+ missing_names = ["arty", "farty"]
+
+ async with database.session() as s:
+ artists, missing = await ops.get_all_names(
+ s, Artist, [a.name for a in have] + missing_names
+ )
+
+ assert set([a.name for a in artists]) == set([a.name for a in have])
+ assert missing == set(missing_names)
+
+
+@pytest.mark.parametrize(
+ "missing",
+ [[("foo", "bar"), ("qux", "qaz")], []],
+ ids=["missing", "no missing"],
+)
+@pytest.mark.anyio
+async def test_ops_get_ctag_names(gen_comic, gen_tag, gen_namespace, missing):
+ comic = await DB.add(next(gen_comic))
+ have = [(ct.namespace.name, ct.tag.name) for ct in comic.tags]
+
+ async with database.session() as s:
+ cts, missing = await ops.get_ctag_names(s, comic.id, have + missing)
+
+ assert set(have) == set([(ct.namespace.name, ct.tag.name) for ct in cts])
+ assert missing == set(missing)
+
+
+@pytest.mark.anyio
+async def test_ops_lookup_identity(gen_artist):
+ one = await DB.add(next(gen_artist))
+ two = await DB.add(next(gen_artist))
+ rest = await DB.add_all(*gen_artist)
+
+ async with database.session() as s:
+ get_one = await s.get(Artist, one.id)
+ get_two = await s.get(Artist, two.id)
+ s.add(get_one, get_two)
+
+ artists, satisfied = ops.lookup_identity(
+ s, Artist, [a.id for a in [one, two] + list(rest)]
+ )
+
+ assert set([a.name for a in artists]) == set([a.name for a in [one, two]])
+ assert satisfied == set([one.id, two.id])
+
+
+@pytest.mark.anyio
+async def test_ops_get_image_orphans(gen_archive, gen_image):
+ await DB.add(next(gen_archive))
+
+ orphan_one = await DB.add(next(gen_image))
+ orphan_two = await DB.add(next(gen_image))
+
+ async with database.session() as s:
+ orphans = set(await ops.get_image_orphans(s))
+
+ assert orphans == set(
+ [(orphan_one.id, orphan_one.hash), (orphan_two.id, orphan_two.hash)]
+ )
diff --git a/tests/api/test_filter.py b/tests/api/test_filter.py
new file mode 100644
index 0000000..67a953f
--- /dev/null
+++ b/tests/api/test_filter.py
@@ -0,0 +1,521 @@
+import pytest
+from conftest import DB, Response
+from hircine.db.models import Namespace, Tag
+
+
+@pytest.fixture
+def query_comic_filter(execute_filter):
+ query = """
+ query comics($filter: ComicFilterInput) {
+ comics(filter: $filter) {
+ __typename
+ count
+ edges {
+ id
+ title
+ }
+ }
+ }
+ """
+
+ return execute_filter(query)
+
+
+@pytest.fixture
+def query_string_filter(execute_filter):
+ query = """
+ query artists($filter: ArtistFilterInput) {
+ artists(filter: $filter) {
+ __typename
+ count
+ edges {
+ id
+ name
+ }
+ }
+ }
+ """
+
+ return execute_filter(query)
+
+
+@pytest.fixture
+def query_tag_filter(execute_filter):
+ query = """
+ query tags($filter: TagFilterInput) {
+ tags(filter: $filter) {
+ __typename
+ count
+ edges {
+ id
+ name
+ }
+ }
+ }
+ """
+
+ return execute_filter(query)
+
+
+def id_list(edges):
+ return sorted([int(edge["id"]) for edge in edges])
+
+
+@pytest.mark.parametrize(
+ "filter,ids",
+ [
+ (
+ {"include": {"name": {"contains": "robin"}}},
+ [3, 4],
+ ),
+ ({"exclude": {"name": {"contains": "smith"}}}, [2, 3]),
+ (
+ {
+ "exclude": {"name": {"contains": "robin"}},
+ "include": {"name": {"contains": "smith"}},
+ },
+ [1],
+ ),
+ ],
+ ids=[
+ "includes",
+ "excludes",
+ "includes and excludes",
+ ],
+)
+@pytest.mark.anyio
+async def test_string_filter(query_string_filter, gen_artist, filter, ids):
+ await DB.add_all(*gen_artist)
+
+ response = Response(await query_string_filter(filter))
+ response.assert_is("ArtistFilterResult")
+
+ assert id_list(response.edges) == ids
+
+
+@pytest.mark.parametrize(
+ "filter,empty_response",
+ [
+ ({"include": {"name": {"contains": ""}}}, False),
+ ({"include": {"name": {}}}, False),
+ ({"exclude": {"name": {"contains": ""}}}, True),
+ ({"exclude": {"name": {}}}, False),
+ ],
+ ids=[
+ "string (include)",
+ "field (include)",
+ "string (exclude)",
+ "field (exclude)",
+ ],
+)
+@pytest.mark.anyio
+async def test_string_filter_handles_empty(
+ query_string_filter, gen_artist, filter, empty_response
+):
+ artists = await DB.add_all(*gen_artist)
+
+ response = Response(await query_string_filter(filter))
+ response.assert_is("ArtistFilterResult")
+
+ if empty_response:
+ assert response.edges == []
+ assert response.count == 0
+ else:
+ assert len(response.edges) == len(artists)
+ assert response.count == len(artists)
+
+
+@pytest.mark.parametrize(
+ "filter",
+ [
+ {"include": {}},
+ {"exclude": {}},
+ ],
+ ids=[
+ "include",
+ "exclude",
+ ],
+)
+@pytest.mark.anyio
+async def test_filter_handles_empty_field(query_string_filter, gen_artist, filter):
+ artists = await DB.add_all(*gen_artist)
+
+ response = Response(await query_string_filter(filter))
+ response.assert_is("ArtistFilterResult")
+
+ assert len(response.edges) == len(artists)
+ assert response.count == len(artists)
+
+
+@pytest.mark.parametrize(
+ "filter,ids",
+ [
+ (
+ {"include": {"artists": {"any": 1}}},
+ [1, 3],
+ ),
+ (
+ {"include": {"artists": {"all": 1}}},
+ [1, 3],
+ ),
+ (
+ {"include": {"artists": {"any": [1, 4]}}},
+ [1, 3, 4],
+ ),
+ (
+ {"include": {"artists": {"all": [1, 4]}}},
+ [3],
+ ),
+ (
+ {"exclude": {"artists": {"any": 1}}},
+ [2, 4],
+ ),
+ (
+ {"exclude": {"artists": {"all": 1}}},
+ [2, 4],
+ ),
+ (
+ {"exclude": {"artists": {"any": [1, 4]}}},
+ [2],
+ ),
+ (
+ {"exclude": {"artists": {"all": [1, 4]}}},
+ [1, 2, 4],
+ ),
+ (
+ {
+ "include": {"artists": {"any": [1]}},
+ "exclude": {"artists": {"all": [4]}},
+ },
+ [1],
+ ),
+ (
+ {
+ "include": {"artists": {"any": [1, 4]}},
+ "exclude": {"artists": {"all": [1, 4]}},
+ },
+ [1, 4],
+ ),
+ (
+ {
+ "include": {"artists": {"any": [1, 4], "all": [1, 2]}},
+ },
+ [1],
+ ),
+ ],
+ ids=[
+ "includes any (single)",
+ "includes all (single)",
+ "includes any (list)",
+ "includes all (list)",
+ "excludes any (single)",
+ "excludes all (single)",
+ "excludes any (list)",
+ "excludes all (list)",
+ "includes and excludes (single)",
+ "includes and excludes (list)",
+ "includes any and all",
+ ],
+)
+@pytest.mark.anyio
+async def test_assoc_filter(query_comic_filter, gen_comic, filter, ids):
+ await DB.add_all(*gen_comic)
+
+ response = Response(await query_comic_filter(filter))
+ response.assert_is("ComicFilterResult")
+
+ assert id_list(response.edges) == ids
+
+
+@pytest.mark.parametrize(
+ "filter,empty_response",
+ [
+ ({"include": {"artists": {"any": []}}}, True),
+ ({"include": {"artists": {"all": []}}}, True),
+ ({"include": {"artists": {}}}, False),
+ ({"exclude": {"artists": {"any": []}}}, False),
+ ({"exclude": {"artists": {"all": []}}}, False),
+ ({"exclude": {"artists": {}}}, False),
+ ({"include": {"tags": {"any": ""}}}, True),
+ ({"include": {"tags": {"any": ":"}}}, True),
+ ],
+ ids=[
+ "list (include any)",
+ "list (include all)",
+ "field (include)",
+ "list (exclude any)",
+ "list (exclude all)",
+ "field (exclude)",
+ "string (tags)",
+ "specifier (tags)",
+ ],
+)
+@pytest.mark.anyio
+async def test_assoc_filter_handles_empty(
+ query_comic_filter, gen_comic, filter, empty_response
+):
+ comics = await DB.add_all(*gen_comic)
+
+ response = Response(await query_comic_filter(filter))
+
+ response.assert_is("ComicFilterResult")
+
+ if empty_response:
+ assert response.edges == []
+ assert response.count == 0
+ else:
+ assert len(response.edges) == len(comics)
+ assert response.count == len(comics)
+
+
+@pytest.mark.parametrize(
+ "filter,ids",
+ [
+ (
+ {"include": {"tags": {"any": "1:"}}},
+ [1, 2, 3],
+ ),
+ (
+ {"include": {"tags": {"any": ":2"}}},
+ [1, 4],
+ ),
+ (
+ {"include": {"tags": {"exact": ["1:3", "2:1"]}}},
+ [2],
+ ),
+ (
+ {"include": {"tags": {"exact": ["1:"]}}},
+ [3],
+ ),
+ (
+ {"include": {"tags": {"exact": [":4"]}}},
+ [3],
+ ),
+ (
+ {"exclude": {"tags": {"all": ["1:4", "1:1"]}}},
+ [2, 3, 4],
+ ),
+ (
+ {"exclude": {"tags": {"exact": ["2:1", "2:2", "2:3"]}}},
+ [1, 2, 3],
+ ),
+ (
+ {"exclude": {"tags": {"exact": ["1:"]}}},
+ [1, 2, 4],
+ ),
+ (
+ {"exclude": {"tags": {"exact": [":4"]}}},
+ [1, 2, 4],
+ ),
+ ],
+ ids=[
+ "includes any namespace",
+ "includes any tag",
+ "includes exact tags",
+ "includes exact namespace",
+ "includes exact tag",
+ "excludes all tags",
+ "includes exact tags",
+ "includes exact namespace",
+ "includes exact tag",
+ ],
+)
+@pytest.mark.anyio
+async def test_assoc_tag_filter(query_comic_filter, gen_comic, filter, ids):
+ await DB.add_all(*gen_comic)
+
+ response = Response(await query_comic_filter(filter))
+ response.assert_is("ComicFilterResult")
+
+ assert id_list(response.edges) == ids
+
+
+@pytest.mark.parametrize(
+ "filter,ids",
+ [
+ (
+ {"include": {"favourite": True}},
+ [1],
+ ),
+ (
+ {"include": {"rating": {"any": "EXPLICIT"}}},
+ [3],
+ ),
+ (
+ {"include": {"category": {"any": "MANGA"}}},
+ [1, 2],
+ ),
+ (
+ {"include": {"censorship": {"any": "MOSAIC"}}},
+ [3],
+ ),
+ (
+ {"exclude": {"favourite": True}},
+ [2, 3, 4],
+ ),
+ (
+ {"exclude": {"rating": {"any": ["EXPLICIT", "QUESTIONABLE"]}}},
+ [1, 4],
+ ),
+ ],
+ ids=[
+ "includes favourite",
+ "includes rating",
+ "includes category",
+ "includes censorship",
+ "excludes favourite",
+ "excludes ratings",
+ ],
+)
+@pytest.mark.anyio
+async def test_field_filter(query_comic_filter, gen_comic, filter, ids):
+ await DB.add_all(*gen_comic)
+
+ response = Response(await query_comic_filter(filter))
+ response.assert_is("ComicFilterResult")
+
+ assert id_list(response.edges) == ids
+
+
+@pytest.mark.parametrize(
+ "filter,ids",
+ [
+ (
+ {"include": {"rating": {"empty": True}}},
+ [100],
+ ),
+ (
+ {"include": {"rating": {"empty": False}}},
+ [1, 2],
+ ),
+ (
+ {"exclude": {"rating": {"empty": True}}},
+ [1, 2],
+ ),
+ (
+ {"exclude": {"rating": {"empty": False}}},
+ [100],
+ ),
+ ],
+ ids=[
+ "includes rating empty",
+ "includes rating not empty",
+ "excludes rating empty",
+ "excludes rating not empty",
+ ],
+)
+@pytest.mark.anyio
+async def test_field_presence(query_comic_filter, gen_comic, empty_comic, filter, ids):
+ await DB.add(next(gen_comic))
+ await DB.add(next(gen_comic))
+ await DB.add(empty_comic)
+
+ response = Response(await query_comic_filter(filter))
+ response.assert_is("ComicFilterResult")
+
+ assert id_list(response.edges) == ids
+
+
+@pytest.mark.parametrize(
+ "filter,ids",
+ [
+ (
+ {"include": {"artists": {"empty": True}}},
+ [100],
+ ),
+ (
+ {"include": {"artists": {"empty": False}}},
+ [1, 2],
+ ),
+ (
+ {"exclude": {"artists": {"empty": True}}},
+ [1, 2],
+ ),
+ (
+ {"exclude": {"artists": {"empty": False}}},
+ [100],
+ ),
+ (
+ {"include": {"tags": {"empty": True}}},
+ [100],
+ ),
+ (
+ {"include": {"tags": {"empty": False}}},
+ [1, 2],
+ ),
+ (
+ {"exclude": {"tags": {"empty": True}}},
+ [1, 2],
+ ),
+ (
+ {"exclude": {"tags": {"empty": False}}},
+ [100],
+ ),
+ ],
+ ids=[
+ "includes artist empty",
+ "includes artist not empty",
+ "excludes artist empty",
+ "excludes artist not empty",
+ "includes tags empty",
+ "includes tags not empty",
+ "excludes tags empty",
+ "excludes tags not empty",
+ ],
+)
+@pytest.mark.anyio
+async def test_assoc_presence(query_comic_filter, gen_comic, empty_comic, filter, ids):
+ await DB.add(next(gen_comic))
+ await DB.add(next(gen_comic))
+ await DB.add(empty_comic)
+
+ response = Response(await query_comic_filter(filter))
+ response.assert_is("ComicFilterResult")
+
+ assert id_list(response.edges) == ids
+
+
+@pytest.mark.parametrize(
+ "filter,ids",
+ [
+ (
+ {"include": {"namespaces": {"any": 1}}},
+ [1, 2],
+ ),
+ (
+ {"include": {"namespaces": {"all": [1, 2]}}},
+ [2],
+ ),
+ (
+ {"include": {"namespaces": {"exact": [1]}}},
+ [1],
+ ),
+ (
+ {"exclude": {"namespaces": {"any": 2}}},
+ [1],
+ ),
+ (
+ {"exclude": {"namespaces": {"exact": [1]}}},
+ [2],
+ ),
+ ],
+ ids=[
+ "includes any namespace",
+ "includes all namespace",
+ "includes exact namespaces",
+ "excludes any namespace",
+ "excludes exact namespaces",
+ ],
+)
+@pytest.mark.anyio
+async def test_tag_assoc_filter(query_tag_filter, gen_namespace, gen_tag, filter, ids):
+ foo = await DB.add(Namespace(id=1, name="foo"))
+ bar = await DB.add(Namespace(id=2, name="bar"))
+
+ await DB.add(Tag(id=1, name="small", namespaces=[foo]))
+ await DB.add(Tag(id=2, name="large", namespaces=[foo, bar]))
+
+ response = Response(await query_tag_filter(filter))
+ response.assert_is("TagFilterResult")
+
+ assert id_list(response.edges) == ids
diff --git a/tests/api/test_image.py b/tests/api/test_image.py
new file mode 100644
index 0000000..c8c26b3
--- /dev/null
+++ b/tests/api/test_image.py
@@ -0,0 +1,16 @@
+import pytest
+from conftest import DB
+from hircine.api.types import Image
+
+
+@pytest.mark.anyio
+async def test_image(gen_image):
+ images = await DB.add_all(*gen_image)
+
+ for db_image in images:
+ image = Image(db_image)
+ assert image.id == db_image.id
+ assert image.hash == db_image.hash
+ assert image.width == db_image.width
+ assert image.height == db_image.height
+ assert image.aspect_ratio == db_image.width / db_image.height
diff --git a/tests/api/test_namespace.py b/tests/api/test_namespace.py
new file mode 100644
index 0000000..450075b
--- /dev/null
+++ b/tests/api/test_namespace.py
@@ -0,0 +1,291 @@
+from datetime import datetime as dt
+from datetime import timezone
+
+import pytest
+from conftest import DB, Response
+from hircine.db.models import Namespace
+
+
+@pytest.fixture
+def query_namespace(execute_id):
+ query = """
+ query namespace($id: Int!) {
+ namespace(id: $id) {
+ __typename
+ ... on Namespace {
+ id
+ name
+ sortName
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_id(query)
+
+
+@pytest.fixture
+def query_namespaces(execute):
+ query = """
+ query namespaces {
+ namespaces {
+ __typename
+ count
+ edges {
+ id
+ name
+ }
+ }
+ }
+ """
+
+ return execute(query)
+
+
+@pytest.fixture
+def add_namespace(execute_add):
+ mutation = """
+ mutation addNamespace($input: AddNamespaceInput!) {
+ addNamespace(input: $input) {
+ __typename
+ ... on AddSuccess {
+ id
+ }
+ ... on Error {
+ message
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_add(mutation)
+
+
+@pytest.fixture
+def update_namespaces(execute_update):
+ mutation = """
+ mutation updateNamespaces($ids: [Int!]!, $input: UpdateNamespaceInput!) {
+ updateNamespaces(ids: $ids, input: $input) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """ # noqa: E501
+
+ return execute_update(mutation)
+
+
+@pytest.fixture
+def delete_namespaces(execute_delete):
+ mutation = """
+ mutation deleteNamespaces($ids: [Int!]!) {
+ deleteNamespaces(ids: $ids) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_delete(mutation)
+
+
+@pytest.mark.anyio
+async def test_query_namespace(query_namespace, gen_namespace):
+ namespace = await DB.add(next(gen_namespace))
+
+ response = Response(await query_namespace(namespace.id))
+ response.assert_is("Namespace")
+
+ assert response.id == namespace.id
+ assert response.name == namespace.name
+ assert response.sortName == namespace.sort_name
+
+
+@pytest.mark.anyio
+async def test_query_namespace_fails_not_found(query_namespace):
+ response = Response(await query_namespace(1))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Namespace ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_query_namespaces(query_namespaces, gen_namespace):
+ namespaces = await DB.add_all(*gen_namespace)
+ response = Response(await query_namespaces())
+ response.assert_is("NamespaceFilterResult")
+
+ assert response.count == len(namespaces)
+ assert isinstance((response.edges), list)
+ assert len(response.edges) == len(namespaces)
+
+ edges = iter(response.edges)
+ for namespace in sorted(namespaces, key=lambda a: a.name):
+ edge = next(edges)
+ assert edge["id"] == namespace.id
+ assert edge["name"] == namespace.name
+
+
+@pytest.mark.anyio
+async def test_add_namespace(add_namespace):
+ response = Response(await add_namespace({"name": "added", "sortName": "foo"}))
+ response.assert_is("AddSuccess")
+
+ namespace = await DB.get(Namespace, response.id)
+ assert namespace is not None
+ assert namespace.name == "added"
+ assert namespace.sort_name == "foo"
+
+
+@pytest.mark.anyio
+async def test_add_namespace_fails_empty_parameter(add_namespace):
+ response = Response(await add_namespace({"name": ""}))
+
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "name"
+ assert response.message == "Invalid parameter 'name': cannot be empty"
+
+
+@pytest.mark.anyio
+async def test_add_namespace_fails_exists(add_namespace, gen_namespace):
+ namespace = await DB.add(next(gen_namespace))
+
+ response = Response(await add_namespace({"name": namespace.name}))
+ response.assert_is("NameExistsError")
+ assert response.message == "Another Namespace with this name exists"
+
+
+@pytest.mark.anyio
+async def test_delete_namespace(delete_namespaces, gen_namespace):
+ namespace = await DB.add(next(gen_namespace))
+ id = namespace.id
+
+ response = Response(await delete_namespaces(id))
+ response.assert_is("DeleteSuccess")
+
+ namespace = await DB.get(Namespace, id)
+ assert namespace is None
+
+
+@pytest.mark.anyio
+async def test_delete_namespace_not_found(delete_namespaces):
+ response = Response(await delete_namespaces(1))
+
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Namespace ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_update_namespace(update_namespaces, gen_namespace):
+ namespace = await DB.add(next(gen_namespace))
+
+ input = {"name": "updated", "sortName": "foo"}
+ response = Response(await update_namespaces(namespace.id, input))
+ response.assert_is("UpdateSuccess")
+
+ namespace = await DB.get(Namespace, namespace.id)
+ assert namespace is not None
+ assert namespace.name == "updated"
+ assert namespace.sort_name == "foo"
+
+
+@pytest.mark.anyio
+async def test_update_namespace_fails_exists(update_namespaces, gen_namespace):
+ first = await DB.add(next(gen_namespace))
+ second = await DB.add(next(gen_namespace))
+
+ response = Response(await update_namespaces(second.id, {"name": first.name}))
+ response.assert_is("NameExistsError")
+ assert response.message == "Another Namespace with this name exists"
+
+
+@pytest.mark.anyio
+async def test_update_namespace_fails_not_found(update_namespaces):
+ response = Response(await update_namespaces(1, {"name": "updated"}))
+
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Namespace ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_update_namespaces_cannot_bulk_edit_name(
+ update_namespaces, gen_namespace
+):
+ first = await DB.add(next(gen_namespace))
+ second = await DB.add(next(gen_namespace))
+
+ response = Response(
+ await update_namespaces([first.id, second.id], {"name": "unique"})
+ )
+ response.assert_is("InvalidParameterError")
+
+
+@pytest.mark.parametrize(
+ "empty",
+ [
+ None,
+ "",
+ ],
+ ids=[
+ "none",
+ "empty string",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_namespace_fails_empty_parameter(
+ update_namespaces, gen_namespace, empty
+):
+ namespace = await DB.add(next(gen_namespace))
+ response = Response(await update_namespaces(namespace.id, {"name": empty}))
+
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "name"
+ assert response.message == "Invalid parameter 'name': cannot be empty"
+
+
+@pytest.mark.anyio
+async def test_update_namespace_changes_updated_at(update_namespaces):
+ original_namespace = Namespace(name="namespace")
+ original_namespace.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc)
+ original_namespace = await DB.add(original_namespace)
+
+ response = Response(
+ await update_namespaces(original_namespace.id, {"name": "updated"})
+ )
+ response.assert_is("UpdateSuccess")
+
+ namespace = await DB.get(Namespace, original_namespace.id)
+ assert namespace.updated_at > original_namespace.updated_at
diff --git a/tests/api/test_page.py b/tests/api/test_page.py
new file mode 100644
index 0000000..debd69a
--- /dev/null
+++ b/tests/api/test_page.py
@@ -0,0 +1,39 @@
+from datetime import datetime, timezone
+
+import pytest
+from conftest import DB
+from hircine.api.types import Page
+from hircine.db.models import Archive
+
+
+@pytest.mark.anyio
+async def test_page(gen_page):
+ pages = list(gen_page)
+ images = [p.image for p in pages]
+
+ # persist images and pages in database by binding them to a throwaway
+ # archive
+ archive = await DB.add(
+ Archive(
+ hash="339e3a32e5648fdeb2597f05cb2e1ef6",
+ path="some/archive.zip",
+ size=78631597,
+ mtime=datetime(1999, 12, 27, tzinfo=timezone.utc),
+ cover=images[0],
+ pages=pages,
+ page_count=len(pages),
+ )
+ )
+
+ assert len(archive.pages) == len(pages)
+
+ page_iter = iter(pages)
+ image_iter = iter(images)
+ for page in [Page(p) for p in archive.pages]:
+ matching_page = next(page_iter)
+ matching_image = next(image_iter)
+
+ assert page.id == matching_page.id
+ assert page.comic_id is None
+ assert page.image.id == matching_image.id
+ assert page.path == matching_page.path
diff --git a/tests/api/test_pagination.py b/tests/api/test_pagination.py
new file mode 100644
index 0000000..67854c3
--- /dev/null
+++ b/tests/api/test_pagination.py
@@ -0,0 +1,61 @@
+import pytest
+from conftest import DB, Response
+
+
+@pytest.fixture
+def query_pagination(schema_execute):
+ query = """
+ query artists($pagination: Pagination) {
+ artists(pagination: $pagination) {
+ __typename
+ count
+ edges {
+ id
+ }
+ }
+ }
+ """
+
+ async def _execute(pagination=None):
+ return await schema_execute(
+ query, {"pagination": pagination} if pagination else None
+ )
+
+ return _execute
+
+
+@pytest.mark.parametrize(
+ "pagination,count,length",
+ [
+ (None, 4, 4),
+ ({"items": 3, "page": 1}, 4, 3),
+ ({"items": 3, "page": 2}, 4, 1),
+ ({"items": 3, "page": 3}, 0, 0),
+ ({"items": 10, "page": 1}, 4, 4),
+ ({"items": 0, "page": 1}, 0, 0),
+ ({"items": 2, "page": 0}, 0, 0),
+ ({"items": -1, "page": 0}, 0, 0),
+ ({"items": 0, "page": -1}, 0, 0),
+ ],
+ ids=[
+ "is missing and lists all",
+ "lists first page",
+ "lists last page",
+ "lists none (no more items)",
+ "lists all",
+ "lists none (zero items)",
+ "lists none (page zero)",
+ "lists none (negative items)",
+ "lists none (negative page)",
+ ],
+)
+@pytest.mark.anyio
+async def test_pagination(query_pagination, gen_artist, pagination, count, length):
+ await DB.add_all(*gen_artist)
+
+ response = Response(await query_pagination(pagination))
+ response.assert_is("ArtistFilterResult")
+
+ assert response.count == count
+ assert isinstance((response.edges), list)
+ assert len(response.edges) == length
diff --git a/tests/api/test_scraper_api.py b/tests/api/test_scraper_api.py
new file mode 100644
index 0000000..1edd74f
--- /dev/null
+++ b/tests/api/test_scraper_api.py
@@ -0,0 +1,395 @@
+import hircine.enums as enums
+import hircine.plugins
+import hircine.scraper.types as scraped
+import pytest
+from conftest import DB, Response
+from hircine.scraper import ScrapeError, Scraper, ScrapeWarning
+
+
+@pytest.fixture
+def query_comic_scrapers(schema_execute):
+ query = """
+ query comicScrapers($id: Int!) {
+ comicScrapers(id: $id) {
+ __typename
+ id
+ name
+ }
+ }
+ """
+
+ async def _execute(id):
+ return await schema_execute(query, {"id": id})
+
+ return _execute
+
+
+@pytest.fixture
+def query_scrape_comic(schema_execute):
+ query = """
+ query scrapeComic($id: Int!, $scraper: String!) {
+ scrapeComic(id: $id, scraper: $scraper) {
+ __typename
+ ... on ScrapeComicResult {
+ data {
+ title
+ originalTitle
+ url
+ artists
+ category
+ censorship
+ characters
+ circles
+ date
+ direction
+ language
+ layout
+ rating
+ tags
+ worlds
+ }
+ warnings
+ }
+ ... on Error {
+ message
+ }
+ ... on ScraperNotFoundError {
+ name
+ }
+ ... on ScraperNotAvailableError {
+ scraper
+ comicId
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ async def _execute(id, scraper):
+ return await schema_execute(query, {"id": id, "scraper": scraper})
+
+ return _execute
+
+
+@pytest.fixture
+def scrapers(empty_plugins):
+ class GoodScraper(Scraper):
+ name = "Good Scraper"
+ is_available = True
+ source = "good"
+
+ def scrape(self):
+ yield scraped.Title("Arid Savannah Adventures")
+ yield scraped.OriginalTitle("Arid Savannah Hijinx")
+ yield scraped.URL("file:///home/savannah/adventures")
+ yield scraped.Language(enums.Language.EN)
+ yield scraped.Date.from_iso("2010-07-05")
+ yield scraped.Direction(enums.Direction["LEFT_TO_RIGHT"])
+ yield scraped.Layout(enums.Layout.SINGLE)
+ yield scraped.Rating(enums.Rating.SAFE)
+ yield scraped.Category(enums.Category.MANGA)
+ yield scraped.Censorship(enums.Censorship.NONE)
+ yield scraped.Tag.from_string("animal:small")
+ yield scraped.Tag.from_string("animal:medium")
+ yield scraped.Tag.from_string("animal:big")
+ yield scraped.Tag.from_string("animal:massive")
+ yield scraped.Artist("alan smithee")
+ yield scraped.Artist("david agnew")
+ yield scraped.Character("greta giraffe")
+ yield scraped.Character("bob bear")
+ yield scraped.Character("rico rhinoceros")
+ yield scraped.Character("ziggy zebra")
+ yield scraped.Circle("archimedes")
+ yield scraped.World("animal friends")
+
+ class DuplicateScraper(Scraper):
+ name = "Duplicate Scraper"
+ is_available = True
+ source = "dupe"
+
+ def gen(self):
+ yield scraped.Title("Arid Savannah Adventures")
+ yield scraped.OriginalTitle("Arid Savannah Hijinx")
+ yield scraped.URL("file:///home/savannah/adventures")
+ yield scraped.Language(enums.Language.EN)
+ yield scraped.Date.from_iso("2010-07-05")
+ yield scraped.Direction(enums.Direction["LEFT_TO_RIGHT"])
+ yield scraped.Layout(enums.Layout.SINGLE)
+ yield scraped.Rating(enums.Rating.SAFE)
+ yield scraped.Category(enums.Category.MANGA)
+ yield scraped.Censorship(enums.Censorship.NONE)
+ yield scraped.Tag.from_string("animal:small")
+ yield scraped.Tag.from_string("animal:medium")
+ yield scraped.Tag.from_string("animal:big")
+ yield scraped.Tag.from_string("animal:massive")
+ yield scraped.Artist("alan smithee")
+ yield scraped.Artist("david agnew")
+ yield scraped.Character("greta giraffe")
+ yield scraped.Character("bob bear")
+ yield scraped.Character("rico rhinoceros")
+ yield scraped.Character("ziggy zebra")
+ yield scraped.Circle("archimedes")
+ yield scraped.World("animal friends")
+
+ def scrape(self):
+ yield from self.gen()
+ yield from self.gen()
+
+ class WarnScraper(Scraper):
+ name = "Warn Scraper"
+ is_available = True
+ source = "warn"
+
+ def warn_on_purpose(self, item):
+ raise ScrapeWarning(f"Could not parse: {item}")
+
+ def scrape(self):
+ yield scraped.Title("Arid Savannah Adventures")
+ yield lambda: self.warn_on_purpose("Arid Savannah Hijinx")
+ yield scraped.Language(enums.Language.EN)
+
+ class FailScraper(Scraper):
+ name = "Fail Scraper"
+ is_available = True
+ source = "fail"
+
+ def scrape(self):
+ yield scraped.Title("Arid Savannah Adventures")
+ raise ScrapeError("Could not continue")
+ yield scraped.Language(enums.Language.EN)
+
+ class UnavailableScraper(Scraper):
+ name = "Unavailable Scraper"
+ is_available = False
+ source = "unavail"
+
+ def scrape(self):
+ yield None
+
+ hircine.plugins.register_scraper("good", GoodScraper)
+ hircine.plugins.register_scraper("dupe", DuplicateScraper)
+ hircine.plugins.register_scraper("warn", WarnScraper)
+ hircine.plugins.register_scraper("fail", FailScraper)
+ hircine.plugins.register_scraper("unavail", UnavailableScraper)
+
+ return [
+ ("good", GoodScraper),
+ ("dupe", DuplicateScraper),
+ ("warn", WarnScraper),
+ ("fail", FailScraper),
+ ("unavail", UnavailableScraper),
+ ]
+
+
+@pytest.mark.anyio
+async def test_comic_scrapers(gen_comic, query_comic_scrapers, scrapers):
+ comic = await DB.add(next(gen_comic))
+ response = Response(await query_comic_scrapers(comic.id))
+
+ assert isinstance((response.data), list)
+
+ available_scrapers = []
+ for name, cls in sorted(scrapers, key=lambda s: s[1].name):
+ instance = cls(comic)
+ if instance.is_available:
+ available_scrapers.append((name, cls))
+
+ assert len(response.data) == len(available_scrapers)
+
+ data = iter(response.data)
+ for id, scraper in available_scrapers:
+ field = next(data)
+ assert field["__typename"] == "ComicScraper"
+ assert field["id"] == id
+ assert field["name"] == scraper.name
+
+
+@pytest.mark.anyio
+async def test_comic_empty_for_missing_comic(gen_comic, query_comic_scrapers, scrapers):
+ response = Response(await query_comic_scrapers(1))
+
+ assert response.data == []
+
+
+@pytest.mark.anyio
+async def test_scrape_comic(gen_comic, query_scrape_comic, scrapers):
+ comic = await DB.add(next(gen_comic))
+
+ response = Response(await query_scrape_comic(comic.id, "good"))
+ response.assert_is("ScrapeComicResult")
+
+ assert response.warnings == []
+
+ scraped_comic = response.data["data"]
+
+ assert scraped_comic["title"] == "Arid Savannah Adventures"
+ assert scraped_comic["originalTitle"] == "Arid Savannah Hijinx"
+ assert scraped_comic["url"] == "file:///home/savannah/adventures"
+ assert scraped_comic["language"] == "EN"
+ assert scraped_comic["date"] == "2010-07-05"
+ assert scraped_comic["rating"] == "SAFE"
+ assert scraped_comic["category"] == "MANGA"
+ assert scraped_comic["direction"] == "LEFT_TO_RIGHT"
+ assert scraped_comic["layout"] == "SINGLE"
+ assert scraped_comic["tags"] == [
+ "animal:small",
+ "animal:medium",
+ "animal:big",
+ "animal:massive",
+ ]
+ assert scraped_comic["artists"] == ["alan smithee", "david agnew"]
+ assert scraped_comic["characters"] == [
+ "greta giraffe",
+ "bob bear",
+ "rico rhinoceros",
+ "ziggy zebra",
+ ]
+ assert scraped_comic["circles"] == ["archimedes"]
+ assert scraped_comic["worlds"] == ["animal friends"]
+
+
+@pytest.mark.anyio
+async def test_scrape_comic_removes_duplicates(gen_comic, query_scrape_comic, scrapers):
+ comic = await DB.add(next(gen_comic))
+
+ response = Response(await query_scrape_comic(comic.id, "dupe"))
+ response.assert_is("ScrapeComicResult")
+
+ assert response.warnings == []
+
+ scraped_comic = response.data["data"]
+
+ assert scraped_comic["title"] == "Arid Savannah Adventures"
+ assert scraped_comic["originalTitle"] == "Arid Savannah Hijinx"
+ assert scraped_comic["url"] == "file:///home/savannah/adventures"
+ assert scraped_comic["language"] == "EN"
+ assert scraped_comic["date"] == "2010-07-05"
+ assert scraped_comic["rating"] == "SAFE"
+ assert scraped_comic["category"] == "MANGA"
+ assert scraped_comic["direction"] == "LEFT_TO_RIGHT"
+ assert scraped_comic["layout"] == "SINGLE"
+ assert scraped_comic["tags"] == [
+ "animal:small",
+ "animal:medium",
+ "animal:big",
+ "animal:massive",
+ ]
+ assert scraped_comic["artists"] == ["alan smithee", "david agnew"]
+ assert scraped_comic["characters"] == [
+ "greta giraffe",
+ "bob bear",
+ "rico rhinoceros",
+ "ziggy zebra",
+ ]
+ assert scraped_comic["circles"] == ["archimedes"]
+ assert scraped_comic["worlds"] == ["animal friends"]
+
+
+@pytest.mark.anyio
+async def test_scrape_comic_fails_comic_not_found(query_scrape_comic, scrapers):
+ response = Response(await query_scrape_comic(1, "good"))
+ response.assert_is("IDNotFoundError")
+
+ assert response.id == 1
+ assert response.message == "Comic ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_scrape_comic_fails_scraper_not_found(
+ gen_comic, query_scrape_comic, scrapers
+):
+ comic = await DB.add(next(gen_comic))
+
+ response = Response(await query_scrape_comic(comic.id, "missing"))
+ response.assert_is("ScraperNotFoundError")
+
+ assert response.name == "missing"
+ assert response.message == "Scraper not found: 'missing'"
+
+
+@pytest.mark.anyio
+async def test_scrape_comic_fails_scraper_not_available(
+ gen_comic, query_scrape_comic, scrapers
+):
+ comic = await DB.add(next(gen_comic))
+
+ response = Response(await query_scrape_comic(comic.id, "unavail"))
+ response.assert_is("ScraperNotAvailableError")
+
+ assert response.scraper == "unavail"
+ assert response.comicId == comic.id
+ assert response.message == f"Scraper unavail not available for comic ID {comic.id}"
+
+
+async def test_scrape_comic_with_transformer(gen_comic, query_scrape_comic, scrapers):
+ def keep(generator, info):
+ for item in generator:
+ match item:
+ case scraped.Title():
+ yield item
+
+ hircine.plugins.transformers = [keep]
+
+ comic = await DB.add(next(gen_comic))
+
+ response = Response(await query_scrape_comic(comic.id, "good"))
+ response.assert_is("ScrapeComicResult")
+
+ assert response.warnings == []
+
+ scraped_comic = response.data["data"]
+
+ assert scraped_comic["title"] == "Arid Savannah Adventures"
+ assert scraped_comic["originalTitle"] is None
+ assert scraped_comic["url"] is None
+ assert scraped_comic["language"] is None
+ assert scraped_comic["date"] is None
+ assert scraped_comic["rating"] is None
+ assert scraped_comic["category"] is None
+ assert scraped_comic["censorship"] is None
+ assert scraped_comic["direction"] is None
+ assert scraped_comic["layout"] is None
+ assert scraped_comic["tags"] == []
+ assert scraped_comic["artists"] == []
+ assert scraped_comic["characters"] == []
+ assert scraped_comic["circles"] == []
+ assert scraped_comic["worlds"] == []
+
+
+@pytest.mark.anyio
+async def test_scrape_comic_catches_warnings(gen_comic, query_scrape_comic, scrapers):
+ comic = await DB.add(next(gen_comic))
+
+ response = Response(await query_scrape_comic(comic.id, "warn"))
+ response.assert_is("ScrapeComicResult")
+
+ assert response.warnings == ["Could not parse: Arid Savannah Hijinx"]
+
+ scraped_comic = response.data["data"]
+
+ assert scraped_comic["title"] == "Arid Savannah Adventures"
+ assert scraped_comic["originalTitle"] is None
+ assert scraped_comic["language"] == "EN"
+ assert scraped_comic["date"] is None
+ assert scraped_comic["rating"] is None
+ assert scraped_comic["category"] is None
+ assert scraped_comic["direction"] is None
+ assert scraped_comic["layout"] is None
+ assert scraped_comic["tags"] == []
+ assert scraped_comic["artists"] == []
+ assert scraped_comic["characters"] == []
+ assert scraped_comic["circles"] == []
+ assert scraped_comic["worlds"] == []
+
+
+@pytest.mark.anyio
+async def test_scrape_comic_fails_with_scraper_error(
+ gen_comic, query_scrape_comic, scrapers
+):
+ comic = await DB.add(next(gen_comic))
+
+ response = Response(await query_scrape_comic(comic.id, "fail"))
+ response.assert_is("ScraperError")
+ assert response.message == "Scraping failed: Could not continue"
diff --git a/tests/api/test_sort.py b/tests/api/test_sort.py
new file mode 100644
index 0000000..b3c8562
--- /dev/null
+++ b/tests/api/test_sort.py
@@ -0,0 +1,137 @@
+import pytest
+from conftest import DB, Response
+from hircine.db.models import Namespace
+
+
+@pytest.fixture
+def query_comic_sort(execute_sort):
+ query = """
+ query comics($sort: ComicSortInput) {
+ comics(sort: $sort) {
+ __typename
+ count
+ edges {
+ id
+ title
+ }
+ }
+ }
+ """
+
+ return execute_sort(query)
+
+
+@pytest.fixture
+def query_namespace_sort(execute_sort):
+ query = """
+ query namespaces($sort: NamespaceSortInput) {
+ namespaces(sort: $sort) {
+ __typename
+ count
+ edges {
+ id
+ name
+ }
+ }
+ }
+ """
+
+ return execute_sort(query)
+
+
+@pytest.mark.parametrize(
+ "sort,reverse",
+ [
+ ({"on": "DATE"}, False),
+ ({"on": "DATE", "direction": "DESCENDING"}, True),
+ ({"on": "DATE", "direction": "ASCENDING"}, False),
+ ],
+ ids=[
+ "ascending (default)",
+ "descending",
+ "ascending",
+ ],
+)
+@pytest.mark.anyio
+async def test_query_comics_sort_date(gen_comic, query_comic_sort, sort, reverse):
+ comics = await DB.add_all(*gen_comic)
+ ids = [c.id for c in sorted(comics, key=lambda c: c.date, reverse=reverse)]
+
+ response = Response(await query_comic_sort(sort))
+ response.assert_is("ComicFilterResult")
+
+ assert ids == [edge["id"] for edge in response.edges]
+
+
+@pytest.mark.parametrize(
+ "sort,reverse",
+ [
+ ({"on": "TAG_COUNT"}, False),
+ ({"on": "TAG_COUNT", "direction": "DESCENDING"}, True),
+ ({"on": "TAG_COUNT", "direction": "ASCENDING"}, False),
+ ],
+ ids=[
+ "ascending (default)",
+ "descending",
+ "ascending",
+ ],
+)
+@pytest.mark.anyio
+async def test_query_comics_sort_tag_count(gen_comic, query_comic_sort, sort, reverse):
+ comics = await DB.add_all(*gen_comic)
+ ids = [c.id for c in sorted(comics, key=lambda c: len(c.tags), reverse=reverse)]
+
+ response = Response(await query_comic_sort(sort))
+ response.assert_is("ComicFilterResult")
+
+ assert ids == [edge["id"] for edge in response.edges]
+
+
+@pytest.mark.anyio
+async def test_query_comics_sort_random(gen_comic, query_comic_sort):
+ comics = await DB.add_all(*gen_comic)
+ ids = set([c.id for c in comics])
+
+ response = Response(await query_comic_sort({"on": "RANDOM"}))
+ response.assert_is("ComicFilterResult")
+
+ assert ids == set(edge["id"] for edge in response.edges)
+
+
+@pytest.mark.anyio
+async def test_query_comics_sort_random_seed_direction(gen_comic, query_comic_sort):
+ comics = await DB.add_all(*gen_comic)
+ ids = set([c.id for c in comics])
+
+ response = Response(
+ await query_comic_sort(
+ {"on": "RANDOM", "seed": 42069, "direction": "ASCENDING"}
+ )
+ )
+ response.assert_is("ComicFilterResult")
+
+ ascending_ids = [edge["id"] for edge in response.edges]
+
+ assert ids == set(ascending_ids)
+
+ response = Response(
+ await query_comic_sort(
+ {"on": "RANDOM", "seed": 42069, "direction": "DESCENDING"}
+ )
+ )
+ response.assert_is("ComicFilterResult")
+
+ descending_ids = [edge["id"] for edge in response.edges]
+
+ assert ascending_ids == descending_ids[::-1]
+
+
+@pytest.mark.anyio
+async def test_query_namespace_sort_sort_name(query_namespace_sort):
+ await DB.add(Namespace(name="one", sort_name="2"))
+ await DB.add(Namespace(name="two", sort_name="1"))
+
+ response = Response(await query_namespace_sort({"on": "SORT_NAME"}))
+ response.assert_is("NamespaceFilterResult")
+
+ assert ["two", "one"] == [edge["name"] for edge in response.edges]
diff --git a/tests/api/test_tag.py b/tests/api/test_tag.py
new file mode 100644
index 0000000..c863a00
--- /dev/null
+++ b/tests/api/test_tag.py
@@ -0,0 +1,441 @@
+from datetime import datetime as dt
+from datetime import timezone
+
+import pytest
+from conftest import DB, Response
+from hircine.db.models import Namespace, Tag
+
+
+@pytest.fixture
+def query_tag(execute_id):
+ query = """
+ query tag($id: Int!) {
+ tag(id: $id) {
+ __typename
+ ... on FullTag {
+ id
+ name
+ description
+ namespaces {
+ __typename
+ id
+ }
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_id(query)
+
+
+@pytest.fixture
+def query_tags(execute):
+ query = """
+ query tags {
+ tags {
+ __typename
+ count
+ edges {
+ id
+ name
+ description
+ }
+ }
+ }
+ """
+
+ return execute(query)
+
+
+@pytest.fixture
+def add_tag(execute_add):
+ mutation = """
+ mutation addTag($input: AddTagInput!) {
+ addTag(input: $input) {
+ __typename
+ ... on AddSuccess {
+ id
+ }
+ ... on Error {
+ message
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_add(mutation)
+
+
+@pytest.fixture
+def update_tags(execute_update):
+ mutation = """
+ mutation updateTags($ids: [Int!]!, $input: UpdateTagInput!) {
+ updateTags(ids: $ids, input: $input) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """ # noqa: E501
+
+ return execute_update(mutation)
+
+
+@pytest.fixture
+def delete_tags(execute_delete):
+ mutation = """
+ mutation deleteTags($ids: [Int!]!) {
+ deleteTags(ids: $ids) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_delete(mutation)
+
+
+@pytest.mark.anyio
+async def test_query_tag(query_tag, gen_tag):
+ tag = await DB.add(next(gen_tag))
+
+ response = Response(await query_tag(tag.id))
+ response.assert_is("FullTag")
+
+ assert response.id == tag.id
+ assert response.name == tag.name
+ assert response.description == tag.description
+ assert set([n["id"] for n in response.namespaces]) == set(
+ [n.id for n in tag.namespaces]
+ )
+
+
+@pytest.mark.anyio
+async def test_query_tag_fails_not_found(query_tag):
+ response = Response(await query_tag(1))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Tag ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_query_tags(query_tags, gen_tag):
+ tags = await DB.add_all(*gen_tag)
+ response = Response(await query_tags())
+ response.assert_is("TagFilterResult")
+
+ assert response.count == len(tags)
+ assert isinstance((response.edges), list)
+ assert len(response.edges) == len(tags)
+
+ edges = iter(response.edges)
+ for tag in sorted(tags, key=lambda a: a.name):
+ edge = next(edges)
+ assert edge["id"] == tag.id
+ assert edge["name"] == tag.name
+ assert edge["description"] == tag.description
+
+
+@pytest.mark.anyio
+async def test_add_tag(add_tag):
+ response = Response(
+ await add_tag({"name": "added", "description": "it's been added!"})
+ )
+ response.assert_is("AddSuccess")
+
+ tag = await DB.get(Tag, response.id)
+ assert tag is not None
+ assert tag.name == "added"
+ assert tag.description == "it's been added!"
+
+
+@pytest.mark.anyio
+async def test_add_tag_with_namespace(add_tag):
+ namespace = await DB.add(Namespace(id=1, name="new"))
+
+ response = Response(await add_tag({"name": "added", "namespaces": {"ids": [1]}}))
+ response.assert_is("AddSuccess")
+
+ tag = await DB.get(Tag, response.id, full=True)
+ assert tag is not None
+ assert tag.name == "added"
+ assert tag.namespaces[0].id == namespace.id
+ assert tag.namespaces[0].name == namespace.name
+
+
+@pytest.mark.anyio
+async def test_add_tag_fails_empty_parameter(add_tag):
+ response = Response(await add_tag({"name": ""}))
+
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "name"
+ assert response.message == "Invalid parameter 'name': cannot be empty"
+
+
+@pytest.mark.anyio
+async def test_add_tag_fails_namespace_not_found(add_tag):
+ response = Response(await add_tag({"name": "added", "namespaces": {"ids": [1]}}))
+ response.assert_is("IDNotFoundError")
+
+ assert response.id == 1
+ assert response.message == "Namespace ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_add_tag_fails_exists(add_tag, gen_tag):
+ tag = await DB.add(next(gen_tag))
+
+ response = Response(await add_tag({"name": tag.name}))
+ response.assert_is("NameExistsError")
+ assert response.message == "Another Tag with this name exists"
+
+
+@pytest.mark.anyio
+async def test_delete_tag(delete_tags, gen_tag):
+ tag = await DB.add(next(gen_tag))
+ id = tag.id
+
+ response = Response(await delete_tags(id))
+ response.assert_is("DeleteSuccess")
+
+ tag = await DB.get(Tag, id)
+ assert tag is None
+
+
+@pytest.mark.anyio
+async def test_delete_tag_not_found(delete_tags):
+ response = Response(await delete_tags(1))
+
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Tag ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_update_tag(update_tags, gen_tag, gen_namespace):
+ tag = await DB.add(next(gen_tag))
+ namespace = await DB.add(next(gen_namespace))
+
+ input = {
+ "name": "updated",
+ "description": "how different, how unique",
+ "namespaces": {"ids": [1]},
+ }
+ response = Response(await update_tags(tag.id, input))
+ response.assert_is("UpdateSuccess")
+
+ tag = await DB.get(Tag, tag.id, full=True)
+ assert tag is not None
+ assert tag.name == "updated"
+ assert tag.description == "how different, how unique"
+ assert tag.namespaces[0].id == namespace.id
+ assert tag.namespaces[0].name == namespace.name
+
+
+@pytest.mark.parametrize(
+ "empty",
+ [
+ None,
+ "",
+ ],
+ ids=[
+ "with None",
+ "with empty string",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_tag_clears_description(update_tags, gen_tag, empty):
+ tag = await DB.add(next(gen_tag))
+
+ input = {
+ "description": empty,
+ }
+ response = Response(await update_tags(tag.id, input))
+ response.assert_is("UpdateSuccess")
+
+ tag = await DB.get(Tag, tag.id)
+ assert tag is not None
+ assert tag.description is None
+
+
+@pytest.mark.anyio
+async def test_update_tag_fails_exists(update_tags, gen_tag):
+ first = await DB.add(next(gen_tag))
+ second = await DB.add(next(gen_tag))
+
+ response = Response(await update_tags(second.id, {"name": first.name}))
+ response.assert_is("NameExistsError")
+ assert response.message == "Another Tag with this name exists"
+
+
+@pytest.mark.anyio
+async def test_update_tag_fails_not_found(update_tags):
+ response = Response(await update_tags(1, {"name": "updated"}))
+
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Tag ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_update_tags_cannot_bulk_edit_name(update_tags, gen_tag):
+ first = await DB.add(next(gen_tag))
+ second = await DB.add(next(gen_tag))
+
+ response = Response(await update_tags([first.id, second.id], {"name": "unique"}))
+ response.assert_is("InvalidParameterError")
+
+
+@pytest.mark.parametrize(
+ "empty",
+ [
+ None,
+ "",
+ ],
+ ids=[
+ "none",
+ "empty string",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_tag_fails_empty_parameter(update_tags, gen_tag, empty):
+ tag = await DB.add(next(gen_tag))
+ response = Response(await update_tags(tag.id, {"name": empty}))
+
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "name"
+ assert response.message == "Invalid parameter 'name': cannot be empty"
+
+
+@pytest.mark.anyio
+async def test_update_tag_fails_namespace_not_found(update_tags, gen_tag):
+ tag = await DB.add(next(gen_tag))
+ response = Response(await update_tags(tag.id, {"namespaces": {"ids": [1]}}))
+ response.assert_is("IDNotFoundError")
+
+ assert response.id == 1
+ assert response.message == "Namespace ID not found: '1'"
+
+
+@pytest.mark.parametrize(
+ "options",
+ [
+ None,
+ {},
+ {"mode": "REPLACE"},
+ ],
+ ids=[
+ "by default (none)",
+ "by default (empty record)",
+ "when defined explicitly",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_tag_replaces_assocs(update_tags, gen_tag, options):
+ original_tag = await DB.add(next(gen_tag))
+ new_namespace = await DB.add(Namespace(name="new"))
+
+ input = {
+ "namespaces": {"ids": [new_namespace.id]},
+ }
+ response = Response(await update_tags(original_tag.id, input))
+ response.assert_is("UpdateSuccess")
+
+ tag = await DB.get(Tag, original_tag.id, full=True)
+
+ assert set([o.id for o in tag.namespaces]) == set([o.id for o in [new_namespace]])
+
+
+@pytest.mark.anyio
+async def test_update_tag_adds_assocs(update_tags, gen_tag):
+ original_tag = await DB.add(next(gen_tag))
+ new_namespace = await DB.add(Namespace(name="new"))
+ added_namespaces = original_tag.namespaces + [new_namespace]
+
+ input = {
+ "namespaces": {"ids": [new_namespace.id], "options": {"mode": "ADD"}},
+ }
+ response = Response(await update_tags(original_tag.id, input))
+ response.assert_is("UpdateSuccess")
+
+ tag = await DB.get(Tag, original_tag.id, full=True)
+
+ assert set([o.id for o in tag.namespaces]) == set([o.id for o in added_namespaces])
+
+
+@pytest.mark.anyio
+async def test_update_tag_removes_assocs(update_tags):
+ removed_namespace = Namespace(id=1, name="new")
+ remaining_namespace = Namespace(id=2, name="newtwo")
+ original_tag = await DB.add(
+ Tag(id=1, name="tag", namespaces=[removed_namespace, remaining_namespace])
+ )
+
+ input = {
+ "namespaces": {"ids": [removed_namespace.id], "options": {"mode": "REMOVE"}},
+ }
+ response = Response(await update_tags(original_tag.id, input))
+ response.assert_is("UpdateSuccess")
+
+ tag = await DB.get(Tag, original_tag.id, full=True)
+
+ assert set([o.id for o in tag.namespaces]) == set([remaining_namespace.id])
+
+
+@pytest.mark.anyio
+async def test_update_tag_changes_updated_at(update_tags):
+ original_tag = Tag(name="tag")
+ original_tag.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc)
+ original_tag = await DB.add(original_tag)
+
+ response = Response(await update_tags(original_tag.id, {"name": "updated"}))
+ response.assert_is("UpdateSuccess")
+
+ tag = await DB.get(Tag, original_tag.id)
+ assert tag.updated_at > original_tag.updated_at
+
+
+@pytest.mark.anyio
+async def test_update_tag_assoc_changes_updated_at(update_tags):
+ original_tag = Tag(name="tag")
+ original_tag.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc)
+ original_tag = await DB.add(original_tag)
+ await DB.add(Namespace(id=1, name="namespace"))
+
+ response = Response(
+ await update_tags(original_tag.id, {"namespaces": {"ids": [1]}})
+ )
+ response.assert_is("UpdateSuccess")
+
+ tag = await DB.get(Tag, original_tag.id)
+ assert tag.updated_at > original_tag.updated_at
diff --git a/tests/api/test_world.py b/tests/api/test_world.py
new file mode 100644
index 0000000..a3926d1
--- /dev/null
+++ b/tests/api/test_world.py
@@ -0,0 +1,278 @@
+from datetime import datetime as dt
+from datetime import timezone
+
+import pytest
+from conftest import DB, Response
+from hircine.db.models import World
+
+
+@pytest.fixture
+def query_world(execute_id):
+ query = """
+ query world($id: Int!) {
+ world(id: $id) {
+ __typename
+ ... on World {
+ id
+ name
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_id(query)
+
+
+@pytest.fixture
+def query_worlds(execute):
+ query = """
+ query worlds {
+ worlds {
+ __typename
+ count
+ edges {
+ id
+ name
+ }
+ }
+ }
+ """
+
+ return execute(query)
+
+
+@pytest.fixture
+def add_world(execute_add):
+ mutation = """
+ mutation addWorld($input: AddWorldInput!) {
+ addWorld(input: $input) {
+ __typename
+ ... on AddSuccess {
+ id
+ }
+ ... on Error {
+ message
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """
+
+ return execute_add(mutation)
+
+
+@pytest.fixture
+def update_worlds(execute_update):
+ mutation = """
+ mutation updateWorlds($ids: [Int!]!, $input: UpdateWorldInput!) {
+ updateWorlds(ids: $ids, input: $input) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """ # noqa: E501
+
+ return execute_update(mutation)
+
+
+@pytest.fixture
+def delete_worlds(execute_delete):
+ mutation = """
+ mutation deleteWorlds($ids: [Int!]!) {
+ deleteWorlds(ids: $ids) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_delete(mutation)
+
+
+@pytest.mark.anyio
+async def test_query_world(query_world, gen_world):
+ world = await DB.add(next(gen_world))
+
+ response = Response(await query_world(world.id))
+ response.assert_is("World")
+
+ assert response.id == world.id
+ assert response.name == world.name
+
+
+@pytest.mark.anyio
+async def test_query_world_fails_not_found(query_world):
+ response = Response(await query_world(1))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "World ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_query_worlds(query_worlds, gen_world):
+ worlds = await DB.add_all(*gen_world)
+
+ response = Response(await query_worlds())
+ response.assert_is("WorldFilterResult")
+
+ assert response.count == len(worlds)
+ assert isinstance((response.edges), list)
+ assert len(response.edges) == len(worlds)
+
+ edges = iter(response.edges)
+ for world in sorted(worlds, key=lambda a: a.name):
+ edge = next(edges)
+ assert edge["id"] == world.id
+ assert edge["name"] == world.name
+
+
+@pytest.mark.anyio
+async def test_add_world(add_world):
+ response = Response(await add_world({"name": "added world"}))
+ response.assert_is("AddSuccess")
+
+ world = await DB.get(World, response.id)
+ assert world is not None
+ assert world.name == "added world"
+
+
+@pytest.mark.anyio
+async def test_add_world_fails_empty_parameter(add_world):
+ response = Response(await add_world({"name": ""}))
+
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "name"
+ assert response.message == "Invalid parameter 'name': cannot be empty"
+
+
+@pytest.mark.anyio
+async def test_add_world_fails_exists(add_world, gen_world):
+ world = await DB.add(next(gen_world))
+
+ response = Response(await add_world({"name": world.name}))
+ response.assert_is("NameExistsError")
+ assert response.message == "Another World with this name exists"
+
+
+@pytest.mark.anyio
+async def test_delete_world(delete_worlds, gen_world):
+ world = await DB.add(next(gen_world))
+ id = world.id
+
+ response = Response(await delete_worlds(id))
+ response.assert_is("DeleteSuccess")
+
+ world = await DB.get(World, id)
+ assert world is None
+
+
+@pytest.mark.anyio
+async def test_delete_world_not_found(delete_worlds):
+ response = Response(await delete_worlds(1))
+
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "World ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_update_world(update_worlds, gen_world):
+ world = await DB.add(next(gen_world))
+
+ input = {"name": "updated world"}
+ response = Response(await update_worlds(world.id, input))
+ response.assert_is("UpdateSuccess")
+
+ world = await DB.get(World, world.id)
+ assert world is not None
+ assert world.name == "updated world"
+
+
+@pytest.mark.anyio
+async def test_update_world_fails_exists(update_worlds, gen_world):
+ first = await DB.add(next(gen_world))
+ second = await DB.add(next(gen_world))
+
+ response = Response(await update_worlds(second.id, {"name": first.name}))
+ response.assert_is("NameExistsError")
+ assert response.message == "Another World with this name exists"
+
+
+@pytest.mark.anyio
+async def test_update_world_fails_not_found(update_worlds):
+ response = Response(await update_worlds(1, {"name": "updated world"}))
+
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "World ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_update_worlds_cannot_bulk_edit_name(update_worlds, gen_world):
+ first = await DB.add(next(gen_world))
+ second = await DB.add(next(gen_world))
+
+ response = Response(await update_worlds([first.id, second.id], {"name": "unique"}))
+ response.assert_is("InvalidParameterError")
+
+
+@pytest.mark.parametrize(
+ "empty",
+ [
+ None,
+ "",
+ ],
+ ids=[
+ "none",
+ "empty string",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_world_fails_empty_parameter(update_worlds, gen_world, empty):
+ world = await DB.add(next(gen_world))
+
+ response = Response(await update_worlds(world.id, {"name": empty}))
+
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "name"
+ assert response.message == "Invalid parameter 'name': cannot be empty"
+
+
+@pytest.mark.anyio
+async def test_update_world_changes_updated_at(update_worlds):
+ original_world = World(name="world")
+ original_world.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc)
+ original_world = await DB.add(original_world)
+
+ response = Response(await update_worlds(original_world.id, {"name": "updated"}))
+ response.assert_is("UpdateSuccess")
+
+ world = await DB.get(World, original_world.id)
+ assert world.updated_at > original_world.updated_at
diff --git a/tests/config/data/config.toml b/tests/config/data/config.toml
new file mode 100644
index 0000000..2a21e03
--- /dev/null
+++ b/tests/config/data/config.toml
@@ -0,0 +1,3 @@
+database = "foo"
+scan = "bar"
+objects = "baz"
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..a36be2d
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,594 @@
+import os
+import shutil
+from datetime import date, timedelta
+from datetime import datetime as dt
+from datetime import timezone as tz
+
+import hircine
+import hircine.db as database
+import hircine.db.models as models
+import hircine.plugins
+import pytest
+from hircine.app import schema
+from hircine.enums import Category, Censorship, Direction, Language, Layout, Rating
+from sqlalchemy.ext.asyncio import AsyncSession
+
+
+@pytest.fixture(scope="session")
+def anyio_backend():
+ return "asyncio"
+
+
+def pytest_addoption(parser):
+ parser.addoption(
+ "--sql-echo",
+ action="store_true",
+ help="Enable logging of SQL statements",
+ )
+
+
+@pytest.fixture
+def empty_plugins(monkeypatch):
+ monkeypatch.setattr(hircine.plugins, "scraper_registry", {})
+ monkeypatch.setattr(hircine.plugins, "transformers", [])
+
+
+@pytest.fixture
+def data(tmpdir, request):
+ file = request.module.__file__
+ data = os.path.join(os.path.dirname(file), "data")
+
+ if os.path.isdir(data):
+ shutil.copytree(data, tmpdir, dirs_exist_ok=True)
+
+ return lambda dir: os.path.join(tmpdir, dir)
+
+
+@pytest.fixture(scope="session")
+def engine(pytestconfig):
+ yield database.create_engine(":memory:", echo=pytestconfig.option.sql_echo)
+
+
+@pytest.fixture
+async def session(anyio_backend, engine):
+ async with engine.begin() as conn:
+ await conn.begin_nested()
+ yield AsyncSession(conn, expire_on_commit=False, autoflush=False)
+ await conn.rollback()
+
+
+@pytest.fixture(autouse=True)
+async def patch_session(anyio_backend, session, engine, monkeypatch):
+ monkeypatch.setattr(hircine.db, "session", lambda: session)
+
+
+@pytest.fixture(scope="session", autouse=True)
+async def metadata(engine, anyio_backend):
+ await database.initialize(engine)
+
+
+@pytest.fixture
+def schema_execute():
+ async def _execute(endpoint, variables=None):
+ return await schema.execute(endpoint, variable_values=variables)
+
+ return _execute
+
+
+@pytest.fixture
+def execute(schema_execute):
+ def wrapper(q):
+ async def _execute():
+ return await schema_execute(q)
+
+ return _execute
+
+ return wrapper
+
+
+@pytest.fixture
+def execute_add(schema_execute):
+ def wrapper(q):
+ async def _execute(input):
+ return await schema_execute(q, {"input": input})
+
+ return _execute
+
+ return wrapper
+
+
+@pytest.fixture
+def execute_update(schema_execute):
+ def wrapper(q):
+ async def _execute(ids, input):
+ return await schema_execute(q, {"ids": ids, "input": input})
+
+ return _execute
+
+ return wrapper
+
+
+@pytest.fixture
+def execute_update_single(schema_execute):
+ def wrapper(q):
+ async def _execute(id, input):
+ return await schema_execute(q, {"id": id, "input": input})
+
+ return _execute
+
+ return wrapper
+
+
+@pytest.fixture
+def execute_delete(schema_execute):
+ def wrapper(q):
+ async def _execute(ids):
+ return await schema_execute(q, {"ids": ids})
+
+ return _execute
+
+ return wrapper
+
+
+@pytest.fixture
+def execute_id(schema_execute):
+ def wrapper(q):
+ async def _execute(id):
+ return await schema_execute(q, {"id": id})
+
+ return _execute
+
+ return wrapper
+
+
+@pytest.fixture
+def execute_filter(schema_execute):
+ def wrapper(q):
+ async def _execute(filter=None):
+ return await schema_execute(q, {"filter": filter} if filter else None)
+
+ return _execute
+
+ return wrapper
+
+
+@pytest.fixture
+def execute_sort(schema_execute):
+ def wrapper(q):
+ async def _execute(sort=None):
+ return await schema_execute(q, {"sort": sort} if sort else None)
+
+ return _execute
+
+ return wrapper
+
+
+class DB:
+ @staticmethod
+ async def add(model):
+ async with database.session() as s:
+ s.add(model)
+ await s.commit()
+ return model
+
+ @staticmethod
+ async def add_all(*models):
+ async with database.session() as s:
+ s.add_all(models)
+ await s.commit()
+ return models
+
+ @staticmethod
+ async def get(modelcls, id, full=False):
+ async with database.session() as s:
+ options = modelcls.load_full() if full else []
+ model = await s.get(modelcls, id, options=options)
+ return model
+
+ @staticmethod
+ async def delete(modelcls, id):
+ async with database.session() as s:
+ model = await s.get(modelcls, id)
+ await s.delete(model)
+ await s.commit()
+ return
+
+
+class Response:
+ def __init__(self, response, key=None):
+ assert response.errors is None
+
+ if key is None:
+ assert response.data is not None
+ assert len(response.data) == 1
+ key = next(iter(response.data.keys()))
+
+ assert key in response.data
+ self.data = response.data.get(key)
+ self.errors = response.errors
+
+ def __getattr__(self, name):
+ assert name in self.data
+ return self.data.get(name)
+
+ def assert_is(self, typename):
+ assert self.data["__typename"] == typename
+
+
+@pytest.fixture
+def gen_artist():
+ def _gen():
+ yield models.Artist(id=1, name="alan smithee")
+ yield models.Artist(id=2, name="david agnew")
+ yield models.Artist(id=3, name="robin bland")
+ yield models.Artist(id=4, name="robin smith")
+
+ return _gen()
+
+
+@pytest.fixture
+def gen_character():
+ def _gen():
+ yield models.Character(id=1, name="greta giraffe")
+ yield models.Character(id=2, name="bob bear")
+ yield models.Character(id=3, name="rico rhinoceros")
+ yield models.Character(id=4, name="ziggy zebra")
+
+ return _gen()
+
+
+@pytest.fixture
+def gen_circle():
+ def _gen():
+ yield models.Circle(id=1, name="archimedes")
+ yield models.Circle(id=2, name="bankoff")
+ yield models.Circle(id=3, name="carlyle")
+ yield models.Circle(id=4, name="ford")
+
+ return _gen()
+
+
+@pytest.fixture
+def gen_namespace():
+ def _gen():
+ yield models.Namespace(id=1, name="animal", sort_name="animal")
+ yield models.Namespace(id=2, name="human", sort_name="human")
+
+ return _gen()
+
+
+@pytest.fixture
+def gen_tag():
+ def _gen():
+ yield models.Tag(
+ id=1, name="small", description="barely visible", namespaces=[]
+ )
+ yield models.Tag(
+ id=2,
+ name="medium",
+ description="mostly average",
+ namespaces=[],
+ )
+ yield models.Tag(id=3, name="big", description="impressive", namespaces=[])
+ yield models.Tag(
+ id=4, name="massive", description="what is THAT", namespaces=[]
+ )
+
+ return _gen()
+
+
+@pytest.fixture
+def gen_world():
+ def _gen():
+ yield models.World(id=1, name="animal friends")
+ yield models.World(id=2, name="criminanimals")
+ yield models.World(id=3, name="in the nude")
+ yield models.World(id=4, name="wall street")
+
+ return _gen()
+
+
+@pytest.fixture
+def gen_image():
+ def _gen():
+ yield models.Image(
+ id=1, hash="1bb05614b44bf177589632a51ce216a2", width=3024, height=2106
+ )
+ yield models.Image(
+ id=2, hash="77dfd96aee1bc8c36ab7095fcf18f7ff", width=3024, height=2094
+ )
+ yield models.Image(
+ id=3, hash="109aac22f29bd361fbfb19f975a1b7f0", width=3019, height=2089
+ )
+ yield models.Image(
+ id=4, hash="e18fc95f00a087ff001ecd8675eddd14", width=3024, height=2097
+ )
+ yield models.Image(
+ id=5, hash="0e2cd2f176e792a3777710978768bc90", width=1607, height=2259
+ )
+ yield models.Image(
+ id=6, hash="64e50730eb842750ebe5417a524b83e6", width=1556, height=2264
+ )
+ yield models.Image(
+ id=7, hash="d906ef54788cae72e1a511c9775e6d68", width=1525, height=2259
+ )
+ yield models.Image(
+ id=8, hash="0f8ead4a60df09a1dd071617b5d5583b", width=1545, height=2264
+ )
+ yield models.Image(
+ id=9, hash="912ccb4350fb17ea1248e26ecfb5d983", width=1607, height=2259
+ )
+ yield models.Image(
+ id=10, hash="108edee1b417f022a6d1f999bd32d16d", width=1546, height=2224
+ )
+ yield models.Image(
+ id=11, hash="97c0903cb0962741174f264aaa7015d4", width=1528, height=2257
+ )
+ yield models.Image(
+ id=12, hash="b5490ad31d2a8910087ba932073b4e52", width=1543, height=2271
+ )
+ yield models.Image(
+ id=13, hash="c9ab7febcb81974a992ed1de60c728ba", width=1611, height=2257
+ )
+ yield models.Image(
+ id=14, hash="bcfdf22ec17a09cd4f6a0af86e966e8f", width=1553, height=2265
+ )
+ yield models.Image(
+ id=15, hash="1f58f4b08bf6f4ca92bd29cbce26241e", width=1526, height=2258
+ )
+ yield models.Image(
+ id=16, hash="f87d7e55203b5e7cf9c801db48624ef0", width=1645, height=2262
+ )
+
+ return _gen()
+
+
+@pytest.fixture
+def gen_page(gen_image):
+ def _gen():
+ yield models.Page(id=1, index=1, path="001.png", image=next(gen_image))
+ yield models.Page(id=2, index=2, path="002.png", image=next(gen_image))
+ yield models.Page(id=3, index=3, path="003.png", image=next(gen_image))
+ yield models.Page(id=4, index=4, path="004.png", image=next(gen_image))
+ yield models.Page(id=5, index=1, path="00.jpg", image=next(gen_image))
+ yield models.Page(id=6, index=2, path="01.jpg", image=next(gen_image))
+ yield models.Page(id=7, index=3, path="02.jpg", image=next(gen_image))
+ yield models.Page(id=8, index=4, path="03.jpg", image=next(gen_image))
+ yield models.Page(id=9, index=1, path="1.jpg", image=next(gen_image))
+ yield models.Page(id=10, index=2, path="2.jpg", image=next(gen_image))
+ yield models.Page(id=11, index=3, path="10.jpg", image=next(gen_image))
+ yield models.Page(id=12, index=4, path="11.jpg", image=next(gen_image))
+ yield models.Page(id=13, index=1, path="010.png", image=next(gen_image))
+ yield models.Page(id=14, index=2, path="011.png", image=next(gen_image))
+ yield models.Page(id=15, index=3, path="012.png", image=next(gen_image))
+ yield models.Page(id=16, index=4, path="013.png", image=next(gen_image))
+
+ return _gen()
+
+
+@pytest.fixture
+def gen_jumbled_pages(gen_image):
+ def _gen():
+ yield models.Page(id=101, index=3, path="3.png", image=next(gen_image))
+ yield models.Page(id=52, index=9, path="9.png", image=next(gen_image))
+ yield models.Page(id=13, index=2, path="2.png", image=next(gen_image))
+ yield models.Page(id=258, index=10, path="10.png", image=next(gen_image))
+ yield models.Page(id=7, index=7, path="7.jpg", image=next(gen_image))
+ yield models.Page(id=25, index=5, path="5.jpg", image=next(gen_image))
+ yield models.Page(id=150, index=1, path="1.jpg", image=next(gen_image))
+ yield models.Page(id=69, index=4, path="4.jpg", image=next(gen_image))
+ yield models.Page(id=219, index=6, path="6.jpg", image=next(gen_image))
+ yield models.Page(id=34, index=8, path="8.jpg", image=next(gen_image))
+
+ return _gen()
+
+
+@pytest.fixture
+def gen_jumbled_archive(gen_jumbled_pages):
+ def _gen():
+ pages = [next(gen_jumbled_pages) for _ in range(10)]
+ yield models.Archive(
+ id=100,
+ hash="4e1243bd22c66e76c2ba9eddc1f91394",
+ path="comics/jumbled.zip",
+ size=32559235,
+ mtime=dt(2002, 1, 23).astimezone(),
+ cover=pages[0].image,
+ pages=pages,
+ page_count=len(pages),
+ )
+
+ return _gen()
+
+
+@pytest.fixture
+def gen_archive(gen_page):
+ def _gen():
+ pages = [next(gen_page) for _ in range(4)]
+ yield models.Archive(
+ id=1,
+ hash="1d394f66c49ccb1d3c30870904d31bd4",
+ path="comics/archive-01.zip",
+ size=7340032,
+ mtime=dt(2016, 5, 10).astimezone(),
+ cover=pages[0].image,
+ pages=pages,
+ page_count=len(pages),
+ )
+
+ pages = [next(gen_page) for _ in range(4)]
+ yield models.Archive(
+ id=2,
+ hash="d7d8929b2e606200e863d390f71b53bb",
+ path="comics/archive-02.zip",
+ size=11335106,
+ mtime=dt(2008, 10, 2, tzinfo=tz(timedelta(hours=+6))),
+ cover=pages[0].image,
+ pages=pages,
+ page_count=len(pages),
+ )
+
+ pages = [next(gen_page) for _ in range(4)]
+ yield models.Archive(
+ id=3,
+ hash="02669dbe08c4a5f4820c10b3ff2178fa",
+ path="comics/sub/archive-new.zip",
+ size=51841969,
+ mtime=dt(2005, 11, 17, tzinfo=tz(timedelta(hours=+2))),
+ cover=pages[0].image,
+ pages=pages,
+ page_count=len(pages),
+ )
+
+ pages = [next(gen_page) for _ in range(4)]
+ yield models.Archive(
+ id=4,
+ hash="6b2ecf5ceb8befd6d0c1cd353a3df709",
+ path="comics/archive-03.zip",
+ size=13568769,
+ mtime=dt(1999, 5, 8, tzinfo=tz(timedelta(hours=-2))),
+ cover=pages[0].image,
+ pages=pages,
+ page_count=len(pages),
+ )
+
+ return _gen()
+
+
+@pytest.fixture
+def gen_comic(
+ gen_archive,
+ gen_artist,
+ gen_character,
+ gen_circle,
+ gen_world,
+ gen_tag,
+ gen_namespace,
+):
+ def _gen():
+ artists = {a.id: a for a in gen_artist}
+ characters = {c.id: c for c in gen_character}
+
+ namespaces = {ns.id: ns for ns in gen_namespace}
+ tags = {t.id: t for t in gen_tag}
+
+ def tag(nid, tid):
+ return models.ComicTag(namespace=namespaces[nid], tag=tags[tid])
+
+ archive = next(gen_archive)
+ yield models.Comic(
+ id=1,
+ title="Arid Savannah Adventures",
+ url="file:///home/savannah/adventures",
+ category=Category.MANGA,
+ censorship=Censorship.NONE,
+ date=date(2010, 7, 5),
+ direction=Direction.LEFT_TO_RIGHT,
+ favourite=True,
+ language=Language.EN,
+ layout=Layout.SINGLE,
+ rating=Rating.SAFE,
+ archive=archive,
+ artists=[artists[1], artists[2]],
+ characters=list(characters.values()),
+ circles=[next(gen_circle)],
+ worlds=[next(gen_world)],
+ cover=archive.cover,
+ pages=archive.pages,
+ tags=[
+ tag(1, 1),
+ tag(1, 2),
+ tag(1, 3),
+ tag(1, 4),
+ ],
+ )
+
+ archive = next(gen_archive)
+ yield models.Comic(
+ id=2,
+ title="This Giraffe Stole My Wallet",
+ original_title="Diese Giraffe hat mein Geldbeutel geklaut",
+ url="ftp://crimes.local/giraffes.zip",
+ category=Category.MANGA,
+ censorship=Censorship.BAR,
+ date=date(2002, 2, 17),
+ direction=Direction.LEFT_TO_RIGHT,
+ favourite=False,
+ language=Language.EN,
+ layout=Layout.SINGLE,
+ rating=Rating.QUESTIONABLE,
+ archive=archive,
+ artists=[artists[3]],
+ characters=[characters[1]],
+ circles=[next(gen_circle)],
+ worlds=[next(gen_world)],
+ cover=archive.cover,
+ pages=archive.pages,
+ tags=[
+ tag(1, 3),
+ tag(2, 1),
+ ],
+ )
+
+ archive = next(gen_archive)
+ yield models.Comic(
+ id=3,
+ title="サイのスパ",
+ category=Category.ARTBOOK,
+ censorship=Censorship.MOSAIC,
+ date=date(2017, 5, 3),
+ direction=Direction.RIGHT_TO_LEFT,
+ favourite=False,
+ language=Language.JA,
+ layout=Layout.DOUBLE_OFFSET,
+ rating=Rating.EXPLICIT,
+ archive=archive,
+ artists=[artists[1], artists[4]],
+ characters=[characters[3]],
+ circles=[next(gen_circle)],
+ worlds=[next(gen_world)],
+ cover=archive.cover,
+ pages=archive.pages,
+ tags=[
+ tag(1, 4),
+ ],
+ )
+
+ archive = next(gen_archive)
+ yield models.Comic(
+ id=4,
+ title="In the Company of Vultures",
+ category=Category.DOUJINSHI,
+ date=date(2023, 3, 10),
+ direction=Direction.LEFT_TO_RIGHT,
+ favourite=False,
+ language=Language.EN,
+ layout=Layout.SINGLE,
+ rating=Rating.SAFE,
+ archive=archive,
+ artists=[artists[4]],
+ characters=[characters[4]],
+ circles=[next(gen_circle)],
+ worlds=[next(gen_world)],
+ cover=archive.cover,
+ pages=archive.pages,
+ tags=[
+ tag(2, 1),
+ tag(2, 2),
+ tag(2, 3),
+ ],
+ )
+
+ return _gen()
+
+
+@pytest.fixture
+def empty_comic(gen_archive):
+ archive = next(gen_archive)
+ yield models.Comic(
+ id=100,
+ title="Hic Sunt Dracones",
+ archive=archive,
+ cover=archive.cover,
+ pages=archive.pages,
+ )
diff --git a/tests/plugins/test_plugins.py b/tests/plugins/test_plugins.py
new file mode 100644
index 0000000..dd7042e
--- /dev/null
+++ b/tests/plugins/test_plugins.py
@@ -0,0 +1,9 @@
+import hircine.plugins
+
+
+def test_plugin_transformer_decorator(empty_plugins):
+ @hircine.plugins.transformer
+ def ignore(generator, info):
+ return
+
+ assert hircine.plugins.transformers == [ignore]
diff --git a/tests/scanner/data/contents/archive.zip b/tests/scanner/data/contents/archive.zip
new file mode 100644
index 0000000..990eb98
--- /dev/null
+++ b/tests/scanner/data/contents/archive.zip
Binary files differ
diff --git a/tests/scanner/test_scanner.py b/tests/scanner/test_scanner.py
new file mode 100644
index 0000000..45a966f
--- /dev/null
+++ b/tests/scanner/test_scanner.py
@@ -0,0 +1,311 @@
+import configparser
+import os
+import shutil
+from datetime import datetime, timezone
+from pathlib import Path
+from zipfile import ZipFile
+
+import hircine.thumbnailer
+import pytest
+from conftest import DB
+from hircine.config import DirectoryStructure
+from hircine.db.models import Archive, Image, Page
+from hircine.scanner import Scanner, Status
+from hircine.thumbnailer import object_path
+
+
+def pageset(pages):
+ return set([(page.path, page.archive_id, page.image.hash) for page in pages])
+
+
+@pytest.fixture
+def archive(data):
+ stat = os.stat(data("contents/archive.zip"))
+ mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
+
+ cover = Image(
+ id=1,
+ hash="4ac228082aaf8bedc0fbd4859c5324c2acf0d1c63f9097d55e9be88d0804eaa4",
+ width=0,
+ height=0,
+ )
+
+ archive = Archive(
+ id=1,
+ path=data("contents/archive.zip"),
+ hash="8aa2fd72954fb9103776114172d345ad4446babf292e876a892cfbed1c283523",
+ size=stat.st_size,
+ mtime=mtime,
+ cover=cover,
+ pages=[
+ Page(
+ id=1,
+ archive_id=1,
+ index=1,
+ path="01.png",
+ image=cover,
+ ),
+ Page(
+ id=2,
+ archive_id=1,
+ index=2,
+ path="02.png",
+ image=Image(
+ id=2,
+ hash="9b2c7a9c1f3d1c5a07fa1492d9d91ace5122262559c7f513e3b97464d2edb753",
+ width=0,
+ height=0,
+ ),
+ ),
+ Page(
+ id=3,
+ archive_id=1,
+ index=3,
+ path="03.png",
+ image=Image(
+ id=3,
+ hash="ed132e79daf9e93970d14d9443b7870f1aefd12aa9d3fba8cab0096984754ff5",
+ width=0,
+ height=0,
+ ),
+ ),
+ ],
+ page_count=3,
+ )
+
+ yield archive
+
+
+@pytest.fixture
+def scanner(data, monkeypatch):
+ monkeypatch.setattr(
+ hircine.thumbnailer.Thumbnailer, "process", lambda s, a, b: (0, 0)
+ )
+
+ dirs = DirectoryStructure(scan=data("contents/"), objects=data("objects/"))
+ yield Scanner(configparser.ConfigParser(), dirs)
+
+
+@pytest.mark.anyio
+async def test_scanner_adds_new_archive(archive, scanner, capsys):
+ await scanner.scan()
+ added_archive = await DB.get(Archive, 1, full=True)
+
+ assert added_archive.hash == archive.hash
+ assert pageset(added_archive.pages) == pageset(archive.pages)
+
+ captured = capsys.readouterr()
+ assert captured.out == "[+] archive.zip\n"
+
+
+@pytest.mark.anyio
+async def test_scanner_dedups_archive_contents(archive, scanner, capsys):
+ archive = await DB.add(archive)
+
+ dedup_path = archive.path + ".dedup"
+ with ZipFile(archive.path, "r") as zin:
+ with ZipFile(dedup_path, "w") as zout:
+ for info in zin.infolist():
+ base, ext = os.path.splitext(info.filename)
+
+ if base == "03":
+ continue
+
+ if ext == ".png":
+ zout.writestr(f"0{base}.png", zin.read(info))
+ else:
+ zout.writestr(info.filename, zin.read(info))
+
+ await scanner.scan()
+ added_archive = await DB.get(Archive, 2, full=True)
+
+ assert (
+ added_archive.hash
+ == "fc2ea810eddc231824aef44db62d5f3de89b3747e4aea6b5728c1532aabdeccd"
+ )
+
+ pages = set()
+ for page in archive.pages:
+ if page.path == "03.png":
+ continue
+
+ pages.add((f"0{page.path}", 2, page.image.hash))
+
+ assert pageset(added_archive.pages) == pages
+
+ captured = capsys.readouterr()
+ assert captured.out == "[+] archive.zip.dedup\n"
+
+
+@pytest.mark.anyio
+async def test_scanner_skips_same_mtime(archive, scanner, capsys):
+ archive = await DB.add(archive)
+ await scanner.scan()
+
+ captured = capsys.readouterr()
+ assert captured.out == ""
+
+
+@pytest.mark.anyio
+async def test_scanner_finds_existing_before_duplicate(archive, scanner, capsys):
+ stat = os.stat(archive.path)
+ mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
+
+ before = await DB.add(archive)
+
+ copy_path = before.path + ".copy"
+ shutil.copyfile(Path(before.path), copy_path)
+
+ await scanner.scan()
+
+ after = await DB.get(Archive, before.id, full=True)
+ assert after.hash == before.hash
+ assert after.path == before.path
+ assert after.mtime == mtime
+ assert pageset(after.pages) == pageset(before.pages)
+
+ captured = capsys.readouterr()
+ assert captured.out == "[I] archive.zip.copy\n"
+
+
+@pytest.mark.anyio
+async def test_scanner_skips_non_zip(data, scanner, capsys):
+ Path(data("contents/archive.zip")).unlink()
+ Path(data("contents/non_zip.txt")).touch()
+ await scanner.scan()
+
+ captured = capsys.readouterr()
+ assert captured.out == ""
+
+
+@pytest.mark.anyio
+async def test_scanner_skips_link(data, scanner, capsys):
+ Path(data("contents/archive.zip")).rename(data("archive.zip"))
+ os.symlink(data("archive.zip"), data("contents/archive.zip"))
+ await scanner.scan()
+
+ captured = capsys.readouterr()
+ assert captured.out == ""
+
+
+@pytest.mark.anyio
+async def test_scanner_updates_mtime(archive, scanner, capsys):
+ Path(archive.path).touch()
+ stat = os.stat(archive.path)
+ mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
+
+ archive = await DB.add(archive)
+ await scanner.scan()
+
+ updated_archive = await DB.get(Archive, archive.id, full=True)
+ assert updated_archive.hash == archive.hash
+ assert updated_archive.path == archive.path
+ assert updated_archive.mtime == mtime
+ assert pageset(updated_archive.pages) == pageset(archive.pages)
+
+ captured = capsys.readouterr()
+ assert captured.out == "[*] archive.zip\n"
+
+
+@pytest.mark.anyio
+async def test_scanner_updates_path(archive, scanner, capsys):
+ new_path = archive.path + ".new"
+
+ Path(archive.path).rename(new_path)
+ stat = os.stat(new_path)
+ mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
+
+ archive = await DB.add(archive)
+ await scanner.scan()
+
+ updated_archive = await DB.get(Archive, archive.id, full=True)
+ assert updated_archive.hash == archive.hash
+ assert updated_archive.path == new_path
+ assert updated_archive.mtime == mtime
+ assert pageset(updated_archive.pages) == pageset(archive.pages)
+
+ captured = capsys.readouterr()
+ assert captured.out == "[>] archive.zip -> archive.zip.new\n"
+
+
+@pytest.mark.anyio
+async def test_scanner_reports_missing(archive, scanner):
+ archive = await DB.add(archive)
+ Path(archive.path).unlink()
+ await scanner.scan()
+
+ assert scanner.registry.orphans == {archive.hash: (archive.id, archive.path)}
+
+
+@pytest.mark.anyio
+async def test_scanner_reports_duplicate(archive, scanner, capsys):
+ archive = await DB.add(archive)
+ copy_path = archive.path + ".copy"
+ shutil.copyfile(Path(archive.path), copy_path)
+ await scanner.scan()
+
+ assert list(scanner.registry.duplicates) == [
+ [
+ (archive.path, Status.UNCHANGED),
+ (copy_path, Status.IGNORED),
+ ]
+ ]
+
+ captured = capsys.readouterr()
+ assert captured.out == "[I] archive.zip.copy\n"
+
+
+@pytest.mark.anyio
+async def test_scanner_ignores_empty_archive(archive, scanner, capsys):
+ Path(archive.path).unlink()
+
+ empty_path = archive.path + ".empty"
+ ZipFile(empty_path, "w").close()
+
+ await scanner.scan()
+
+ assert scanner.registry.marked == {}
+
+ captured = capsys.readouterr()
+ assert captured.out == ""
+
+
+@pytest.mark.anyio
+async def test_scanner_reports_conflict(archive, scanner, capsys):
+ archive = await DB.add(archive)
+ ZipFile(archive.path, "w").close()
+
+ await scanner.scan()
+
+ assert scanner.registry.conflicts == {
+ archive.path: (
+ archive.hash,
+ "af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f3262",
+ )
+ }
+
+ captured = capsys.readouterr()
+ assert captured.out == "[!] archive.zip\n"
+
+
+@pytest.mark.anyio
+async def test_scanner_reprocess(archive, data, scanner, capsys):
+ await scanner.scan()
+
+ captured = capsys.readouterr()
+ assert captured.out == "[+] archive.zip\n"
+
+ old_stat = os.stat(data(object_path("objects/", archive.cover.hash, "full")))
+ old_mtime = datetime.fromtimestamp(old_stat.st_mtime, tz=timezone.utc)
+
+ scanner.reprocess = True
+
+ await scanner.scan()
+
+ new_stat = os.stat(data(object_path("objects/", archive.cover.hash, "full")))
+ new_mtime = datetime.fromtimestamp(new_stat.st_mtime, tz=timezone.utc)
+
+ assert new_mtime > old_mtime
+
+ captured = capsys.readouterr()
+ assert captured.out == "[~] archive.zip\n"
diff --git a/tests/scrapers/test_scraper.py b/tests/scrapers/test_scraper.py
new file mode 100644
index 0000000..6f6f29d
--- /dev/null
+++ b/tests/scrapers/test_scraper.py
@@ -0,0 +1,55 @@
+from hircine.scraper import Scraper, ScrapeWarning
+
+
+class MockScraper(Scraper):
+ is_available = True
+
+ def scrape(self):
+ yield lambda: "foo"
+ yield "bar"
+
+
+class WarningScraper(Scraper):
+ is_available = True
+
+ def warn(self, str):
+ raise ScrapeWarning("Invalid input")
+
+ def scrape(self):
+ yield lambda: "foo"
+ yield lambda: self.warn("bar")
+ yield "baz"
+
+
+class ParserlessScraper(Scraper):
+ is_available = True
+
+ def scrape(self):
+ yield "literal"
+
+
+def test_scraper_collects():
+ generator = MockScraper(None).collect()
+
+ assert set(generator) == set(["foo", "bar"])
+
+
+def test_scraper_collects_with_transformer():
+ generator = MockScraper(None).collect([lambda gen, info: map(str.upper, gen)])
+
+ assert set(generator) == set(["FOO", "BAR"])
+
+
+def test_scraper_collects_warnings():
+ scraper = WarningScraper(None)
+ generator = scraper.collect()
+
+ assert set(generator) == set(["foo", "baz"])
+ assert scraper.get_warnings() == ["Invalid input"]
+
+
+def test_scraper_collects_literal():
+ scraper = ParserlessScraper(None)
+ generator = scraper.collect()
+
+ assert set(generator) == set(["literal"])
diff --git a/tests/scrapers/test_scraper_utils.py b/tests/scrapers/test_scraper_utils.py
new file mode 100644
index 0000000..193cf2a
--- /dev/null
+++ b/tests/scrapers/test_scraper_utils.py
@@ -0,0 +1,28 @@
+from hircine.scraper.utils import parse_dict
+
+
+def test_parse_dict():
+ dict = {
+ "scalar": "foo",
+ "list": ["bar", "baz"],
+ "dict": {"nested_scalar": "qux", "nested_list": ["plugh", "xyzzy"]},
+ }
+
+ def id(type):
+ return lambda item: f"{type}_{item}"
+
+ parsers = {
+ "scalar": id("scalar"),
+ "list": id("list"),
+ "dict": {"nested_scalar": id("scalar"), "nested_list": id("list")},
+ "missing": id("missing"),
+ }
+
+ assert [f() for f in parse_dict(parsers, dict)] == [
+ "scalar_foo",
+ "list_bar",
+ "list_baz",
+ "scalar_qux",
+ "list_plugh",
+ "list_xyzzy",
+ ]
diff --git a/tests/scrapers/test_types.py b/tests/scrapers/test_types.py
new file mode 100644
index 0000000..ed937e7
--- /dev/null
+++ b/tests/scrapers/test_types.py
@@ -0,0 +1,131 @@
+from datetime import date
+
+import pytest
+from hircine.api.types import ScrapedComic
+from hircine.scraper import ScrapeWarning
+from hircine.scraper.types import (
+ Artist,
+ Category,
+ Character,
+ Circle,
+ Date,
+ Language,
+ OriginalTitle,
+ Rating,
+ Tag,
+ Title,
+ World,
+)
+
+
+@pytest.mark.parametrize(
+ "input,options,want",
+ [
+ ("foo", {}, Tag(namespace="none", tag="foo")),
+ ("foo:bar", {}, Tag(namespace="foo", tag="bar")),
+ ("foo:bar:baz", {}, Tag(namespace="foo", tag="bar:baz")),
+ ("foo/bar", {"delimiter": "/"}, Tag(namespace="foo", tag="bar")),
+ ],
+ ids=[
+ "tag only",
+ "tag and namespace",
+ "tag with delimiter",
+ "custom delimiter",
+ ],
+)
+def test_tag_from_string(input, options, want):
+ assert Tag.from_string(input, **options) == want
+
+
+@pytest.mark.parametrize(
+ "input,want",
+ [
+ ("1998-02-07", Date(value=date(1998, 2, 7))),
+ ("2018-07-18T19:15", Date(value=date(2018, 7, 18))),
+ (
+ "2003-12-30T10:37Z",
+ Date(value=date(2003, 12, 30)),
+ ),
+ ],
+)
+def test_date_from_iso(input, want):
+ assert Date.from_iso(input) == want
+
+
+@pytest.mark.parametrize(
+ "input",
+ [
+ ("text"),
+ ("1997 02 07"),
+ ("1997/02/07"),
+ ],
+)
+def test_date_from_iso_fails(input):
+ with pytest.raises(ScrapeWarning, match="Could not parse date:"):
+ Date.from_iso(input)
+
+
+@pytest.mark.parametrize(
+ "input,want",
+ [
+ ("886806000", Date(value=date(1998, 2, 7))),
+ (886806000, Date(value=date(1998, 2, 7))),
+ ],
+)
+def test_date_from_timestamp(input, want):
+ assert Date.from_timestamp(input) == want
+
+
+@pytest.mark.parametrize(
+ "input",
+ [
+ ("text"),
+ ],
+)
+def test_date_from_timestamp_fails(input):
+ with pytest.raises(ScrapeWarning, match="Could not parse date:"):
+ Date.from_timestamp(input)
+
+
+@pytest.mark.parametrize(
+ "item,attr,empty",
+ [
+ (Title(""), "title", None),
+ (OriginalTitle(""), "original_title", None),
+ (Language(None), "language", None),
+ (Date(None), "date", None),
+ (Rating(None), "rating", None),
+ (Category(None), "category", None),
+ (Tag("", ""), "tags", []),
+ (Tag(namespace="", tag=""), "tags", []),
+ (Tag(namespace=None, tag=""), "tags", []),
+ (Tag(namespace="foo", tag=""), "tags", []),
+ (Artist(""), "artists", []),
+ (Character(""), "characters", []),
+ (Circle(""), "circles", []),
+ (World(""), "worlds", []),
+ ],
+ ids=[
+ "title",
+ "original title",
+ "language",
+ "date",
+ "rating",
+ "category",
+ "tag (both empty, positional)",
+ "tag (both empty)",
+ "tag (namespace None, tag empty)",
+ "tag (tag empty)",
+ "artist",
+ "character",
+ "circle",
+ "world",
+ ],
+)
+def test_scraped_comic_silently_ignores_empty(item, attr, empty):
+ def gen():
+ yield item
+
+ comic = ScrapedComic.from_generator(gen())
+
+ assert getattr(comic, attr) == empty
diff --git a/tests/thumbnailer/data/example_palette.png b/tests/thumbnailer/data/example_palette.png
new file mode 100644
index 0000000..6bf25e1
--- /dev/null
+++ b/tests/thumbnailer/data/example_palette.png
Binary files differ
diff --git a/tests/thumbnailer/data/example_rgb.png b/tests/thumbnailer/data/example_rgb.png
new file mode 100644
index 0000000..a245642
--- /dev/null
+++ b/tests/thumbnailer/data/example_rgb.png
Binary files differ
diff --git a/tests/thumbnailer/test_thumbnailer.py b/tests/thumbnailer/test_thumbnailer.py
new file mode 100644
index 0000000..62bf127
--- /dev/null
+++ b/tests/thumbnailer/test_thumbnailer.py
@@ -0,0 +1,74 @@
+import os
+from pathlib import Path
+
+import pytest
+from hircine.thumbnailer import Thumbnailer, ThumbnailParameters
+from PIL import Image
+
+mock_params = ThumbnailParameters(bounds=(1440, 2880), options={})
+
+
+def test_thumbnailer_object():
+ thumb = Thumbnailer("objects/", params={})
+ assert thumb.object("abcdef", "foo") == os.path.join("objects/", "ab/cdef_foo.webp")
+
+
+@pytest.mark.parametrize(
+ "extension, can_process",
+ [
+ (".png", True),
+ (".jpeg", True),
+ (".jpg", True),
+ (".gif", True),
+ (".bmp", True),
+ (".json", False),
+ (".txt", False),
+ ],
+ ids=["png", "jpeg", "jpg", "gif", "bmp", "json", "txt"],
+)
+def test_thumbnailer_can_process(extension, can_process):
+ assert Thumbnailer.can_process(extension) == can_process
+
+
+def test_thumbnailer_process(data):
+ thumb = Thumbnailer(data("objects/"), params={"mock": mock_params})
+
+ with open(data("example_rgb.png"), "rb") as f:
+ size = Image.open(f, mode="r").size
+ reported_size = thumb.process(f, "abcdef")
+
+ assert reported_size == size
+
+ output = thumb.object("abcdef", "mock")
+
+ assert os.path.exists(output)
+
+
+def test_thumbnailer_converts_non_rgb(data):
+ thumb = Thumbnailer(data("objects/"), params={"mock": mock_params})
+
+ with open(data("example_palette.png"), "rb") as f:
+ size = Image.open(f, mode="r").size
+ reported_size = thumb.process(f, "abcdef")
+
+ assert reported_size == size
+
+ output = thumb.object("abcdef", "mock")
+
+ assert os.path.exists(output)
+
+ output_image = Image.open(output)
+ assert output_image.mode == "RGB"
+
+
+def test_thumbnailer_process_ignores_existing(data):
+ thumb = Thumbnailer(data("objects/"), params={"mock": mock_params})
+
+ output = Path(thumb.object("abcdef", "mock"))
+ os.makedirs(os.path.dirname(output))
+ output.touch()
+
+ with open(data("example_palette.png"), "rb") as f:
+ thumb.process(f, "abcdef")
+
+ assert output.stat().st_size == 0