# tests/scanner/test_scanner.py
#
# Provenance: commit d1d654ebac2d51e3841675faeb56480e440f622f ("Initial commit"),
# authored by Wolfgang Müller on Tue, 5 Mar 2024 18:08:09 +0100
# (exported via cgit v1.2.3-2-gb3c3).
"""Tests for ``hircine.scanner.Scanner``.

Each test arranges files below the ``contents/`` data directory (and
optionally pre-populates the database), runs a scan and then checks the
database state, the scanner's registry and the status line printed to stdout.
"""

import configparser
import os
import shutil
from datetime import datetime, timezone
from pathlib import Path
from zipfile import ZipFile

import hircine.thumbnailer
import pytest
from conftest import DB
from hircine.config import DirectoryStructure
from hircine.db.models import Archive, Image, Page
from hircine.scanner import Scanner, Status
from hircine.thumbnailer import object_path


def _mtime_utc(path) -> datetime:
    """Return the modification time of *path* as a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(os.stat(path).st_mtime, tz=timezone.utc)


def pageset(pages):
    """Return the pages as a set of ``(path, archive_id, image hash)`` tuples.

    Comparing sets makes the page assertions independent of page order and of
    any page attribute not included in the tuple.
    """
    return {(page.path, page.archive_id, page.image.hash) for page in pages}


@pytest.fixture
def archive(data):
    """Yield an unsaved ``Archive`` describing ``contents/archive.zip``.

    Size and mtime are read from the file on disk; the hashes and the three
    pages are hard-coded expectations for that fixture file.
    """
    # A single stat call supplies both the size and the modification time.
    stat = os.stat(data("contents/archive.zip"))
    mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)

    # The cover image is shared with the first page.
    cover = Image(
        id=1,
        hash="4ac228082aaf8bedc0fbd4859c5324c2acf0d1c63f9097d55e9be88d0804eaa4",
        width=0,
        height=0,
    )

    archive = Archive(
        id=1,
        path=data("contents/archive.zip"),
        hash="8aa2fd72954fb9103776114172d345ad4446babf292e876a892cfbed1c283523",
        size=stat.st_size,
        mtime=mtime,
        cover=cover,
        pages=[
            Page(
                id=1,
                archive_id=1,
                index=1,
                path="01.png",
                image=cover,
            ),
            Page(
                id=2,
                archive_id=1,
                index=2,
                path="02.png",
                image=Image(
                    id=2,
                    hash="9b2c7a9c1f3d1c5a07fa1492d9d91ace5122262559c7f513e3b97464d2edb753",
                    width=0,
                    height=0,
                ),
            ),
            Page(
                id=3,
                archive_id=1,
                index=3,
                path="03.png",
                image=Image(
                    id=3,
                    hash="ed132e79daf9e93970d14d9443b7870f1aefd12aa9d3fba8cab0096984754ff5",
                    width=0,
                    height=0,
                ),
            ),
        ],
        page_count=3,
    )

    yield archive


@pytest.fixture
def scanner(data, monkeypatch):
    """Yield a ``Scanner`` over ``contents/`` that stores objects in ``objects/``.

    ``Thumbnailer.process`` is replaced by a stub returning ``(0, 0)``, which
    matches the zero width/height of the images in the ``archive`` fixture.
    """
    monkeypatch.setattr(
        hircine.thumbnailer.Thumbnailer, "process", lambda s, a, b: (0, 0)
    )

    dirs = DirectoryStructure(scan=data("contents/"), objects=data("objects/"))
    yield Scanner(configparser.ConfigParser(), dirs)


@pytest.mark.anyio
async def test_scanner_adds_new_archive(archive, scanner, capsys):
    """Scanning with an empty database adds the archive and prints ``[+]``."""
    await scanner.scan()
    added_archive = await DB.get(Archive, 1, full=True)

    assert added_archive.hash == archive.hash
    assert pageset(added_archive.pages) == pageset(archive.pages)

    captured = capsys.readouterr()
    assert captured.out == "[+] archive.zip\n"


@pytest.mark.anyio
async def test_scanner_dedups_archive_contents(archive, scanner, capsys):
    """A second archive containing already-known images reuses their hashes.

    The second archive is derived from the first one: ``03.png`` is dropped
    and the remaining PNG entries are renamed with a leading ``0``.
    """
    archive = await DB.add(archive)

    dedup_path = archive.path + ".dedup"
    with ZipFile(archive.path, "r") as zin, ZipFile(dedup_path, "w") as zout:
        for info in zin.infolist():
            base, ext = os.path.splitext(info.filename)

            if base == "03":
                continue

            if ext == ".png":
                zout.writestr(f"0{base}.png", zin.read(info))
            else:
                zout.writestr(info.filename, zin.read(info))

    await scanner.scan()
    added_archive = await DB.get(Archive, 2, full=True)

    assert (
        added_archive.hash
        == "fc2ea810eddc231824aef44db62d5f3de89b3747e4aea6b5728c1532aabdeccd"
    )

    # Expected pages: the original ones minus 03.png, under the renamed paths
    # and attached to the new archive (id 2), with unchanged image hashes.
    pages = set()
    for page in archive.pages:
        if page.path == "03.png":
            continue

        pages.add((f"0{page.path}", 2, page.image.hash))

    assert pageset(added_archive.pages) == pages

    captured = capsys.readouterr()
    assert captured.out == "[+] archive.zip.dedup\n"


@pytest.mark.anyio
async def test_scanner_skips_same_mtime(archive, scanner, capsys):
    """An archive stored with the file's current mtime produces no output."""
    archive = await DB.add(archive)
    await scanner.scan()

    captured = capsys.readouterr()
    assert captured.out == ""


