from datetime import datetime as dt
from datetime import timezone

import pytest
from conftest import DB, Response
from hircine.db.models import Namespace, Tag


@pytest.fixture
def query_tag(execute_id):
    """Bind the single-tag query to the ``execute_id`` fixture.

    Tests await the returned callable with a tag ID.
    """
    query = """
    query tag($id: Int!) {
        tag(id: $id) {
            __typename
            ... on FullTag {
                id
                name
                description
                namespaces {
                    __typename
                    id
                }
            }
            ... on Error {
                message
            }
            ... on IDNotFoundError {
                id
            }
        }
    }
    """

    return execute_id(query)


@pytest.fixture
def query_tags(execute):
    """Bind the tag listing query to the ``execute`` fixture.

    Tests await the returned callable without arguments.
    """
    query = """
    query tags {
        tags {
            __typename
            count
            edges {
                id
                name
                description
            }
        }
    }
    """

    return execute(query)


@pytest.fixture
def add_tag(execute_add):
    """Bind the ``addTag`` mutation to the ``execute_add`` fixture.

    Tests await the returned callable with an input dict.
    """
    mutation = """
    mutation addTag($input: AddTagInput!) {
        addTag(input: $input) {
            __typename
            ... on AddSuccess {
                id
            }
            ... on Error {
                message
            }
            ... on InvalidParameterError {
                parameter
            }
            ... on IDNotFoundError {
                id
            }
        }
    }
    """

    return execute_add(mutation)


@pytest.fixture
def update_tags(execute_update):
    """Bind the ``updateTags`` mutation to the ``execute_update`` fixture.

    Tests await the returned callable with an ID (or list of IDs) and an
    input dict.
    """
    mutation = """
    mutation updateTags($ids: [Int!]!, $input: UpdateTagInput!) {
        updateTags(ids: $ids, input: $input) {
            __typename
            ... on Success {
                message
            }
            ... on Error {
                message
            }
            ... on IDNotFoundError {
                id
            }
            ... on InvalidParameterError {
                parameter
            }
        }
    }
    """  # noqa: E501

    return execute_update(mutation)


@pytest.fixture
def delete_tags(execute_delete):
    """Bind the ``deleteTags`` mutation to the ``execute_delete`` fixture.

    Tests await the returned callable with a tag ID.
    """
    mutation = """
    mutation deleteTags($ids: [Int!]!) {
        deleteTags(ids: $ids) {
            __typename
            ... on Success {
                message
            }
            ... on Error {
                message
            }
            ... on IDNotFoundError {
                id
            }
        }
    }
    """

    return execute_delete(mutation)


@pytest.mark.anyio
async def test_query_tag(query_tag, gen_tag):
    """Querying a stored tag yields a FullTag mirroring its fields."""
    tag = await DB.add(next(gen_tag))

    response = Response(await query_tag(tag.id))
    response.assert_is("FullTag")

    assert response.id == tag.id
    assert response.name == tag.name
    assert response.description == tag.description
    assert {n["id"] for n in response.namespaces} == {n.id for n in tag.namespaces}


@pytest.mark.anyio
async def test_query_tag_fails_not_found(query_tag):
    """Querying an ID with no stored tag yields IDNotFoundError."""
    response = Response(await query_tag(1))
    response.assert_is("IDNotFoundError")

    assert response.id == 1
    assert response.message == "Tag ID not found: '1'"


@pytest.mark.anyio
async def test_query_tags(query_tags, gen_tag):
    """Listing tags returns every stored tag, compared in name order."""
    tags = await DB.add_all(*gen_tag)

    response = Response(await query_tags())
    response.assert_is("TagFilterResult")

    assert response.count == len(tags)
    assert isinstance(response.edges, list)
    assert len(response.edges) == len(tags)

    # Lengths were asserted equal above, so zip pairs every edge with a tag.
    for edge, tag in zip(response.edges, sorted(tags, key=lambda t: t.name)):
        assert edge["id"] == tag.id
        assert edge["name"] == tag.name
        assert edge["description"] == tag.description


@pytest.mark.anyio
async def test_add_tag(add_tag):
    """Adding a tag persists its name and description."""
    response = Response(
        await add_tag({"name": "added", "description": "it's been added!"})
    )
    response.assert_is("AddSuccess")

    tag = await DB.get(Tag, response.id)

    assert tag is not None
    assert tag.name == "added"
    assert tag.description == "it's been added!"
