summaryrefslogtreecommitdiffstatshomepage
path: root/tests/scanner/test_scanner.py
diff options
context:
space:
mode:
authorWolfgang Müller2024-03-05 18:08:09 +0100
committerWolfgang Müller2024-03-05 19:25:59 +0100
commitd1d654ebac2d51e3841675faeb56480e440f622f (patch)
tree56ef123c1a15a10dfd90836e4038e27efde950c6 /tests/scanner/test_scanner.py
downloadhircine-0.1.0.tar.gz
Initial commit0.1.0
Diffstat (limited to '')
-rw-r--r--tests/scanner/test_scanner.py311
1 files changed, 311 insertions, 0 deletions
diff --git a/tests/scanner/test_scanner.py b/tests/scanner/test_scanner.py
new file mode 100644
index 0000000..45a966f
--- /dev/null
+++ b/tests/scanner/test_scanner.py
@@ -0,0 +1,311 @@
+import configparser
+import os
+import shutil
+from datetime import datetime, timezone
+from pathlib import Path
+from zipfile import ZipFile
+
+import hircine.thumbnailer
+import pytest
+from conftest import DB
+from hircine.config import DirectoryStructure
+from hircine.db.models import Archive, Image, Page
+from hircine.scanner import Scanner, Status
+from hircine.thumbnailer import object_path
+
+
+def pageset(pages):
+ return set([(page.path, page.archive_id, page.image.hash) for page in pages])
+
+
+@pytest.fixture
+def archive(data):
+ stat = os.stat(data("contents/archive.zip"))
+ mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
+
+ cover = Image(
+ id=1,
+ hash="4ac228082aaf8bedc0fbd4859c5324c2acf0d1c63f9097d55e9be88d0804eaa4",
+ width=0,
+ height=0,
+ )
+
+ archive = Archive(
+ id=1,
+ path=data("contents/archive.zip"),
+ hash="8aa2fd72954fb9103776114172d345ad4446babf292e876a892cfbed1c283523",
+ size=stat.st_size,
+ mtime=mtime,
+ cover=cover,
+ pages=[
+ Page(
+ id=1,
+ archive_id=1,
+ index=1,
+ path="01.png",
+ image=cover,
+ ),
+ Page(
+ id=2,
+ archive_id=1,
+ index=2,
+ path="02.png",
+ image=Image(
+ id=2,
+ hash="9b2c7a9c1f3d1c5a07fa1492d9d91ace5122262559c7f513e3b97464d2edb753",
+ width=0,
+ height=0,
+ ),
+ ),
+ Page(
+ id=3,
+ archive_id=1,
+ index=3,
+ path="03.png",
+ image=Image(
+ id=3,
+ hash="ed132e79daf9e93970d14d9443b7870f1aefd12aa9d3fba8cab0096984754ff5",
+ width=0,
+ height=0,
+ ),
+ ),
+ ],
+ page_count=3,
+ )
+
+ yield archive
+
+
+@pytest.fixture
+def scanner(data, monkeypatch):
+ monkeypatch.setattr(
+ hircine.thumbnailer.Thumbnailer, "process", lambda s, a, b: (0, 0)
+ )
+
+ dirs = DirectoryStructure(scan=data("contents/"), objects=data("objects/"))
+ yield Scanner(configparser.ConfigParser(), dirs)
+
+
+@pytest.mark.anyio
+async def test_scanner_adds_new_archive(archive, scanner, capsys):
+ await scanner.scan()
+ added_archive = await DB.get(Archive, 1, full=True)
+
+ assert added_archive.hash == archive.hash
+ assert pageset(added_archive.pages) == pageset(archive.pages)
+
+ captured = capsys.readouterr()
+ assert captured.out == "[+] archive.zip\n"
+
+
+@pytest.mark.anyio
+async def test_scanner_dedups_archive_contents(archive, scanner, capsys):
+ archive = await DB.add(archive)
+
+ dedup_path = archive.path + ".dedup"
+ with ZipFile(archive.path, "r") as zin:
+ with ZipFile(dedup_path, "w") as zout:
+ for info in zin.infolist():
+ base, ext = os.path.splitext(info.filename)
+
+ if base == "03":
+ continue
+
+ if ext == ".png":
+ zout.writestr(f"0{base}.png", zin.read(info))
+ else:
+ zout.writestr(info.filename, zin.read(info))
+
+ await scanner.scan()
+ added_archive = await DB.get(Archive, 2, full=True)
+
+ assert (
+ added_archive.hash
+ == "fc2ea810eddc231824aef44db62d5f3de89b3747e4aea6b5728c1532aabdeccd"
+ )
+
+ pages = set()
+ for page in archive.pages:
+ if page.path == "03.png":
+ continue
+
+ pages.add((f"0{page.path}", 2, page.image.hash))
+
+ assert pageset(added_archive.pages) == pages
+
+ captured = capsys.readouterr()
+ assert captured.out == "[+] archive.zip.dedup\n"
+
+
+@pytest.mark.anyio
+async def test_scanner_skips_same_mtime(archive, scanner, capsys):
+ archive = await DB.add(archive)
+ await scanner.scan()
+
+ captured = capsys.readouterr()
+ assert captured.out == ""
+
+
+@pytest.mark.anyio
+async def test_scanner_finds_existing_before_duplicate(archive, scanner, capsys):
+ stat = os.stat(archive.path)
+ mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
+
+ before = await DB.add(archive)
+
+ copy_path = before.path + ".copy"
+ shutil.copyfile(Path(before.path), copy_path)
+
+ await scanner.scan()
+
+ after = await DB.get(Archive, before.id, full=True)
+ assert after.hash == before.hash
+ assert after.path == before.path
+ assert after.mtime == mtime
+ assert pageset(after.pages) == pageset(before.pages)
+
+ captured = capsys.readouterr()
+ assert captured.out == "[I] archive.zip.copy\n"
+
+
+@pytest.mark.anyio
+async def test_scanner_skips_non_zip(data, scanner, capsys):
+ Path(data("contents/archive.zip")).unlink()
+ Path(data("contents/non_zip.txt")).touch()
+ await scanner.scan()
+
+ captured = capsys.readouterr()
+ assert captured.out == ""
+
+
+@pytest.mark.anyio
+async def test_scanner_skips_link(data, scanner, capsys):
+ Path(data("contents/archive.zip")).rename(data("archive.zip"))
+ os.symlink(data("archive.zip"), data("contents/archive.zip"))
+ await scanner.scan()
+
+ captured = capsys.readouterr()
+ assert captured.out == ""
+
+
+@pytest.mark.anyio
+async def test_scanner_updates_mtime(archive, scanner, capsys):
+ Path(archive.path).touch()
+ stat = os.stat(archive.path)
+ mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
+
+ archive = await DB.add(archive)
+ await scanner.scan()
+
+ updated_archive = await DB.get(Archive, archive.id, full=True)
+ assert updated_archive.hash == archive.hash
+ assert updated_archive.path == archive.path
+ assert updated_archive.mtime == mtime
+ assert pageset(updated_archive.pages) == pageset(archive.pages)
+
+ captured = capsys.readouterr()
+ assert captured.out == "[*] archive.zip\n"
+
+
+@pytest.mark.anyio
+async def test_scanner_updates_path(archive, scanner, capsys):
+ new_path = archive.path + ".new"
+
+ Path(archive.path).rename(new_path)
+ stat = os.stat(new_path)
+ mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
+
+ archive = await DB.add(archive)
+ await scanner.scan()
+
+ updated_archive = await DB.get(Archive, archive.id, full=True)
+ assert updated_archive.hash == archive.hash
+ assert updated_archive.path == new_path
+ assert updated_archive.mtime == mtime
+ assert pageset(updated_archive.pages) == pageset(archive.pages)
+
+ captured = capsys.readouterr()
+ assert captured.out == "[>] archive.zip -> archive.zip.new\n"
+
+
+@pytest.mark.anyio
+async def test_scanner_reports_missing(archive, scanner):
+ archive = await DB.add(archive)
+ Path(archive.path).unlink()
+ await scanner.scan()
+
+ assert scanner.registry.orphans == {archive.hash: (archive.id, archive.path)}
+
+
+@pytest.mark.anyio
+async def test_scanner_reports_duplicate(archive, scanner, capsys):
+ archive = await DB.add(archive)
+ copy_path = archive.path + ".copy"
+ shutil.copyfile(Path(archive.path), copy_path)
+ await scanner.scan()
+
+ assert list(scanner.registry.duplicates) == [
+ [
+ (archive.path, Status.UNCHANGED),
+ (copy_path, Status.IGNORED),
+ ]
+ ]
+
+ captured = capsys.readouterr()
+ assert captured.out == "[I] archive.zip.copy\n"
+
+
+@pytest.mark.anyio
+async def test_scanner_ignores_empty_archive(archive, scanner, capsys):
+ Path(archive.path).unlink()
+
+ empty_path = archive.path + ".empty"
+ ZipFile(empty_path, "w").close()
+
+ await scanner.scan()
+
+ assert scanner.registry.marked == {}
+
+ captured = capsys.readouterr()
+ assert captured.out == ""
+
+
+@pytest.mark.anyio
+async def test_scanner_reports_conflict(archive, scanner, capsys):
+ archive = await DB.add(archive)
+ ZipFile(archive.path, "w").close()
+
+ await scanner.scan()
+
+ assert scanner.registry.conflicts == {
+ archive.path: (
+ archive.hash,
+ "af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f3262",
+ )
+ }
+
+ captured = capsys.readouterr()
+ assert captured.out == "[!] archive.zip\n"
+
+
+@pytest.mark.anyio
+async def test_scanner_reprocess(archive, data, scanner, capsys):
+ await scanner.scan()
+
+ captured = capsys.readouterr()
+ assert captured.out == "[+] archive.zip\n"
+
+ old_stat = os.stat(data(object_path("objects/", archive.cover.hash, "full")))
+ old_mtime = datetime.fromtimestamp(old_stat.st_mtime, tz=timezone.utc)
+
+ scanner.reprocess = True
+
+ await scanner.scan()
+
+ new_stat = os.stat(data(object_path("objects/", archive.cover.hash, "full")))
+ new_mtime = datetime.fromtimestamp(new_stat.st_mtime, tz=timezone.utc)
+
+ assert new_mtime > old_mtime
+
+ captured = capsys.readouterr()
+ assert captured.out == "[~] archive.zip\n"