#!/usr/bin/env python # -*- coding: utf-8 -*- import sys import argparse import yaml import os import pkg_resources from dns import resolver, reversename from typing import Optional OWN_DIRECTORY = os.path.dirname(os.path.abspath(os.path.realpath(__file__))) LIB = os.path.join(OWN_DIRECTORY, '..') if os.path.exists(os.path.join(LIB, 'hostinfo')): sys.path = [LIB] + sys.path from hostinfo import printer from hostinfo import utils def _get_data(path): stream = open(path, 'r') return yaml.safe_load(stream) def _match_key(data, keys): if data is None: return None if len(keys) == 0: return data key = keys[0] rest = keys[1:] if key == 'addresses' and 'addresses' in data: return _match_key(utils.group_by(data['addresses'], 'interface'), rest) if key == 'ports' and 'ports' in data: return _match_key(utils.group_by(data['ports'], 'process', 'UNKNOWN'), rest) if isinstance(data, dict): if key in data: return _match_key(data[key], rest) return None if isinstance(data, list): for elem in data: result = _match_key(elem, keys) if result is not None: return result return None if data == key: return data return None def _match(host, search_key, search_value, negate): search_keys = search_key.split('.') result = _match_key(host, search_keys) if search_value is None: if negate: if result is not None: return (search_key, None) return (None, True) return (search_key, result) else: if result is None and negate: return (search_key, True) if result is not None: if (search_value in str(result)) != negate: return (search_key, result) return (search_key, None) def _parse_search(search): if search[0] != '?': sys.stderr.write("Invalid search string.") sys.exit(1) search = search[1:] negate = False if search[0] == '~': search = search[1:] negate = True value = None if '=' in search: search_parts = search.split('=', 2) search = search_parts[0] value = search_parts[1] return (search, value, negate) def print_search(basepath: str, flags: argparse.Namespace, search: str, filter_key: Optional[str] = None): def _get_label(host): if flags.short: return host.replace('.spline.inf.fu-berlin.de', '') return host metadata = os.path.join(basepath, 'metadata', 'hosts') if not os.path.exists(metadata): sys.stderr.write("Invalid hostinfo data. " "'metadata/hosts' not found!\n") sys.exit(1) hosts = _get_data(metadata) length = [len(_get_label(host)) for host in hosts['hosts']] search_key, search_value, negate = _parse_search(search) for host in hosts['hosts']: data = _get_data(os.path.join(basepath, host)) key, result = _match(data, search_key, search_value, negate) if result is not None: if flags.only_names: print((_get_label(host))) continue p = printer.Printer(data, flags) if filter_key is not None or flags.details: p.info(filter_key, label=_get_label(host), maxlength=max(length), force=True) else: if key is None: print((_get_label(host))) else: p.info(key, label=_get_label(host), maxlength=max(length), force=True) def print_info(path, flags, key=None): data = _get_data(path) p = printer.Printer(data, flags) p.info(key) def print_keys(path): def _print_keys(data, prefix = ''): if isinstance(data, str): return for key in list(data.keys()): print("%s%s" % (prefix, key)) if key == 'addresses': for k in list(utils.group_by(data[key], 'interface').keys()): print("%s%s.%s" % (prefix, key, k)) elif key == 'ports': for k in list(utils.group_by(data[key], 'process', 'UNKNOWN').keys()): print("%s%s.%s" % (prefix, key, k)) elif isinstance(data[key], dict): _print_keys(data[key], "%s%s." % (prefix, key)) elif isinstance(data[key], list): for value in data[key]: _print_keys(value, "%s%s." % (prefix, key)) data = _get_data(path) _print_keys(data) def print_hosts(path: str, short: bool): metadata = os.path.join(path, 'metadata', 'hosts') if os.path.exists(metadata): hosts = yaml.safe_load(open(metadata, 'r')) if 'hosts' in hosts: for host in hosts['hosts']: if short: print((host.replace('.spline.inf.fu-berlin.de',''))) else: print(host) return True sys.stderr.write("'%s' not found!\n" % metadata) return False def find_host(basepath: str, host: str): path = os.path.join(basepath, host) if os.path.exists(path): return path # try to build the fqdn path = os.path.join(basepath, "%s.spline.inf.fu-berlin.de" % host.replace('.spline.de', '')) if os.path.exists(path): return path try: # try to use reverse dns addr = reversename.from_address(host) hostname = str(resolver.query(addr,"PTR")[0]) path = os.path.join(basepath, hostname[0:-1]) except: pass if os.path.exists(path): return path return None def print_version_and_exit(): ver = None try: import version ver = version.get_git_version() except: ver = pkg_resources.require("hostinfo-tools")[0].version if ver is None: sys.stderr.write('Unable to identify the version information.') sys.exit(1) print(("hostinfo-tools %s" % ver)) sys.exit(0) def main() -> None: basepath = '/usr/local/share/hostinfo' if 'HOSTINFO_PATH' in os.environ and os.environ['HOSTINFO_PATH'] != '': basepath = os.environ['HOSTINFO_PATH'] parser = argparse.ArgumentParser() parser.add_argument("name", nargs="?") parser.add_argument("filter", nargs="?") parser.add_argument("-o", "--oneline", action="store_true", help="each line is a complete record") parser.add_argument("-f", "--file", action="store_true", help="print the path of the file the " "information is read from") parser.add_argument("-k", "--keys", action="store_true", help="print only the available keys " "(used for bash completion)") parser.add_argument("-v", "--verbose", action="store_true", help="increase output verbosity") parser.add_argument("-n", "--nospaces", action="store_true", help="remove unnecessary spaces from output") parser.add_argument("-p", "--path", default=basepath, help="set the basepath to the hostinfo data") parser.add_argument("-l", "--hosts", action="store_true", help="lists all available hosts") parser.add_argument("-s", "--short", action="store_true", help="remove the domain from the output") parser.add_argument("--only-names", action="store_true", help="only print the hostname of the matching entries") parser.add_argument("-d", "--details", action="store_true", help="print details about matching hosts") parser.add_argument("-V", "--version", action="store_true", help="only print the version number and exit") args = parser.parse_args() if args.version: print_version_and_exit() if args.path: basepath = args.path if args.hosts: if not print_hosts(basepath, args.short): sys.exit(1) sys.exit(0) if args.name is None: parser.print_help() sys.exit(1) if args.name.startswith('?'): # search print_search(basepath, search=args.name, filter_key=args.filter, flags=args) else: # info path = find_host(basepath, args.name) if path is None: sys.stderr.write("Host '%s' could not be found!\n" % args.name) sys.exit(1) if args.file: print(path) elif args.keys: print_keys(path) else: print_info(path, key=args.filter, flags=args) sys.exit(0) if __name__ == '__main__': main()