@pytest.mark.anyio
async def test_scanner_finds_existing_before_duplicate(archive, scanner, capsys):
    """The stored archive is left untouched when a byte-identical copy appears.

    Only the copy is reported, with an ``[I]`` line.
    """
    mtime = _mtime_utc(archive.path)

    before = await DB.add(archive)

    copy_path = before.path + ".copy"
    shutil.copyfile(Path(before.path), copy_path)

    await scanner.scan()

    after = await DB.get(Archive, before.id, full=True)
    assert after.hash == before.hash
    assert after.path == before.path
    assert after.mtime == mtime
    assert pageset(after.pages) == pageset(before.pages)

    captured = capsys.readouterr()
    assert captured.out == "[I] archive.zip.copy\n"


@pytest.mark.anyio
async def test_scanner_skips_non_zip(data, scanner, capsys):
    """A directory containing only a non-zip file produces no output."""
    Path(data("contents/archive.zip")).unlink()
    Path(data("contents/non_zip.txt")).touch()
    await scanner.scan()

    captured = capsys.readouterr()
    assert captured.out == ""


@pytest.mark.anyio
async def test_scanner_skips_link(data, scanner, capsys):
    """A symlink to an archive located outside ``contents/`` produces no output."""
    Path(data("contents/archive.zip")).rename(data("archive.zip"))
    os.symlink(data("archive.zip"), data("contents/archive.zip"))
    await scanner.scan()

    captured = capsys.readouterr()
    assert captured.out == ""


@pytest.mark.anyio
async def test_scanner_updates_mtime(archive, scanner, capsys):
    """Touching a known archive updates its stored mtime and prints ``[*]``."""
    # Touch first so the on-disk mtime differs from the one captured by the
    # ``archive`` fixture, which is what gets stored in the database below.
    Path(archive.path).touch()
    mtime = _mtime_utc(archive.path)

    archive = await DB.add(archive)
    await scanner.scan()

    updated_archive = await DB.get(Archive, archive.id, full=True)
    assert updated_archive.hash == archive.hash
    assert updated_archive.path == archive.path
    assert updated_archive.mtime == mtime
    assert pageset(updated_archive.pages) == pageset(archive.pages)

    captured = capsys.readouterr()
    assert captured.out == "[*] archive.zip\n"


