From d1d654ebac2d51e3841675faeb56480e440f622f Mon Sep 17 00:00:00 2001 From: Wolfgang Müller Date: Tue, 5 Mar 2024 18:08:09 +0100 Subject: Initial commit --- tests/api/test_tag.py | 441 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 441 insertions(+) create mode 100644 tests/api/test_tag.py (limited to 'tests/api/test_tag.py') diff --git a/tests/api/test_tag.py b/tests/api/test_tag.py new file mode 100644 index 0000000..c863a00 --- /dev/null +++ b/tests/api/test_tag.py @@ -0,0 +1,441 @@ +from datetime import datetime as dt +from datetime import timezone + +import pytest +from conftest import DB, Response +from hircine.db.models import Namespace, Tag + + +@pytest.fixture +def query_tag(execute_id): + query = """ + query tag($id: Int!) { + tag(id: $id) { + __typename + ... on FullTag { + id + name + description + namespaces { + __typename + id + } + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_id(query) + + +@pytest.fixture +def query_tags(execute): + query = """ + query tags { + tags { + __typename + count + edges { + id + name + description + } + } + } + """ + + return execute(query) + + +@pytest.fixture +def add_tag(execute_add): + mutation = """ + mutation addTag($input: AddTagInput!) { + addTag(input: $input) { + __typename + ... on AddSuccess { + id + } + ... on Error { + message + } + ... on InvalidParameterError { + parameter + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_add(mutation) + + +@pytest.fixture +def update_tags(execute_update): + mutation = """ + mutation updateTags($ids: [Int!]!, $input: UpdateTagInput!) { + updateTags(ids: $ids, input: $input) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + ... on InvalidParameterError { + parameter + } + } + } + """ # noqa: E501 + + return execute_update(mutation) + + +@pytest.fixture +def delete_tags(execute_delete): + mutation = """ + mutation deleteTags($ids: [Int!]!) { + deleteTags(ids: $ids) { + __typename + ... on Success { + message + } + ... on Error { + message + } + ... on IDNotFoundError { + id + } + } + } + """ + + return execute_delete(mutation) + + +@pytest.mark.anyio +async def test_query_tag(query_tag, gen_tag): + tag = await DB.add(next(gen_tag)) + + response = Response(await query_tag(tag.id)) + response.assert_is("FullTag") + + assert response.id == tag.id + assert response.name == tag.name + assert response.description == tag.description + assert set([n["id"] for n in response.namespaces]) == set( + [n.id for n in tag.namespaces] + ) + + +@pytest.mark.anyio +async def test_query_tag_fails_not_found(query_tag): + response = Response(await query_tag(1)) + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Tag ID not found: '1'" + + +@pytest.mark.anyio +async def test_query_tags(query_tags, gen_tag): + tags = await DB.add_all(*gen_tag) + response = Response(await query_tags()) + response.assert_is("TagFilterResult") + + assert response.count == len(tags) + assert isinstance((response.edges), list) + assert len(response.edges) == len(tags) + + edges = iter(response.edges) + for tag in sorted(tags, key=lambda a: a.name): + edge = next(edges) + assert edge["id"] == tag.id + assert edge["name"] == tag.name + assert edge["description"] == tag.description + + +@pytest.mark.anyio +async def test_add_tag(add_tag): + response = Response( + await add_tag({"name": "added", "description": "it's been added!"}) + ) + response.assert_is("AddSuccess") + + tag = await DB.get(Tag, response.id) + assert tag is not None + assert tag.name == "added" + assert tag.description == "it's been added!" + + +@pytest.mark.anyio +async def test_add_tag_with_namespace(add_tag): + namespace = await DB.add(Namespace(id=1, name="new")) + + response = Response(await add_tag({"name": "added", "namespaces": {"ids": [1]}})) + response.assert_is("AddSuccess") + + tag = await DB.get(Tag, response.id, full=True) + assert tag is not None + assert tag.name == "added" + assert tag.namespaces[0].id == namespace.id + assert tag.namespaces[0].name == namespace.name + + +@pytest.mark.anyio +async def test_add_tag_fails_empty_parameter(add_tag): + response = Response(await add_tag({"name": ""})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_add_tag_fails_namespace_not_found(add_tag): + response = Response(await add_tag({"name": "added", "namespaces": {"ids": [1]}})) + response.assert_is("IDNotFoundError") + + assert response.id == 1 + assert response.message == "Namespace ID not found: '1'" + + +@pytest.mark.anyio +async def test_add_tag_fails_exists(add_tag, gen_tag): + tag = await DB.add(next(gen_tag)) + + response = Response(await add_tag({"name": tag.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Tag with this name exists" + + +@pytest.mark.anyio +async def test_delete_tag(delete_tags, gen_tag): + tag = await DB.add(next(gen_tag)) + id = tag.id + + response = Response(await delete_tags(id)) + response.assert_is("DeleteSuccess") + + tag = await DB.get(Tag, id) + assert tag is None + + +@pytest.mark.anyio +async def test_delete_tag_not_found(delete_tags): + response = Response(await delete_tags(1)) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Tag ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_tag(update_tags, gen_tag, gen_namespace): + tag = await DB.add(next(gen_tag)) + namespace = await DB.add(next(gen_namespace)) + + input = { + "name": "updated", + "description": "how different, how unique", + "namespaces": {"ids": [1]}, + } + response = Response(await update_tags(tag.id, input)) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, tag.id, full=True) + assert tag is not None + assert tag.name == "updated" + assert tag.description == "how different, how unique" + assert tag.namespaces[0].id == namespace.id + assert tag.namespaces[0].name == namespace.name + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "with None", + "with empty string", + ], +) +@pytest.mark.anyio +async def test_update_tag_clears_description(update_tags, gen_tag, empty): + tag = await DB.add(next(gen_tag)) + + input = { + "description": empty, + } + response = Response(await update_tags(tag.id, input)) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, tag.id) + assert tag is not None + assert tag.description is None + + +@pytest.mark.anyio +async def test_update_tag_fails_exists(update_tags, gen_tag): + first = await DB.add(next(gen_tag)) + second = await DB.add(next(gen_tag)) + + response = Response(await update_tags(second.id, {"name": first.name})) + response.assert_is("NameExistsError") + assert response.message == "Another Tag with this name exists" + + +@pytest.mark.anyio +async def test_update_tag_fails_not_found(update_tags): + response = Response(await update_tags(1, {"name": "updated"})) + + response.assert_is("IDNotFoundError") + assert response.id == 1 + assert response.message == "Tag ID not found: '1'" + + +@pytest.mark.anyio +async def test_update_tags_cannot_bulk_edit_name(update_tags, gen_tag): + first = await DB.add(next(gen_tag)) + second = await DB.add(next(gen_tag)) + + response = Response(await update_tags([first.id, second.id], {"name": "unique"})) + response.assert_is("InvalidParameterError") + + +@pytest.mark.parametrize( + "empty", + [ + None, + "", + ], + ids=[ + "none", + "empty string", + ], +) +@pytest.mark.anyio +async def test_update_tag_fails_empty_parameter(update_tags, gen_tag, empty): + tag = await DB.add(next(gen_tag)) + response = Response(await update_tags(tag.id, {"name": empty})) + + response.assert_is("InvalidParameterError") + assert response.parameter == "name" + assert response.message == "Invalid parameter 'name': cannot be empty" + + +@pytest.mark.anyio +async def test_update_tag_fails_namespace_not_found(update_tags, gen_tag): + tag = await DB.add(next(gen_tag)) + response = Response(await update_tags(tag.id, {"namespaces": {"ids": [1]}})) + response.assert_is("IDNotFoundError") + + assert response.id == 1 + assert response.message == "Namespace ID not found: '1'" + + +@pytest.mark.parametrize( + "options", + [ + None, + {}, + {"mode": "REPLACE"}, + ], + ids=[ + "by default (none)", + "by default (empty record)", + "when defined explicitly", + ], +) +@pytest.mark.anyio +async def test_update_tag_replaces_assocs(update_tags, gen_tag, options): + original_tag = await DB.add(next(gen_tag)) + new_namespace = await DB.add(Namespace(name="new")) + + input = { + "namespaces": {"ids": [new_namespace.id]}, + } + response = Response(await update_tags(original_tag.id, input)) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, original_tag.id, full=True) + + assert set([o.id for o in tag.namespaces]) == set([o.id for o in [new_namespace]]) + + +@pytest.mark.anyio +async def test_update_tag_adds_assocs(update_tags, gen_tag): + original_tag = await DB.add(next(gen_tag)) + new_namespace = await DB.add(Namespace(name="new")) + added_namespaces = original_tag.namespaces + [new_namespace] + + input = { + "namespaces": {"ids": [new_namespace.id], "options": {"mode": "ADD"}}, + } + response = Response(await update_tags(original_tag.id, input)) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, original_tag.id, full=True) + + assert set([o.id for o in tag.namespaces]) == set([o.id for o in added_namespaces]) + + +@pytest.mark.anyio +async def test_update_tag_removes_assocs(update_tags): + removed_namespace = Namespace(id=1, name="new") + remaining_namespace = Namespace(id=2, name="newtwo") + original_tag = await DB.add( + Tag(id=1, name="tag", namespaces=[removed_namespace, remaining_namespace]) + ) + + input = { + "namespaces": {"ids": [removed_namespace.id], "options": {"mode": "REMOVE"}}, + } + response = Response(await update_tags(original_tag.id, input)) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, original_tag.id, full=True) + + assert set([o.id for o in tag.namespaces]) == set([remaining_namespace.id]) + + +@pytest.mark.anyio +async def test_update_tag_changes_updated_at(update_tags): + original_tag = Tag(name="tag") + original_tag.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_tag = await DB.add(original_tag) + + response = Response(await update_tags(original_tag.id, {"name": "updated"})) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, original_tag.id) + assert tag.updated_at > original_tag.updated_at + + +@pytest.mark.anyio +async def test_update_tag_assoc_changes_updated_at(update_tags): + original_tag = Tag(name="tag") + original_tag.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc) + original_tag = await DB.add(original_tag) + await DB.add(Namespace(id=1, name="namespace")) + + response = Response( + await update_tags(original_tag.id, {"namespaces": {"ids": [1]}}) + ) + response.assert_is("UpdateSuccess") + + tag = await DB.get(Tag, original_tag.id) + assert tag.updated_at > original_tag.updated_at -- cgit v1.2.3-2-gb3c3