aboutsummaryrefslogtreecommitdiffstats
path: root/beancount_oriole/prices
diff options
context:
space:
mode:
Diffstat (limited to 'beancount_oriole/prices')
-rw-r--r--beancount_oriole/prices/__init__.py0
-rw-r--r--beancount_oriole/prices/ing_csv.py72
2 files changed, 72 insertions, 0 deletions
diff --git a/beancount_oriole/prices/__init__.py b/beancount_oriole/prices/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/beancount_oriole/prices/__init__.py
diff --git a/beancount_oriole/prices/ing_csv.py b/beancount_oriole/prices/ing_csv.py
new file mode 100644
index 0000000..a6d394d
--- /dev/null
+++ b/beancount_oriole/prices/ing_csv.py
@@ -0,0 +1,72 @@
+import csv
+import os
+import re
+from dataclasses import dataclass
+from datetime import datetime
+
+import pytz
+from beancount.core.number import D
+from beanprices import source
+
+FOLDER = "~/net/downloads"
+PATTERN = re.compile(r"Depot_\d{2}\.\d{2}\.\d{4}\.csv")
+TZ = pytz.timezone("Europe/Berlin")
+
+
+@dataclass
+class WatchlistEntry:
+ date: datetime
+ price_map: dict
+
+
+def parse_header(line):
+ datestr = line.strip().removeprefix("Depotbewertung vom ")
+
+ return datetime.strptime(datestr, "%d.%m.%Y %H:%M:%S")
+
+
+def parse_file(handle):
+ date = parse_header(handle.readline()).replace(tzinfo=TZ)
+ handle.readline()
+ handle.readline()
+
+ handle.readline()
+ handle.readline()
+ handle.readline()
+
+ reader = csv.DictReader(handle, delimiter=";")
+
+ prices = dict()
+
+ for row in reader:
+ if isin := row["ISIN"]:
+ price = D(row["Aktueller Preis"].replace(",", "."))
+ prices[isin] = source.SourcePrice(price, date, row["Währung"])
+
+ return WatchlistEntry(date, prices)
+
+
+def get_entries():
+ for entry in os.scandir(os.path.expanduser(FOLDER)):
+ if entry.is_file():
+ if not PATTERN.match(entry.name):
+ continue
+
+ with open(entry, encoding="cp1252") as handle:
+ yield parse_file(handle)
+
+
+class Source(source.Source):
+ def get_latest_price(self, isin):
+ entries = sorted(get_entries(), key=lambda e: e.date, reverse=True)
+ if not entries:
+ return None
+
+ return entries[0].price_map.get(isin, None)
+
+ def get_historical_price(self, isin, time):
+ for entry in get_entries():
+ if entry.date.date() == time.date():
+ return entry.price_map.get(isin, None)
+
+ return None