@pytest.mark.anyio
async def test_scanner_updates_path(archive, scanner, capsys):
    """Renaming a known archive updates its stored path and prints ``[>]``."""
    new_path = archive.path + ".new"

    Path(archive.path).rename(new_path)
    mtime = _mtime_utc(new_path)

    archive = await DB.add(archive)
    await scanner.scan()

    updated_archive = await DB.get(Archive, archive.id, full=True)
    assert updated_archive.hash == archive.hash
    assert updated_archive.path == new_path
    assert updated_archive.mtime == mtime
    assert pageset(updated_archive.pages) == pageset(archive.pages)

    captured = capsys.readouterr()
    assert captured.out == "[>] archive.zip -> archive.zip.new\n"


@pytest.mark.anyio
async def test_scanner_reports_missing(archive, scanner):
    """A stored archive whose file was deleted is listed in ``registry.orphans``."""
    archive = await DB.add(archive)
    Path(archive.path).unlink()
    await scanner.scan()

    assert scanner.registry.orphans == {archive.hash: (archive.id, archive.path)}


@pytest.mark.anyio
async def test_scanner_reports_duplicate(archive, scanner, capsys):
    """A copy of a known archive is recorded in ``registry.duplicates``.

    The original is listed as ``UNCHANGED`` and the copy as ``IGNORED``.
    """
    archive = await DB.add(archive)
    copy_path = archive.path + ".copy"
    shutil.copyfile(Path(archive.path), copy_path)
    await scanner.scan()

    assert list(scanner.registry.duplicates) == [
        [
            (archive.path, Status.UNCHANGED),
            (copy_path, Status.IGNORED),
        ]
    ]

    captured = capsys.readouterr()
    assert captured.out == "[I] archive.zip.copy\n"


@pytest.mark.anyio
async def test_scanner_ignores_empty_archive(archive, scanner, capsys):
    """A zip file without entries is neither registered nor reported."""
    Path(archive.path).unlink()

    # Opening in write mode and closing immediately yields a valid, empty zip.
    empty_path = archive.path + ".empty"
    ZipFile(empty_path, "w").close()

    await scanner.scan()

    assert scanner.registry.marked == {}

    captured = capsys.readouterr()
    assert captured.out == ""


@pytest.mark.anyio
async def test_scanner_reports_conflict(archive, scanner, capsys):
    """Changed content at a known path is recorded in ``registry.conflicts``.

    The conflict entry maps the path to ``(stored hash, hash found on disk)``
    and a ``[!]`` line is printed.
    """
    archive = await DB.add(archive)
    # Overwrite the known archive in place with an empty zip file.
    ZipFile(archive.path, "w").close()

    await scanner.scan()

    assert scanner.registry.conflicts == {
        archive.path: (
            archive.hash,
            "af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f3262",
        )
    }

    captured = capsys.readouterr()
    assert captured.out == "[!] archive.zip\n"


@pytest.mark.anyio
async def test_scanner_reprocess(archive, data, scanner, capsys):
    """With ``reprocess`` set, a second scan rewrites objects and prints ``[~]``."""
    await scanner.scan()

    captured = capsys.readouterr()
    assert captured.out == "[+] archive.zip\n"

    old_mtime = _mtime_utc(data(object_path("objects/", archive.cover.hash, "full")))

    scanner.reprocess = True

    await scanner.scan()

    new_mtime = _mtime_utc(data(object_path("objects/", archive.cover.hash, "full")))

    # NOTE(review): the strict comparison relies on the filesystem's mtime
    # resolution telling the two scans apart -- confirm this is not flaky on
    # filesystems with coarse timestamps.
    assert new_mtime > old_mtime

    captured = capsys.readouterr()
    assert captured.out == "[~] archive.zip\n"