from datetime import datetime as dt
from datetime import timezone
import pytest
from conftest import DB, Response
from hircine.db.models import Namespace, Tag
@pytest.fixture
def query_tag(execute_id):
    """Hand the single-tag GraphQL query to the ``execute_id`` fixture.

    The document selects ``id``, ``name``, ``description`` and namespace IDs
    on ``FullTag``, plus ``message``/``id`` on the error types. Whatever
    ``execute_id`` returns for that document is returned unchanged.
    """
    document = """
query tag($id: Int!) {
tag(id: $id) {
__typename
... on FullTag {
id
name
description
namespaces {
__typename
id
}
}
... on Error {
message
}
... on IDNotFoundError {
id
}
}
}
"""

    return execute_id(document)
@pytest.fixture
def query_tags(execute):
    """Hand the tag-listing GraphQL query to the ``execute`` fixture.

    The document requests ``count`` and, per edge, ``id``, ``name`` and
    ``description``. The result of ``execute`` is returned unchanged.
    """
    document = """
query tags {
tags {
__typename
count
edges {
id
name
description
}
}
}
"""

    return execute(document)
@pytest.fixture
def add_tag(execute_add):
    """Hand the ``addTag`` GraphQL mutation to the ``execute_add`` fixture.

    The document selects ``id`` on ``AddSuccess`` and the relevant fields of
    the error types. The result of ``execute_add`` is returned unchanged.
    """
    document = """
mutation addTag($input: AddTagInput!) {
addTag(input: $input) {
__typename
... on AddSuccess {
id
}
... on Error {
message
}
... on InvalidParameterError {
parameter
}
... on IDNotFoundError {
id
}
}
}
"""

    return execute_add(document)
@pytest.fixture
def update_tags(execute_update):
    """Hand the ``updateTags`` GraphQL mutation to ``execute_update``.

    The document takes a list of IDs and an ``UpdateTagInput`` and selects
    the message or error details. The result of ``execute_update`` is
    returned unchanged.
    """
    document = """
mutation updateTags($ids: [Int!]!, $input: UpdateTagInput!) {
updateTags(ids: $ids, input: $input) {
__typename
... on Success {
message
}
... on Error {
message
}
... on IDNotFoundError {
id
}
... on InvalidParameterError {
parameter
}
}
}
"""  # noqa: E501

    return execute_update(document)
@pytest.fixture
def delete_tags(execute_delete):
    """Hand the ``deleteTags`` GraphQL mutation to ``execute_delete``.

    The document takes a list of IDs and selects the message or error
    details. The result of ``execute_delete`` is returned unchanged.
    """
    document = """
mutation deleteTags($ids: [Int!]!) {
deleteTags(ids: $ids) {
__typename
... on Success {
message
}
... on Error {
message
}
... on IDNotFoundError {
id
}
}
}
"""

    return execute_delete(document)
@pytest.mark.anyio
async def test_query_tag(query_tag, gen_tag):
    """Querying a stored tag by ID yields a ``FullTag`` mirroring its fields."""
    stored = await DB.add(next(gen_tag))

    result = await query_tag(stored.id)
    response = Response(result)

    response.assert_is("FullTag")
    assert response.id == stored.id
    assert response.name == stored.name
    assert response.description == stored.description

    # Only the set of namespace IDs is compared; ordering is not checked.
    returned_ids = {entry["id"] for entry in response.namespaces}
    stored_ids = {namespace.id for namespace in stored.namespaces}
    assert returned_ids == stored_ids
@pytest.mark.anyio
async def test_query_tag_fails_not_found(query_tag):
    """Querying ID 1 without adding a tag first gives ``IDNotFoundError``."""
    result = await query_tag(1)
    response = Response(result)

    response.assert_is("IDNotFoundError")
    assert response.id == 1
    assert response.message == "Tag ID not found: '1'"
@pytest.mark.anyio
async def test_query_tags(query_tags, gen_tag):
    """Listing tags returns every stored tag, matched in name order."""
    stored = await DB.add_all(*gen_tag)

    response = Response(await query_tags())
    response.assert_is("TagFilterResult")

    assert response.count == len(stored)
    assert isinstance((response.edges), list)
    assert len(response.edges) == len(stored)

    # The lengths were asserted equal above, so pairing with zip covers
    # every edge; edges are compared against the tags sorted by name.
    expected_order = sorted(stored, key=lambda item: item.name)
    for edge, tag in zip(response.edges, expected_order):
        assert edge["id"] == tag.id
        assert edge["name"] == tag.name
        assert edge["description"] == tag.description
@pytest.mark.anyio
async def test_add_tag(add_tag):
    """Adding a tag with name and description persists both fields."""
    payload = {"name": "added", "description": "it's been added!"}

    response = Response(await add_tag(payload))
    response.assert_is("AddSuccess")

    # Look the new row up by the ID reported in the response.
    created = await DB.get(Tag, response.id)
    assert created is not None
    assert created.name == "added"
    assert created.description == "it's been added!"
