"""Price source that reads quotes from downloaded depot CSV exports.

Export files found in ``FOLDER`` whose names match ``PATTERN`` are parsed,
and the prices they contain are looked up by ISIN.
"""

import csv
import os
import re
from dataclasses import dataclass
from datetime import datetime

import pytz
from beancount.core.number import D
from beanprices import source

# Directory scanned for export files ("~" is expanded when scanning).
FOLDER = "~/net/downloads"
# Export file names: "Depot_" + 2 digits "." 2 digits "." 4 digits + ".csv".
PATTERN = re.compile(r"Depot_\d{2}\.\d{2}\.\d{4}\.csv")
# Timezone attached to the (naive) timestamp found in the export header.
TZ = pytz.timezone("Europe/Berlin")

# Lines between the timestamp line and the CSV column-header line.
_PREAMBLE_LINES = 5


@dataclass
class WatchlistEntry:
    """Prices parsed from a single export file."""

    # Timezone-aware timestamp taken from the file's first line.
    date: datetime
    # Maps ISIN -> source.SourcePrice for every row that carries an ISIN.
    price_map: dict


def parse_header(line: str) -> datetime:
    """Parse the timestamp from the first line of an export.

    Args:
        line: Text such as ``"Depotbewertung vom 31.12.2023 17:30:00"``.
            Surrounding whitespace is ignored and the
            ``"Depotbewertung vom "`` prefix is removed when present.

    Returns:
        The timestamp as a naive ``datetime``.

    Raises:
        ValueError: If the remaining text is not ``%d.%m.%Y %H:%M:%S``.
    """
    datestr = line.strip().removeprefix("Depotbewertung vom ")
    return datetime.strptime(datestr, "%d.%m.%Y %H:%M:%S")


def parse_file(handle) -> WatchlistEntry:
    """Parse one export from an open text file.

    The first line is the timestamp header; the next ``_PREAMBLE_LINES``
    lines are skipped; the remainder is read as ``;``-delimited CSV with a
    column-header row.

    Args:
        handle: Text file object positioned at the start of the export.

    Returns:
        A ``WatchlistEntry`` with one price per row whose ``ISIN`` column is
        non-empty.

    Raises:
        ValueError: If the header line cannot be parsed.
        KeyError: If an expected column is missing from the CSV header.
    """
    # pytz zones must be attached via localize(): datetime.replace(tzinfo=TZ)
    # would use the zone's first historical offset (local mean time) instead
    # of the CET/CEST offset valid at that moment.
    date = TZ.localize(parse_header(handle.readline()))

    for _ in range(_PREAMBLE_LINES):
        handle.readline()

    reader = csv.DictReader(handle, delimiter=";")
    prices = {}
    for row in reader:
        # Rows without an ISIN are skipped.
        if isin := row["ISIN"]:
            # The export uses a decimal comma; D() expects a decimal point.
            price = D(row["Aktueller Preis"].replace(",", "."))
            prices[isin] = source.SourcePrice(price, date, row["Währung"])
    return WatchlistEntry(date, prices)


def get_entries():
    """Yield a ``WatchlistEntry`` for every export file in ``FOLDER``.

    Only regular files whose whole name matches ``PATTERN`` are parsed, so
    leftovers such as ``Depot_01.01.2024.csv.part`` are ignored. Files are
    yielded in directory-scan order, which is not sorted.
    """
    # The context manager releases the directory handle even when the
    # consumer stops iterating early.
    with os.scandir(os.path.expanduser(FOLDER)) as directory:
        for entry in directory:
            if not entry.is_file() or not PATTERN.fullmatch(entry.name):
                continue
            # newline="" is what the csv module expects of its input file.
            with open(entry, encoding="cp1252", newline="") as handle:
                yield parse_file(handle)


class Source(source.Source):
    """Price source backed by the export files found in ``FOLDER``."""

    def get_latest_price(self, isin):
        """Return the price of ``isin`` from the most recent export.

        Args:
            isin: ISIN used as the lookup key.

        Returns:
            The ``source.SourcePrice`` recorded in the export with the latest
            header timestamp, or ``None`` when there are no exports or that
            export does not list ``isin``.
        """
        newest = max(get_entries(), key=lambda e: e.date, default=None)
        if newest is None:
            return None
        return newest.price_map.get(isin)

    def get_historical_price(self, isin, time):
        """Return the price of ``isin`` from an export dated like ``time``.

        Args:
            isin: ISIN used as the lookup key.
            time: ``datetime`` whose calendar date selects the export.

        Returns:
            The ``source.SourcePrice`` from the first export (in scan order)
            whose header date equals ``time.date()``, or ``None`` when no
            export matches or the matching export does not list ``isin``.
        """
        # NOTE(review): the dates are compared without converting ``time``
        # to Europe/Berlin first; confirm callers pass a compatible value.
        wanted = time.date()
        for entry in get_entries():
            if entry.date.date() == wanted:
                return entry.price_map.get(isin)
        return None