From 0fb4fc20559c9a5de5a10c74c1247635a1523255 Mon Sep 17 00:00:00 2001 From: Wolfgang Müller Date: Sun, 14 Nov 2021 18:55:52 +0100 Subject: Initial commit --- ywalk/__init__.py | 0 ywalk/graph.py | 125 +++++++++++++++++++++++++++++++++++++++++++++++++ ywalk/main.py | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ ywalk/parser.py | 47 +++++++++++++++++++ ywalk/types.py | 46 ++++++++++++++++++ 5 files changed, 354 insertions(+) create mode 100644 ywalk/__init__.py create mode 100644 ywalk/graph.py create mode 100644 ywalk/main.py create mode 100644 ywalk/parser.py create mode 100644 ywalk/types.py (limited to 'ywalk') diff --git a/ywalk/__init__.py b/ywalk/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ywalk/graph.py b/ywalk/graph.py new file mode 100644 index 0000000..18556cd --- /dev/null +++ b/ywalk/graph.py @@ -0,0 +1,125 @@ +import math + +from collections import deque +from ywalk.types import Connection, Mode + +class Graph: + WEIGHTS = { + 'least-hop': lambda c: 1, + 'least-time': lambda c: c.time + c.mode.weight, + } + + def __init__(self): + self.connections = {} + self.nodes = 0 + self.edges = 0 + self.predicates = [] + + self.set_weight() + + def add_predicate(self, predicate): + self.predicates.append(predicate) + + def set_weight(self, spec=None): + self.weight_method = Graph.WEIGHTS[spec or 'least-time'] + + def add_connection(self, conn: Connection): + for place in [conn.origin, conn.destination]: + if place not in self.connections: + self.nodes = self.nodes + 1 + self.connections[place] = set() + + if conn not in self.connections[conn.origin]: + self.edges = self.edges + 1 + self.connections[conn.origin].add(conn) + + def add_recall(self, destination): + for origin in self.get_places(): + if origin == destination: + continue + self.add_connection(Connection(origin, destination, Mode.RECALL, 0)) + + def __contains__(self, place): + return place in self.connections.keys() + + def get_connections_from(self, place): + for conn in self.connections[place]: + yield conn + + def get_connections_to(self, place): + for origin in self.get_places(): + yield from self.get_connections_between(origin, place) + + def get_connections_between(self, origin, destination): + for conn in self.connections[origin]: + if conn.destination == destination: + yield conn + + def get_places(self): + return self.connections.keys() + + def populate(self, gen): + for conn in gen: + self.add_connection(conn) + + def shortest_paths(self, origin, destination=None): + tentative = list(self.get_places()) + weights = {place: math.inf for place in self.get_places()} + hops = {} + + weights[origin] = 0 + + while tentative: + current = min(tentative, key=weights.get) + + tentative.remove(current) + + # Exit early if we were given a destination and have reached it + if destination and current == destination: + return weights, hops + + for conn in self.connections[current]: + if conn.destination not in tentative: + continue + + next_hop = conn.destination + + # Make sure to filter out any unwanted connections by setting + # their weight to infinity + if all(f(conn) for f in self.predicates): + alt_weights = weights[current] + self.weight_method(conn) + else: + alt_weights = math.inf + + if alt_weights < weights[next_hop]: + weights[next_hop] = alt_weights + hops[next_hop] = conn + + return weights, hops + + def find_path(self, *args): + stops = deque(args) + current_stop = stops.popleft() + + journey = [] + while stops: + next_stop = stops.popleft() + _, hops = self.shortest_paths(current_stop, next_stop) + + if not hops or next_stop not in hops: + return None + + path = deque() + last_hop = next_stop + + while last_hop in hops: + conn = hops[last_hop] + path.appendleft(conn) + last_hop = conn.origin + + if path is None: + return None + + journey.extend(path) + current_stop = next_stop + return journey diff --git a/ywalk/main.py b/ywalk/main.py new file mode 100644 index 0000000..3ea4145 --- /dev/null +++ b/ywalk/main.py @@ -0,0 +1,136 @@ +import argparse +import configparser +import math +import os +import sys +import ywalk.parser as Parser + +from ywalk.types import Mode, Place +from ywalk.graph import Graph + +class ParseModeRestriction(argparse.Action): + def __call__(self, parser, namespace, args, option_string=None): + arglist = set(args.split(',')) + + for arg in arglist: + if arg.upper() not in Mode.__members__: + errx(f'Not a valid Mode: {arg}') + + modes = [Mode[arg.upper()] for arg in arglist] + + saved = getattr(namespace, self.dest) or [] + saved.extend(modes) + setattr(namespace, self.dest, saved) + +# pylint: disable=line-too-long +cli = argparse.ArgumentParser() +cli.add_argument('place', type=Place, nargs='*', help='a place in Morrowind') +cli.add_argument('-w', dest='weight_spec', metavar='WEIGHTING_METHOD', help='the weighting method to apply when searching for a route') +cli.add_argument('-P', dest='deny_places', type=Place, action='append', metavar='PLACE', help='avoid traveling through this place') +cli.add_argument('-m', dest='allow_modes', action=ParseModeRestriction, metavar='MODES', help='allow only these modes of travel') +cli.add_argument('-M', dest='deny_modes', action=ParseModeRestriction, metavar='MODES', help='deny these modes of travel') +cli.add_argument('-T', dest='deny_teleport', action='store_true', help='deny teleportation') + +def errx(msg): + sys.exit(f'ywalk: {msg}') + +def pretty_print_path(path): + origin = path[0].origin + destination = path[-1].destination + + time = 0 + print(f'Start in {origin}') + for conn in path: + time = time + conn.time + + print(f' then {conn.mode.pretty} to {conn.destination}', end='') + if conn.time: + print(f' ({conn.time} {"hour" if conn.time == 1 else "hours"})') + else: + print() + + if time: + print(f'Arrive in {destination} after {time} hours.') + +def pretty_print_place(place, graph): + def pretty_print_connections(conns): + for conn in sorted(conns, key=lambda c: c.mode.value): + print(conn) + + print(f'Direct connections from {place}:') + pretty_print_connections(graph.get_connections_from(place)) + print() + + print(f'Direct connections to {place}:') + pretty_print_connections(graph.get_connections_to(place)) + + print() + print ('Reachability map:') + weights, _ = graph.shortest_paths(place) + for destination in sorted(weights.keys(), key=weights.get): + if place == destination: + continue + + reachability = "unreachable" if weights[destination] == math.inf else weights[destination] + print(f'{destination}: {reachability}') + +def get_xdg_home(xdg_type, fallback): + env = f'XDG_{xdg_type}_HOME' + if env in os.environ: + return os.environ[env] + return os.path.expanduser(fallback) + +def apply_predicates(args, graph): + if args.allow_modes: + graph.add_predicate(lambda c: c.mode in args.allow_modes) + + if args.deny_modes: + graph.add_predicate(lambda c: c.mode not in args.deny_modes) + + if args.deny_teleport: + graph.add_predicate(lambda c: not c.mode.teleport) + + if args.deny_places: + graph.add_predicate(lambda c: c.destination not in args.deny_places) + +def main(): + args = cli.parse_intermixed_args() + + config = configparser.ConfigParser() + config.read(os.path.join(get_xdg_home('CONFIG', '~/.config'), 'ywalk', 'config')) + + datafile = config.get('misc', 'data', fallback='goty') + datapath = os.path.join(get_xdg_home('DATA', '~/.local/share'), 'ywalk', f'{datafile}.tsv') + + graph = Graph() + + try: + graph.populate(Parser.parse_tsv(datapath)) + except (FileNotFoundError, Parser.InputError, ValueError) as err: + errx(err) + + recall = config.get('misc', 'recall', fallback=None) + if recall: + graph.add_recall(Place(recall)) + + for place in args.place: + if place not in graph: + errx(f'Can\'t find "{place}".') + + graph.set_weight(args.weight_spec or config.get('misc', 'weighting_method', fallback=None)) + apply_predicates(args, graph) + + if not args.place: + for place in sorted(graph.get_places(), key=lambda p: p.name): + print(place) + elif len(args.place) == 1: + pretty_print_place(args.place[0], graph) + elif len(args.place) > 1: + path = graph.find_path(*args.place) + if path is None: + errx(f'Graph exhausted, no connection between {", ".join(map(str, args.place))}') + + pretty_print_path(path) + +if __name__ == '__main__': + main() diff --git a/ywalk/parser.py b/ywalk/parser.py new file mode 100644 index 0000000..40d5f88 --- /dev/null +++ b/ywalk/parser.py @@ -0,0 +1,47 @@ +import csv + +from ywalk.types import Connection, Mode, Place + +FIELD_NAMES = ['Origin', 'Destination', 'Mode', 'Time'] + +class InputError(Exception): + def __init__(self, message, file, line, context): + super().__init__(message) + self.message = message + self.file = file + self.line = line + self.context = context + + def __str__(self): + return f'{self.file}:{self.line}: {self.message}\n {self.context}' + +def parse_row(row): + for field in FIELD_NAMES: + if row[field] is None: + raise ValueError(f'Empty field \'{field}\'') + + # This is an enum lookup, not a creation. The Enum class replaces a custom + # __new__ after class creation with one that only does lookups. + # pylint: disable=no-value-for-parameter + mode = Mode(row['Mode']) + time = int(row['Time']) + + if time < 0: + raise ValueError('Negative travel time') + + return Connection(Place(row['Origin']), Place(row['Destination']), mode, time) + +def check_fieldnames(names): + for name in FIELD_NAMES: + if name not in names: + raise ValueError(f'Missing field \'{name}\' in {names}') + +def parse_tsv(path): + with open(path, encoding='utf-8', newline='') as file: + reader = csv.DictReader(file, dialect=csv.excel_tab) + check_fieldnames(reader.fieldnames) + for line, row in enumerate(reader): + try: + yield parse_row(row) + except ValueError as err: + raise InputError(err, path, line + 1, row) from err diff --git a/ywalk/types.py b/ywalk/types.py new file mode 100644 index 0000000..33048cd --- /dev/null +++ b/ywalk/types.py @@ -0,0 +1,46 @@ +from dataclasses import dataclass +from enum import Enum + +class Mode(Enum): + ALMSIVI = 'Almsivi Intervention', 'invoke Almsivi Intervention', True, 0.25 + BOAT = 'Boat', 'take the Boat', False, 0 + CARAVAN = 'Caravan', 'use the Caravan', False, 0 + DIVINE = 'Divine Intervention', 'invoke Divine Intervention', True, 0.25 + FOOT = 'Foot', 'walk', False, 0.75 + GUIDE = 'Guild Guide', 'take the Guild Guide', True, 0.5 + PROPYLON = 'Propylon Index', 'travel by Propylon', True, 0.5 + RECALL = 'Recall', 'recall', True, 0 + RIVER_STRIDER = 'River Strider', 'take the River Strider', False, 0 + SILT_STRIDER = 'Silt Strider', 'take the Silt Strider', False, 0 + + def __new__(cls, value, pretty, teleport, weight): + obj = object.__new__(cls) + obj._value_ = value + obj.pretty = pretty + obj.weight = weight + obj.teleport = teleport + return obj + + def __str__(self) -> str: + return self.value + +@dataclass(frozen=True) +class Place: + name: str + + def __str__(self) -> str: + return self.name + +@dataclass(frozen=True) +class Connection: + origin: Place + destination: Place + mode: Mode + time: int + + def __str__(self) -> str: + time = 'instantaneous' + if self.time > 0: + time = f'{self.time} {"hour" if self.time == 1 else "hours"}' + + return f'{self.origin} -> {self.destination} ({self.mode}, {time})' -- cgit v1.2.3-2-gb3c3