@pytest.mark.anyio
async def test_add_tag_with_namespace(add_tag):
    """Adding a tag that references namespace 1 links that namespace."""
    namespace = await DB.add(Namespace(id=1, name="new"))
    payload = {"name": "added", "namespaces": {"ids": [1]}}

    response = Response(await add_tag(payload))
    response.assert_is("AddSuccess")

    created = await DB.get(Tag, response.id, full=True)
    assert created is not None
    assert created.name == "added"

    linked = created.namespaces[0]
    assert linked.id == namespace.id
    assert linked.name == namespace.name
@pytest.mark.anyio
async def test_add_tag_fails_empty_parameter(add_tag):
    """Adding a tag with an empty name gives ``InvalidParameterError``."""
    payload = {"name": ""}

    response = Response(await add_tag(payload))

    response.assert_is("InvalidParameterError")
    assert response.parameter == "name"
    assert response.message == "Invalid parameter 'name': cannot be empty"
@pytest.mark.anyio
async def test_add_tag_fails_namespace_not_found(add_tag):
    """Referencing namespace 1 without adding it gives ``IDNotFoundError``."""
    payload = {"name": "added", "namespaces": {"ids": [1]}}

    response = Response(await add_tag(payload))

    response.assert_is("IDNotFoundError")
    assert response.id == 1
    assert response.message == "Namespace ID not found: '1'"
@pytest.mark.anyio
async def test_add_tag_fails_exists(add_tag, gen_tag):
    """Adding a tag whose name is already stored gives ``NameExistsError``."""
    existing = await DB.add(next(gen_tag))
    payload = {"name": existing.name}

    response = Response(await add_tag(payload))

    response.assert_is("NameExistsError")
    assert response.message == "Another Tag with this name exists"
@pytest.mark.anyio
async def test_delete_tag(delete_tags, gen_tag):
    """Deleting a stored tag removes it from the database."""
    stored = await DB.add(next(gen_tag))
    tag_id = stored.id

    response = Response(await delete_tags(tag_id))
    response.assert_is("DeleteSuccess")

    # The lookup by the same ID must now come back empty.
    assert await DB.get(Tag, tag_id) is None
@pytest.mark.anyio
async def test_delete_tag_not_found(delete_tags):
    """Deleting ID 1 without adding a tag first gives ``IDNotFoundError``."""
    result = await delete_tags(1)
    response = Response(result)

    response.assert_is("IDNotFoundError")
    assert response.id == 1
    assert response.message == "Tag ID not found: '1'"
@pytest.mark.anyio
async def test_update_tag(update_tags, gen_tag, gen_namespace):
    """Updating name, description and namespaces persists all three."""
    tag = await DB.add(next(gen_tag))
    namespace = await DB.add(next(gen_namespace))

    payload = {
        "name": "updated",
        "description": "how different, how unique",
        # Use the stored namespace's own ID instead of assuming the
        # generator assigned 1.
        "namespaces": {"ids": [namespace.id]},
    }
    response = Response(await update_tags(tag.id, payload))
    response.assert_is("UpdateSuccess")

    tag = await DB.get(Tag, tag.id, full=True)
    assert tag is not None
    assert tag.name == "updated"
    assert tag.description == "how different, how unique"
    assert tag.namespaces[0].id == namespace.id
    assert tag.namespaces[0].name == namespace.name
@pytest.mark.parametrize(
    "empty",
    [
        None,
        "",
    ],
    ids=[
        "with None",
        "with empty string",
    ],
)
@pytest.mark.anyio
async def test_update_tag_clears_description(update_tags, gen_tag, empty):
    """Sending ``None`` or ``""`` as description stores it as ``None``."""
    stored = await DB.add(next(gen_tag))
    payload = {"description": empty}

    response = Response(await update_tags(stored.id, payload))
    response.assert_is("UpdateSuccess")

    reloaded = await DB.get(Tag, stored.id)
    assert reloaded is not None
    assert reloaded.description is None
@pytest.mark.anyio
async def test_update_tag_fails_exists(update_tags, gen_tag):
    """Renaming a tag to another tag's name gives ``NameExistsError``."""
    first = await DB.add(next(gen_tag))
    second = await DB.add(next(gen_tag))
    payload = {"name": first.name}

    response = Response(await update_tags(second.id, payload))

    response.assert_is("NameExistsError")
    assert response.message == "Another Tag with this name exists"
@pytest.mark.anyio
async def test_update_tag_fails_not_found(update_tags):
    """Updating ID 1 without adding a tag first gives ``IDNotFoundError``."""
    payload = {"name": "updated"}

    response = Response(await update_tags(1, payload))

    response.assert_is("IDNotFoundError")
    assert response.id == 1
    assert response.message == "Tag ID not found: '1'"
@pytest.mark.anyio
async def test_update_tags_cannot_bulk_edit_name(update_tags, gen_tag):
    """Setting one name on two tags at once gives ``InvalidParameterError``."""
    first = await DB.add(next(gen_tag))
    second = await DB.add(next(gen_tag))
    target_ids = [first.id, second.id]

    response = Response(await update_tags(target_ids, {"name": "unique"}))

    response.assert_is("InvalidParameterError")