@pytest.mark.anyio
async def test_add_tag_with_namespace(add_tag):
    """Adding a tag with a namespace ID associates that namespace."""
    namespace = await DB.add(Namespace(id=1, name="new"))

    response = Response(await add_tag({"name": "added", "namespaces": {"ids": [1]}}))
    response.assert_is("AddSuccess")

    tag = await DB.get(Tag, response.id, full=True)

    assert tag is not None
    assert tag.name == "added"
    assert tag.namespaces[0].id == namespace.id
    assert tag.namespaces[0].name == namespace.name


@pytest.mark.anyio
async def test_add_tag_fails_empty_parameter(add_tag):
    """Adding a tag with an empty name yields InvalidParameterError."""
    response = Response(await add_tag({"name": ""}))
    response.assert_is("InvalidParameterError")

    assert response.parameter == "name"
    assert response.message == "Invalid parameter 'name': cannot be empty"


@pytest.mark.anyio
async def test_add_tag_fails_namespace_not_found(add_tag):
    """Adding a tag that references a missing namespace yields IDNotFoundError."""
    response = Response(await add_tag({"name": "added", "namespaces": {"ids": [1]}}))
    response.assert_is("IDNotFoundError")

    assert response.id == 1
    assert response.message == "Namespace ID not found: '1'"


@pytest.mark.anyio
async def test_add_tag_fails_exists(add_tag, gen_tag):
    """Adding a tag whose name is already taken yields NameExistsError."""
    tag = await DB.add(next(gen_tag))

    response = Response(await add_tag({"name": tag.name}))
    response.assert_is("NameExistsError")

    assert response.message == "Another Tag with this name exists"


@pytest.mark.anyio
async def test_delete_tag(delete_tags, gen_tag):
    """Deleting a stored tag removes it from the database."""
    tag = await DB.add(next(gen_tag))
    tag_id = tag.id

    response = Response(await delete_tags(tag_id))
    response.assert_is("DeleteSuccess")

    tag = await DB.get(Tag, tag_id)

    assert tag is None


@pytest.mark.anyio
async def test_delete_tag_not_found(delete_tags):
    """Deleting an ID with no stored tag yields IDNotFoundError."""
    response = Response(await delete_tags(1))
    response.assert_is("IDNotFoundError")

    assert response.id == 1
    assert response.message == "Tag ID not found: '1'"


@pytest.mark.anyio
async def test_update_tag(update_tags, gen_tag, gen_namespace):
    """Updating name, description and namespaces persists all three."""
    tag = await DB.add(next(gen_tag))
    namespace = await DB.add(next(gen_namespace))

    data = {
        "name": "updated",
        "description": "how different, how unique",
        # Use the ID of the namespace just stored rather than assuming it is 1.
        "namespaces": {"ids": [namespace.id]},
    }
    response = Response(await update_tags(tag.id, data))
    response.assert_is("UpdateSuccess")

    tag = await DB.get(Tag, tag.id, full=True)

    assert tag is not None
    assert tag.name == "updated"
    assert tag.description == "how different, how unique"
    assert tag.namespaces[0].id == namespace.id
    assert tag.namespaces[0].name == namespace.name


@pytest.mark.parametrize(
    "empty",
    [
        None,
        "",
    ],
    ids=[
        "with None",
        "with empty string",
    ],
)
@pytest.mark.anyio
async def test_update_tag_clears_description(update_tags, gen_tag, empty):
    """Updating the description to None or "" stores None."""
    tag = await DB.add(next(gen_tag))

    data = {
        "description": empty,
    }
    response = Response(await update_tags(tag.id, data))
    response.assert_is("UpdateSuccess")

    tag = await DB.get(Tag, tag.id)

    assert tag is not None
    assert tag.description is None


@pytest.mark.anyio
async def test_update_tag_fails_exists(update_tags, gen_tag):
    """Renaming a tag to another tag's name yields NameExistsError."""
    first = await DB.add(next(gen_tag))
    second = await DB.add(next(gen_tag))

    response = Response(await update_tags(second.id, {"name": first.name}))
    response.assert_is("NameExistsError")

    assert response.message == "Another Tag with this name exists"


@pytest.mark.anyio
async def test_update_tag_fails_not_found(update_tags):
    """Updating an ID with no stored tag yields IDNotFoundError."""
    response = Response(await update_tags(1, {"name": "updated"}))
    response.assert_is("IDNotFoundError")

    assert response.id == 1
    assert response.message == "Tag ID not found: '1'"


@pytest.mark.anyio
async def test_update_tags_cannot_bulk_edit_name(update_tags, gen_tag):
    """Setting a name on several tags at once yields InvalidParameterError."""
    first = await DB.add(next(gen_tag))
    second = await DB.add(next(gen_tag))

    response = Response(await update_tags([first.id, second.id], {"name": "unique"}))
    response.assert_is("InvalidParameterError")


