summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJonah BrĂ¼chert <jbb@kaidan.im>2024-04-16 18:43:06 +0200
committerJonah BrĂ¼chert <jbb@kaidan.im>2024-04-16 18:43:06 +0200
commit3fcbc1586ef863ef7f503fd35df3f59c3df4b9f6 (patch)
tree9ea3b49f171f79df8a72d0373c25141784c8c6b2
parentb695b5b3b12448b501686a108440787fa2395ac7 (diff)
parent9f6e876e14a074c386660c582c7f78e1503877aa (diff)
downloadtools-debian.tar.gz
tools-debian.tar.bz2
tools-debian.zip
Merge branch 'master' into debiandebian
-rw-r--r--.gitignore1
-rwxr-xr-xbin/hostinfo55
-rw-r--r--hostinfo/prefix.py11
-rw-r--r--hostinfo/printer.py89
-rwxr-xr-xsetup.py6
-rw-r--r--version.py54
6 files changed, 83 insertions, 133 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..751553b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+*.bak
diff --git a/bin/hostinfo b/bin/hostinfo
index 8700be7..639102c 100755
--- a/bin/hostinfo
+++ b/bin/hostinfo
@@ -8,6 +8,8 @@ import os
import pkg_resources
from dns import resolver, reversename
+from typing import Optional
+
OWN_DIRECTORY = os.path.dirname(os.path.abspath(os.path.realpath(__file__)))
LIB = os.path.join(OWN_DIRECTORY, '..')
if os.path.exists(os.path.join(LIB, 'hostinfo')):
@@ -16,9 +18,11 @@ if os.path.exists(os.path.join(LIB, 'hostinfo')):
from hostinfo import printer
from hostinfo import utils
+
def _get_data(path):
- stream = file(path, 'r')
- return yaml.load(stream)
+ stream = open(path, 'r')
+ return yaml.safe_load(stream)
+
def _match_key(data, keys):
if data is None:
@@ -53,6 +57,7 @@ def _match_key(data, keys):
return None
+
def _match(host, search_key, search_value, negate):
search_keys = search_key.split('.')
@@ -71,6 +76,7 @@ def _match(host, search_key, search_value, negate):
return (search_key, result)
return (search_key, None)
+
def _parse_search(search):
if search[0] != '?':
sys.stderr.write("Invalid search string.")
@@ -90,10 +96,13 @@ def _parse_search(search):
return (search, value, negate)
-def print_search(basepath, flags, search, filter_key=None):
+
+def print_search(basepath: str, flags: argparse.Namespace,
+ search: str, filter_key: Optional[str] = None):
def _get_label(host):
if flags.short:
- return host.replace('.spline.inf.fu-berlin.de','')
+ return host.replace('.spline.inf.fu-berlin.de',
+ '')
return host
metadata = os.path.join(basepath, 'metadata', 'hosts')
@@ -110,7 +119,7 @@ def print_search(basepath, flags, search, filter_key=None):
key, result = _match(data, search_key, search_value, negate)
if result is not None:
if flags.only_names:
- print(_get_label(host))
+ print((_get_label(host)))
continue
p = printer.Printer(data, flags)
@@ -119,30 +128,32 @@ def print_search(basepath, flags, search, filter_key=None):
maxlength=max(length), force=True)
else:
if key is None:
- print(_get_label(host))
+ print((_get_label(host)))
else:
p.info(key, label=_get_label(host),
maxlength=max(length), force=True)
+
def print_info(path, flags, key=None):
data = _get_data(path)
p = printer.Printer(data, flags)
p.info(key)
+
def print_keys(path):
def _print_keys(data, prefix = ''):
if isinstance(data, str):
return
- for key in data.keys():
- print "%s%s" % (prefix, key)
+ for key in list(data.keys()):
+ print("%s%s" % (prefix, key))
if key == 'addresses':
- for k in utils.group_by(data[key], 'interface').keys():
- print "%s%s.%s" % (prefix, key, k)
+ for k in list(utils.group_by(data[key], 'interface').keys()):
+ print("%s%s.%s" % (prefix, key, k))
elif key == 'ports':
- for k in utils.group_by(data[key], 'process', 'UNKNOWN').keys():
- print "%s%s.%s" % (prefix, key, k)
+ for k in list(utils.group_by(data[key], 'process', 'UNKNOWN').keys()):
+ print("%s%s.%s" % (prefix, key, k))
elif isinstance(data[key], dict):
_print_keys(data[key], "%s%s." % (prefix, key))
elif isinstance(data[key], list):
@@ -152,14 +163,15 @@ def print_keys(path):
data = _get_data(path)
_print_keys(data)
-def print_hosts(path, short):
+
+def print_hosts(path: str, short: bool):
metadata = os.path.join(path, 'metadata', 'hosts')
if os.path.exists(metadata):
- hosts = yaml.load(file(metadata, 'r'))
+ hosts = yaml.safe_load(open(metadata, 'r'))
if 'hosts' in hosts:
for host in hosts['hosts']:
if short:
- print(host.replace('.spline.inf.fu-berlin.de',''))
+ print((host.replace('.spline.inf.fu-berlin.de','')))
else:
print(host)
return True
@@ -167,7 +179,8 @@ def print_hosts(path, short):
sys.stderr.write("'%s' not found!\n" % metadata)
return False
-def find_host(basepath, host):
+
+def find_host(basepath: str, host: str):
path = os.path.join(basepath, host)
if os.path.exists(path):
return path
@@ -191,6 +204,7 @@ def find_host(basepath, host):
return None
+
def print_version_and_exit():
ver = None
try:
@@ -202,10 +216,11 @@ def print_version_and_exit():
if ver is None:
sys.stderr.write('Unable to identify the version information.')
sys.exit(1)
- print("hostinfo-tools %s" % ver)
+ print(("hostinfo-tools %s" % ver))
sys.exit(0)
-def main():
+
+def main() -> None:
basepath = '/usr/local/share/hostinfo'
if 'HOSTINFO_PATH' in os.environ and os.environ['HOSTINFO_PATH'] != '':
basepath = os.environ['HOSTINFO_PATH']
@@ -256,7 +271,8 @@ def main():
if args.name.startswith('?'):
# search
- print_search(basepath, search=args.name, filter_key=args.filter, flags=args)
+ print_search(basepath, search=args.name, filter_key=args.filter,
+ flags=args)
else:
# info
path = find_host(basepath, args.name)
@@ -273,5 +289,6 @@ def main():
sys.exit(0)
+
if __name__ == '__main__':
main()
diff --git a/hostinfo/prefix.py b/hostinfo/prefix.py
index e1b72b5..7233c48 100644
--- a/hostinfo/prefix.py
+++ b/hostinfo/prefix.py
@@ -1,9 +1,12 @@
# -*- coding: utf-8 -*-
-from __future__ import print_function
+class Flags:
+ oneline: bool = False
+ nospaces: bool = False
+
class Printer:
- flags = list()
+ flags = Flags()
def __init__(self, full_key='', printer=None):
if printer is None:
@@ -20,7 +23,7 @@ class Printer:
def set_label(self, label='', maxlength=0):
self.label = self._get_label(label, maxlength)
- def _get_label(self, label, maxlength):
+ def _get_label(self, label: str, maxlength: int):
if label == '':
return label
@@ -33,7 +36,7 @@ class Printer:
else:
return label.ljust(maxlength+2)
- def pprint(self, data):
+ def pprint(self, data: str):
self.output("%s%s" % (self.label, data))
self.has_output = True
if not self.empty:
diff --git a/hostinfo/printer.py b/hostinfo/printer.py
index ea612a5..4054df2 100644
--- a/hostinfo/printer.py
+++ b/hostinfo/printer.py
@@ -1,27 +1,33 @@
# -*- coding: utf-8 -*-
-from __future__ import absolute_import
+
from hostinfo import prefix
from hostinfo import utils
+from typing import Iterable, Optional, Any, Union
+
+
def _get_full_key(prev_key, key):
if prev_key == '':
return key
return "%s.%s" % (prev_key, key)
-def _sort_with_list(iterable, sort):
- def helper(value):
+
+def _sort_with_list(iterable: Iterable, sort: list[str]):
+ def helper(value) -> str:
if sort is None:
return value
(key, _) = value
if key in sort:
- return sort.index(key)
+ return str(sort.index(key))
return "%d %s" % (len(sort), key)
return sorted(iterable, key=helper)
-def _space(filter_key, full_key, printer, force=False):
+
+def _space(filter_key: Optional[str], full_key: str,
+ printer: prefix.Printer, force=False):
if filter_key is None and full_key == '':
printer.space(force)
@@ -46,49 +52,21 @@ class Printer:
self.flags = flags
prefix.Printer.flags = flags
- def cb_print_addresses(self, value, full_key, filter_key):
- def _print_ip(address):
- return '%s/%s' % (address['address'], address['netmask'])
-
- display_check = self._is_group_displayed(full_key, filter_key)
- return utils.group_by(value, 'interface', None, display_check, _print_ip)
-
- def cb_print_ports(self, value, full_key, filter_key):
- def _print_port(port):
- if port['proto'] in ['tcp6', 'udp6']:
- return '(%s) [%s]:%s' % (port['proto'].replace('6', ''),
- port['ip'], port['port'])
- return '(%s) %s:%s' % (port['proto'], port['ip'], port['port'])
-
- display_check = self._is_group_displayed(full_key, filter_key)
- return (utils.group_by(value, 'process', 'UNKNOWN', display_check, _print_port),
- ['sshd', 'nrpe', 'munin-node'])
-
- def cb_print_vserver(self, value, full_key, filter_key):
- if value == 'guest' and 'vserver_host' in self.data and \
- self.data['vserver_host'] is not None:
- return 'guest running on %s' % self.data['vserver_host']
- else:
- return value
-
- def _should_display(self, full_key, filter_key=None):
+ def _should_display(self, full_key,
+ filter_key: Optional[str] = None):
if full_key not in self.ignore:
return True
if filter_key is not None and filter_key.startswith(full_key):
return True
return False
- def _is_group_displayed(self, prev_key, filter_key):
- return (lambda key: self._should_display(_get_full_key(prev_key, key),
- filter_key))
-
- def _print(self, value, printer, filter_key=None, sort=None, force=False):
- try:
- value = value.strip().splitlines()
- except AttributeError:
- pass
-
- if isinstance(value, dict):
+ def _print(self, value: Union[str, dict[str, Any], list[str]],
+ printer: prefix.Printer,
+ filter_key: Optional[str] = None,
+ sort=None, force=False):
+ if isinstance(value, str):
+ self._print_list(value.strip().splitlines(), printer, filter_key)
+ elif isinstance(value, dict):
self._print_dict(value, printer, filter_key, sort, force)
elif isinstance(value, list):
self._print_list(value, printer, filter_key)
@@ -98,7 +76,8 @@ class Printer:
else:
self._print_value(value, printer, filter_key)
- def _print_key(self, key, value, printer, filter_key):
+ def _print_key(self, key: str, value: str,
+ printer: prefix.Printer, filter_key):
sort = None
try:
method = getattr(self, 'cb_print_%s' % key.replace('.', '_'))
@@ -113,29 +92,33 @@ class Printer:
self._print(value, printer, filter_key, sort)
- def _print_value(self, value, printer, filter_key):
+ def _print_value(self, value: str, printer: prefix.Printer,
+ filter_key: Optional[str]):
full_key = _get_full_key(printer.full_key, value)
if self._should_display(full_key, filter_key) and \
filter_key is None or full_key == filter_key:
printer.pprint(value)
- def _print_list(self, values, printer, filter_key):
+ def _print_list(self, values: list[str],
+ printer: prefix.Printer,
+ filter_key: Optional[str]):
for value in values:
- if isinstance(value, basestring):
+ if isinstance(value, str):
self._print_value(value, printer, filter_key)
else:
self._print(value, printer, filter_key)
- def _print_dict(self, value, printer, filter_key, sort, force):
+ def _print_dict(self, value: dict[str, str], printer: prefix.Printer,
+ filter_key: Optional[str], sort: list, force: bool):
keys = _sort_with_list(
- [(key, full_key) for key in value.keys()
+ [(key, full_key) for key in list(value.keys())
for full_key in [_get_full_key(printer.full_key, key)]
if self._should_display(full_key, filter_key)],
sort)
if len(keys) == 0:
return
- maxlength = max([len(self._get_label(key, full_key)) \
+ maxlength = max([len(self._get_label(key, full_key))
for (key, full_key) in keys])
_space(filter_key, printer.full_key, printer, True)
@@ -152,7 +135,8 @@ class Printer:
found = True
new_printer = prefix.Printer(full_key, printer)
if filter_key is None:
- new_printer.set_label(self._get_label(key, full_key), maxlength)
+ new_printer.set_label(self._get_label(key, full_key),
+ maxlength)
self._print_key(full_key, value[key], new_printer, new_filter_key)
_space(filter_key, printer.full_key, new_printer)
@@ -160,12 +144,13 @@ class Printer:
if force and not found:
printer.pprint('')
- def _get_label(self, key, full_key):
+ def _get_label(self, key: str, full_key: str):
if full_key in self.labels:
return self.labels[full_key]
return key
- def info(self, key, label=None, maxlength=0, force=False):
+ def info(self, key: Optional[str], label: Optional[str] = None,
+ maxlength=0, force=False):
printer = prefix.Printer()
if label is not None:
printer.set_label(label, maxlength)
diff --git a/setup.py b/setup.py
index 44f74a8..be96542 100755
--- a/setup.py
+++ b/setup.py
@@ -1,14 +1,12 @@
#!/usr/bin/env python
from distutils.core import setup
-from version import *
setup(name='hostinfo-tools',
- version=get_git_version(),
+ version="0.2.3",
description='Hostinfo database interface scripts',
author='Alexander Sulfrian',
author_email='alex@spline.inf.fu-berlin.de',
url='http://git.spline.inf.fu-berlin.de/hostinfo-tools/',
packages=['hostinfo'],
- scripts=['bin/hostinfo'],
- )
+ scripts=['bin/hostinfo'])
diff --git a/version.py b/version.py
deleted file mode 100644
index 08bc587..0000000
--- a/version.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# -*- coding: utf-8 -*-
-# To use this script, simply import it your setup.py file, and use the
-# results of get_git_version() as your package version:
-#
-# from version import *
-#
-# setup(
-# version=get_git_version(),
-# .
-# .
-# .
-# )
-
-__all__ = ["get_git_version"]
-
-import os
-import re
-from subprocess import Popen, PIPE
-
-OWN_DIR = os.path.dirname(os.path.abspath(os.path.realpath(__file__)))
-
-def call_git_describe(abbrev=4):
- try:
- p = Popen(['git', 'describe', '--abbrev=%d' % abbrev, '--tags'],
- cwd=OWN_DIR, stdout=PIPE, stderr=PIPE)
- p.stderr.close()
- line = p.stdout.readlines()[0]
- return line.strip()
-
- except:
- return None
-
-def parse_debian_changelog():
- try:
- from debian import changelog
-
- with open(os.path.join(OWN_DIR, 'debian', 'changelog')) as cfile:
- clog = changelog.Changelog(cfile)
- return str(clog.get_version())
- except:
- return None
-
-def get_git_version(abbrev=4):
- version = call_git_describe(abbrev)
- if version is None:
- version = parse_debian_changelog()
-
- if version is None:
- raise ValueError("Cannot find the version number!")
-
- return re.sub('^debian/', '', version)
-
-if __name__ == "__main__":
- print get_git_version()