summaryrefslogtreecommitdiffstatshomepage
path: root/tests/api/test_tag.py
diff options
context:
space:
mode:
authorWolfgang Müller2024-03-05 18:08:09 +0100
committerWolfgang Müller2024-03-05 19:25:59 +0100
commitd1d654ebac2d51e3841675faeb56480e440f622f (patch)
tree56ef123c1a15a10dfd90836e4038e27efde950c6 /tests/api/test_tag.py
downloadhircine-d1d654ebac2d51e3841675faeb56480e440f622f.tar.gz
Initial commit0.1.0
Diffstat (limited to 'tests/api/test_tag.py')
-rw-r--r--tests/api/test_tag.py441
1 files changed, 441 insertions, 0 deletions
diff --git a/tests/api/test_tag.py b/tests/api/test_tag.py
new file mode 100644
index 0000000..c863a00
--- /dev/null
+++ b/tests/api/test_tag.py
@@ -0,0 +1,441 @@
+from datetime import datetime as dt
+from datetime import timezone
+
+import pytest
+from conftest import DB, Response
+from hircine.db.models import Namespace, Tag
+
+
+@pytest.fixture
+def query_tag(execute_id):
+ query = """
+ query tag($id: Int!) {
+ tag(id: $id) {
+ __typename
+ ... on FullTag {
+ id
+ name
+ description
+ namespaces {
+ __typename
+ id
+ }
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_id(query)
+
+
+@pytest.fixture
+def query_tags(execute):
+ query = """
+ query tags {
+ tags {
+ __typename
+ count
+ edges {
+ id
+ name
+ description
+ }
+ }
+ }
+ """
+
+ return execute(query)
+
+
+@pytest.fixture
+def add_tag(execute_add):
+ mutation = """
+ mutation addTag($input: AddTagInput!) {
+ addTag(input: $input) {
+ __typename
+ ... on AddSuccess {
+ id
+ }
+ ... on Error {
+ message
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_add(mutation)
+
+
+@pytest.fixture
+def update_tags(execute_update):
+ mutation = """
+ mutation updateTags($ids: [Int!]!, $input: UpdateTagInput!) {
+ updateTags(ids: $ids, input: $input) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ ... on InvalidParameterError {
+ parameter
+ }
+ }
+ }
+ """ # noqa: E501
+
+ return execute_update(mutation)
+
+
+@pytest.fixture
+def delete_tags(execute_delete):
+ mutation = """
+ mutation deleteTags($ids: [Int!]!) {
+ deleteTags(ids: $ids) {
+ __typename
+ ... on Success {
+ message
+ }
+ ... on Error {
+ message
+ }
+ ... on IDNotFoundError {
+ id
+ }
+ }
+ }
+ """
+
+ return execute_delete(mutation)
+
+
+@pytest.mark.anyio
+async def test_query_tag(query_tag, gen_tag):
+ tag = await DB.add(next(gen_tag))
+
+ response = Response(await query_tag(tag.id))
+ response.assert_is("FullTag")
+
+ assert response.id == tag.id
+ assert response.name == tag.name
+ assert response.description == tag.description
+ assert set([n["id"] for n in response.namespaces]) == set(
+ [n.id for n in tag.namespaces]
+ )
+
+
+@pytest.mark.anyio
+async def test_query_tag_fails_not_found(query_tag):
+ response = Response(await query_tag(1))
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Tag ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_query_tags(query_tags, gen_tag):
+ tags = await DB.add_all(*gen_tag)
+ response = Response(await query_tags())
+ response.assert_is("TagFilterResult")
+
+ assert response.count == len(tags)
+ assert isinstance((response.edges), list)
+ assert len(response.edges) == len(tags)
+
+ edges = iter(response.edges)
+ for tag in sorted(tags, key=lambda a: a.name):
+ edge = next(edges)
+ assert edge["id"] == tag.id
+ assert edge["name"] == tag.name
+ assert edge["description"] == tag.description
+
+
+@pytest.mark.anyio
+async def test_add_tag(add_tag):
+ response = Response(
+ await add_tag({"name": "added", "description": "it's been added!"})
+ )
+ response.assert_is("AddSuccess")
+
+ tag = await DB.get(Tag, response.id)
+ assert tag is not None
+ assert tag.name == "added"
+ assert tag.description == "it's been added!"
+
+
+@pytest.mark.anyio
+async def test_add_tag_with_namespace(add_tag):
+ namespace = await DB.add(Namespace(id=1, name="new"))
+
+ response = Response(await add_tag({"name": "added", "namespaces": {"ids": [1]}}))
+ response.assert_is("AddSuccess")
+
+ tag = await DB.get(Tag, response.id, full=True)
+ assert tag is not None
+ assert tag.name == "added"
+ assert tag.namespaces[0].id == namespace.id
+ assert tag.namespaces[0].name == namespace.name
+
+
+@pytest.mark.anyio
+async def test_add_tag_fails_empty_parameter(add_tag):
+ response = Response(await add_tag({"name": ""}))
+
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "name"
+ assert response.message == "Invalid parameter 'name': cannot be empty"
+
+
+@pytest.mark.anyio
+async def test_add_tag_fails_namespace_not_found(add_tag):
+ response = Response(await add_tag({"name": "added", "namespaces": {"ids": [1]}}))
+ response.assert_is("IDNotFoundError")
+
+ assert response.id == 1
+ assert response.message == "Namespace ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_add_tag_fails_exists(add_tag, gen_tag):
+ tag = await DB.add(next(gen_tag))
+
+ response = Response(await add_tag({"name": tag.name}))
+ response.assert_is("NameExistsError")
+ assert response.message == "Another Tag with this name exists"
+
+
+@pytest.mark.anyio
+async def test_delete_tag(delete_tags, gen_tag):
+ tag = await DB.add(next(gen_tag))
+ id = tag.id
+
+ response = Response(await delete_tags(id))
+ response.assert_is("DeleteSuccess")
+
+ tag = await DB.get(Tag, id)
+ assert tag is None
+
+
+@pytest.mark.anyio
+async def test_delete_tag_not_found(delete_tags):
+ response = Response(await delete_tags(1))
+
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Tag ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_update_tag(update_tags, gen_tag, gen_namespace):
+ tag = await DB.add(next(gen_tag))
+ namespace = await DB.add(next(gen_namespace))
+
+ input = {
+ "name": "updated",
+ "description": "how different, how unique",
+ "namespaces": {"ids": [1]},
+ }
+ response = Response(await update_tags(tag.id, input))
+ response.assert_is("UpdateSuccess")
+
+ tag = await DB.get(Tag, tag.id, full=True)
+ assert tag is not None
+ assert tag.name == "updated"
+ assert tag.description == "how different, how unique"
+ assert tag.namespaces[0].id == namespace.id
+ assert tag.namespaces[0].name == namespace.name
+
+
+@pytest.mark.parametrize(
+ "empty",
+ [
+ None,
+ "",
+ ],
+ ids=[
+ "with None",
+ "with empty string",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_tag_clears_description(update_tags, gen_tag, empty):
+ tag = await DB.add(next(gen_tag))
+
+ input = {
+ "description": empty,
+ }
+ response = Response(await update_tags(tag.id, input))
+ response.assert_is("UpdateSuccess")
+
+ tag = await DB.get(Tag, tag.id)
+ assert tag is not None
+ assert tag.description is None
+
+
+@pytest.mark.anyio
+async def test_update_tag_fails_exists(update_tags, gen_tag):
+ first = await DB.add(next(gen_tag))
+ second = await DB.add(next(gen_tag))
+
+ response = Response(await update_tags(second.id, {"name": first.name}))
+ response.assert_is("NameExistsError")
+ assert response.message == "Another Tag with this name exists"
+
+
+@pytest.mark.anyio
+async def test_update_tag_fails_not_found(update_tags):
+ response = Response(await update_tags(1, {"name": "updated"}))
+
+ response.assert_is("IDNotFoundError")
+ assert response.id == 1
+ assert response.message == "Tag ID not found: '1'"
+
+
+@pytest.mark.anyio
+async def test_update_tags_cannot_bulk_edit_name(update_tags, gen_tag):
+ first = await DB.add(next(gen_tag))
+ second = await DB.add(next(gen_tag))
+
+ response = Response(await update_tags([first.id, second.id], {"name": "unique"}))
+ response.assert_is("InvalidParameterError")
+
+
+@pytest.mark.parametrize(
+ "empty",
+ [
+ None,
+ "",
+ ],
+ ids=[
+ "none",
+ "empty string",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_tag_fails_empty_parameter(update_tags, gen_tag, empty):
+ tag = await DB.add(next(gen_tag))
+ response = Response(await update_tags(tag.id, {"name": empty}))
+
+ response.assert_is("InvalidParameterError")
+ assert response.parameter == "name"
+ assert response.message == "Invalid parameter 'name': cannot be empty"
+
+
+@pytest.mark.anyio
+async def test_update_tag_fails_namespace_not_found(update_tags, gen_tag):
+ tag = await DB.add(next(gen_tag))
+ response = Response(await update_tags(tag.id, {"namespaces": {"ids": [1]}}))
+ response.assert_is("IDNotFoundError")
+
+ assert response.id == 1
+ assert response.message == "Namespace ID not found: '1'"
+
+
+@pytest.mark.parametrize(
+ "options",
+ [
+ None,
+ {},
+ {"mode": "REPLACE"},
+ ],
+ ids=[
+ "by default (none)",
+ "by default (empty record)",
+ "when defined explicitly",
+ ],
+)
+@pytest.mark.anyio
+async def test_update_tag_replaces_assocs(update_tags, gen_tag, options):
+ original_tag = await DB.add(next(gen_tag))
+ new_namespace = await DB.add(Namespace(name="new"))
+
+ input = {
+ "namespaces": {"ids": [new_namespace.id]},
+ }
+ response = Response(await update_tags(original_tag.id, input))
+ response.assert_is("UpdateSuccess")
+
+ tag = await DB.get(Tag, original_tag.id, full=True)
+
+ assert set([o.id for o in tag.namespaces]) == set([o.id for o in [new_namespace]])
+
+
+@pytest.mark.anyio
+async def test_update_tag_adds_assocs(update_tags, gen_tag):
+ original_tag = await DB.add(next(gen_tag))
+ new_namespace = await DB.add(Namespace(name="new"))
+ added_namespaces = original_tag.namespaces + [new_namespace]
+
+ input = {
+ "namespaces": {"ids": [new_namespace.id], "options": {"mode": "ADD"}},
+ }
+ response = Response(await update_tags(original_tag.id, input))
+ response.assert_is("UpdateSuccess")
+
+ tag = await DB.get(Tag, original_tag.id, full=True)
+
+ assert set([o.id for o in tag.namespaces]) == set([o.id for o in added_namespaces])
+
+
+@pytest.mark.anyio
+async def test_update_tag_removes_assocs(update_tags):
+ removed_namespace = Namespace(id=1, name="new")
+ remaining_namespace = Namespace(id=2, name="newtwo")
+ original_tag = await DB.add(
+ Tag(id=1, name="tag", namespaces=[removed_namespace, remaining_namespace])
+ )
+
+ input = {
+ "namespaces": {"ids": [removed_namespace.id], "options": {"mode": "REMOVE"}},
+ }
+ response = Response(await update_tags(original_tag.id, input))
+ response.assert_is("UpdateSuccess")
+
+ tag = await DB.get(Tag, original_tag.id, full=True)
+
+ assert set([o.id for o in tag.namespaces]) == set([remaining_namespace.id])
+
+
+@pytest.mark.anyio
+async def test_update_tag_changes_updated_at(update_tags):
+ original_tag = Tag(name="tag")
+ original_tag.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc)
+ original_tag = await DB.add(original_tag)
+
+ response = Response(await update_tags(original_tag.id, {"name": "updated"}))
+ response.assert_is("UpdateSuccess")
+
+ tag = await DB.get(Tag, original_tag.id)
+ assert tag.updated_at > original_tag.updated_at
+
+
+@pytest.mark.anyio
+async def test_update_tag_assoc_changes_updated_at(update_tags):
+ original_tag = Tag(name="tag")
+ original_tag.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc)
+ original_tag = await DB.add(original_tag)
+ await DB.add(Namespace(id=1, name="namespace"))
+
+ response = Response(
+ await update_tags(original_tag.id, {"namespaces": {"ids": [1]}})
+ )
+ response.assert_is("UpdateSuccess")
+
+ tag = await DB.get(Tag, original_tag.id)
+ assert tag.updated_at > original_tag.updated_at