@pytest.mark.parametrize(
    "empty",
    [
        None,
        "",
    ],
    ids=[
        "none",
        "empty string",
    ],
)
@pytest.mark.anyio
async def test_update_tag_fails_empty_parameter(update_tags, gen_tag, empty):
    """Updating the name to None or "" yields InvalidParameterError."""
    tag = await DB.add(next(gen_tag))

    response = Response(await update_tags(tag.id, {"name": empty}))
    response.assert_is("InvalidParameterError")

    assert response.parameter == "name"
    assert response.message == "Invalid parameter 'name': cannot be empty"


@pytest.mark.anyio
async def test_update_tag_fails_namespace_not_found(update_tags, gen_tag):
    """Updating with a missing namespace ID yields IDNotFoundError."""
    tag = await DB.add(next(gen_tag))

    response = Response(await update_tags(tag.id, {"namespaces": {"ids": [1]}}))
    response.assert_is("IDNotFoundError")

    assert response.id == 1
    assert response.message == "Namespace ID not found: '1'"


@pytest.mark.parametrize(
    "options",
    [
        None,
        {},
        {"mode": "REPLACE"},
    ],
    ids=[
        "by default (none)",
        "by default (empty record)",
        "when defined explicitly",
    ],
)
@pytest.mark.anyio
async def test_update_tag_replaces_assocs(update_tags, gen_tag, options):
    """Namespaces are replaced for each parametrized ``options`` value."""
    original_tag = await DB.add(next(gen_tag))
    new_namespace = await DB.add(Namespace(name="new"))

    data = {
        # Forward the parametrized options; without this every case would
        # send the identical request and the parametrization would be a no-op.
        "namespaces": {"ids": [new_namespace.id], "options": options},
    }
    response = Response(await update_tags(original_tag.id, data))
    response.assert_is("UpdateSuccess")

    tag = await DB.get(Tag, original_tag.id, full=True)

    assert {o.id for o in tag.namespaces} == {new_namespace.id}


@pytest.mark.anyio
async def test_update_tag_adds_assocs(update_tags, gen_tag):
    """Mode ADD keeps the existing namespaces and appends the new one."""
    original_tag = await DB.add(next(gen_tag))
    new_namespace = await DB.add(Namespace(name="new"))
    added_namespaces = original_tag.namespaces + [new_namespace]

    data = {
        "namespaces": {"ids": [new_namespace.id], "options": {"mode": "ADD"}},
    }
    response = Response(await update_tags(original_tag.id, data))
    response.assert_is("UpdateSuccess")

    tag = await DB.get(Tag, original_tag.id, full=True)

    assert {o.id for o in tag.namespaces} == {o.id for o in added_namespaces}


@pytest.mark.anyio
async def test_update_tag_removes_assocs(update_tags):
    """Mode REMOVE drops only the listed namespace."""
    removed_namespace = Namespace(id=1, name="new")
    remaining_namespace = Namespace(id=2, name="newtwo")
    original_tag = await DB.add(
        Tag(id=1, name="tag", namespaces=[removed_namespace, remaining_namespace])
    )

    data = {
        "namespaces": {"ids": [removed_namespace.id], "options": {"mode": "REMOVE"}},
    }
    response = Response(await update_tags(original_tag.id, data))
    response.assert_is("UpdateSuccess")

    tag = await DB.get(Tag, original_tag.id, full=True)

    assert {o.id for o in tag.namespaces} == {remaining_namespace.id}


@pytest.mark.anyio
async def test_update_tag_changes_updated_at(update_tags):
    """Updating a tag's name moves ``updated_at`` forward."""
    original_tag = Tag(name="tag")
    # Seed a fixed past timestamp so the post-update value can be compared.
    original_tag.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc)
    original_tag = await DB.add(original_tag)

    response = Response(await update_tags(original_tag.id, {"name": "updated"}))
    response.assert_is("UpdateSuccess")

    tag = await DB.get(Tag, original_tag.id)

    assert tag.updated_at > original_tag.updated_at


@pytest.mark.anyio
async def test_update_tag_assoc_changes_updated_at(update_tags):
    """Updating a tag's namespaces moves ``updated_at`` forward."""
    original_tag = Tag(name="tag")
    # Seed a fixed past timestamp so the post-update value can be compared.
    original_tag.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc)
    original_tag = await DB.add(original_tag)

    await DB.add(Namespace(id=1, name="namespace"))

    response = Response(
        await update_tags(original_tag.id, {"namespaces": {"ids": [1]}})
    )
    response.assert_is("UpdateSuccess")

    tag = await DB.get(Tag, original_tag.id)

    assert tag.updated_at > original_tag.updated_at