1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
import csv
import os
import re
from dataclasses import dataclass
from datetime import datetime
import pytz
from beancount.core.number import D
from beanprices import source
# Directory that is searched for depot export files; the leading "~" is
# expanded to the user's home directory when the directory is scanned.
FOLDER = "~/net/downloads"
# File-name pattern of a depot export: "Depot_NN.NN.NNNN.csv" where N is a
# digit (presumably a DD.MM.YYYY date -- confirm against real exports).
PATTERN = re.compile(r"Depot_\d{2}\.\d{2}\.\d{4}\.csv")
# Timezone in which the timestamp from an export's header line is interpreted.
TZ = pytz.timezone("Europe/Berlin")
@dataclass
class WatchlistEntry:
    """Prices read from a single depot export file."""

    # Valuation timestamp taken from the first line of the export.
    date: datetime
    # Maps an ISIN string to the price object recorded for it in the export.
    price_map: dict
def parse_header(line):
    """Extract the valuation timestamp from the first line of an export.

    Surrounding whitespace is ignored and the leading label
    ``Depotbewertung vom `` is dropped when present; the remainder must be
    formatted as ``DD.MM.YYYY HH:MM:SS``.

    Args:
        line: The raw header line, e.g.
            ``"Depotbewertung vom 31.12.2023 17:45:00\\n"``.

    Returns:
        A naive ``datetime`` (no timezone attached).

    Raises:
        ValueError: If the remaining text does not match the expected format.
    """
    label = "Depotbewertung vom "
    timestamp_format = "%d.%m.%Y %H:%M:%S"
    timestamp_text = line.strip().removeprefix(label)
    return datetime.strptime(timestamp_text, timestamp_format)
def parse_file(handle):
    """Parse one depot export and return its prices as a ``WatchlistEntry``.

    The first line carries the valuation timestamp, the next five lines are
    skipped, and the rest is read as a ``;``-separated CSV whose first line
    names the columns. Rows with an empty ``ISIN`` column are ignored.

    Args:
        handle: An open text file object positioned at the start of the
            export.

    Returns:
        A ``WatchlistEntry`` whose ``date`` is the timezone-aware valuation
        timestamp and whose ``price_map`` maps each ISIN to a
        ``source.SourcePrice`` built from the ``Aktueller Preis`` and
        ``Währung`` columns.

    Raises:
        ValueError: If the header line does not contain a valid timestamp.
        KeyError: If one of the expected CSV columns is missing.
    """
    # pytz zones must be attached with ``localize``: ``replace(tzinfo=TZ)``
    # would use the zone's historical local-mean-time offset rather than
    # the CET/CEST offset valid at that moment.
    date = TZ.localize(parse_header(handle.readline()))

    # Five lines sit between the header and the CSV column row; skip them.
    for _ in range(5):
        handle.readline()

    prices = {}
    for row in csv.DictReader(handle, delimiter=";"):
        isin = row["ISIN"]
        if not isin:
            continue
        raw_price = row["Aktueller Preis"]
        if "," in raw_price:
            # With a decimal comma present, any "." is a thousands separator
            # ("1.234,56"); leaving it in would produce "1.234.56", which is
            # not a valid decimal literal.
            raw_price = raw_price.replace(".", "")
        price = D(raw_price.replace(",", "."))
        prices[isin] = source.SourcePrice(price, date, row["Währung"])
    return WatchlistEntry(date, prices)
def get_entries():
    """Yield a ``WatchlistEntry`` for every depot export found in ``FOLDER``.

    Only regular files whose complete name matches ``PATTERN`` are read.
    Each file is opened as cp1252 text and handed to ``parse_file``. Entries
    are produced in directory-listing order, which is not sorted.

    Yields:
        One ``WatchlistEntry`` per matching file.

    Raises:
        OSError: If the folder cannot be listed or a file cannot be opened.
    """
    # The context manager releases the directory handle even when the
    # consumer stops iterating early.
    with os.scandir(os.path.expanduser(FOLDER)) as listing:
        for entry in listing:
            # fullmatch: a name that merely starts like an export (for
            # example "Depot_01.01.2024.csv.part") must not be parsed.
            if not entry.is_file() or not PATTERN.fullmatch(entry.name):
                continue
            with open(entry, encoding="cp1252") as handle:
                yield parse_file(handle)
class Source(source.Source):
    """Price source that answers from locally stored depot export files."""

    def get_latest_price(self, isin):
        """Return the price of *isin* from the most recent export.

        Returns ``None`` when no export exists or when the most recent
        export has no entry for *isin*.
        """
        newest = max(get_entries(), key=lambda e: e.date, default=None)
        if newest is None:
            return None
        return newest.price_map.get(isin)

    def get_historical_price(self, isin, time):
        """Return the price of *isin* from an export dated the same day as *time*.

        The first export (in listing order) whose calendar date equals
        ``time.date()`` decides the result; ``None`` is returned when that
        export lacks *isin* or when no export matches the day.
        """
        same_day = next(
            (e for e in get_entries() if e.date.date() == time.date()),
            None,
        )
        if same_day is None:
            return None
        return same_day.price_map.get(isin)
|