@pytest.mark.parametrize(
    "empty",
    [
        None,
        "",
    ],
    ids=[
        "none",
        "empty string",
    ],
)
@pytest.mark.anyio
async def test_update_tag_fails_empty_parameter(update_tags, gen_tag, empty):
    """Sending ``None`` or ``""`` as name gives ``InvalidParameterError``."""
    stored = await DB.add(next(gen_tag))
    payload = {"name": empty}

    response = Response(await update_tags(stored.id, payload))

    response.assert_is("InvalidParameterError")
    assert response.parameter == "name"
    assert response.message == "Invalid parameter 'name': cannot be empty"
@pytest.mark.anyio
async def test_update_tag_fails_namespace_not_found(update_tags, gen_tag):
    """Referencing namespace 1 without adding it gives ``IDNotFoundError``."""
    stored = await DB.add(next(gen_tag))
    payload = {"namespaces": {"ids": [1]}}

    response = Response(await update_tags(stored.id, payload))

    response.assert_is("IDNotFoundError")
    assert response.id == 1
    assert response.message == "Namespace ID not found: '1'"
@pytest.mark.parametrize(
    "options",
    [
        None,
        {},
        {"mode": "REPLACE"},
    ],
    ids=[
        "by default (none)",
        "by default (empty record)",
        "when defined explicitly",
    ],
)
@pytest.mark.anyio
async def test_update_tag_replaces_assocs(update_tags, gen_tag, options):
    """Updating namespaces replaces the tag's existing associations.

    Runs once per ``options`` value: ``None``, an empty record, and an
    explicit ``REPLACE`` mode. In each case the tag must end up linked to
    the new namespace only.
    """
    original_tag = await DB.add(next(gen_tag))
    new_namespace = await DB.add(Namespace(name="new"))

    payload = {
        # Forward the parametrized options so each case sends a distinct
        # request; without this all three variants were identical.
        "namespaces": {"ids": [new_namespace.id], "options": options},
    }
    response = Response(await update_tags(original_tag.id, payload))
    response.assert_is("UpdateSuccess")

    tag = await DB.get(Tag, original_tag.id, full=True)
    assert {o.id for o in tag.namespaces} == {new_namespace.id}
@pytest.mark.anyio
async def test_update_tag_adds_assocs(update_tags, gen_tag):
    """With mode ``ADD`` the new namespace joins the existing ones."""
    original_tag = await DB.add(next(gen_tag))
    new_namespace = await DB.add(Namespace(name="new"))

    # Capture the expected namespaces before the update is sent.
    added_namespaces = [*original_tag.namespaces, new_namespace]

    namespaces_input = {"ids": [new_namespace.id], "options": {"mode": "ADD"}}
    response = Response(
        await update_tags(original_tag.id, {"namespaces": namespaces_input})
    )
    response.assert_is("UpdateSuccess")

    reloaded = await DB.get(Tag, original_tag.id, full=True)
    actual_ids = {ns.id for ns in reloaded.namespaces}
    expected_ids = {ns.id for ns in added_namespaces}
    assert actual_ids == expected_ids
@pytest.mark.anyio
async def test_update_tag_removes_assocs(update_tags):
    """With mode ``REMOVE`` only the listed namespace is detached."""
    removed_namespace = Namespace(id=1, name="new")
    remaining_namespace = Namespace(id=2, name="newtwo")
    seeded = Tag(id=1, name="tag", namespaces=[removed_namespace, remaining_namespace])
    original_tag = await DB.add(seeded)

    namespaces_input = {
        "ids": [removed_namespace.id],
        "options": {"mode": "REMOVE"},
    }
    response = Response(
        await update_tags(original_tag.id, {"namespaces": namespaces_input})
    )
    response.assert_is("UpdateSuccess")

    reloaded = await DB.get(Tag, original_tag.id, full=True)
    assert {ns.id for ns in reloaded.namespaces} == {remaining_namespace.id}
@pytest.mark.anyio
async def test_update_tag_changes_updated_at(update_tags):
    """Renaming a tag moves ``updated_at`` past its previous value."""
    # Seed the tag with a fixed, timezone-aware timestamp in the past.
    seeded = Tag(name="tag")
    seeded.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc)
    original_tag = await DB.add(seeded)

    payload = {"name": "updated"}
    response = Response(await update_tags(original_tag.id, payload))
    response.assert_is("UpdateSuccess")

    reloaded = await DB.get(Tag, original_tag.id)
    assert reloaded.updated_at > original_tag.updated_at
@pytest.mark.anyio
async def test_update_tag_assoc_changes_updated_at(update_tags):
    """Changing a tag's namespaces moves ``updated_at`` past its old value."""
    # Seed the tag with a fixed, timezone-aware timestamp in the past.
    seeded = Tag(name="tag")
    seeded.updated_at = dt(2023, 1, 1, tzinfo=timezone.utc)
    original_tag = await DB.add(seeded)

    await DB.add(Namespace(id=1, name="namespace"))

    payload = {"namespaces": {"ids": [1]}}
    response = Response(await update_tags(original_tag.id, payload))
    response.assert_is("UpdateSuccess")

    reloaded = await DB.get(Tag, original_tag.id)
    assert reloaded.updated_at > original_tag.updated_at