summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Options/Types.py
blob: da54bbcc4b6c9e8aef306da4c35048a91502253b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
""" :mod:`Bcfg2.Options` provides a number of useful types for use
with the :class:`Bcfg2.Options.Option` constructor. """

import os
import re
import pwd
import grp

# Delimiter pattern for comma-separated option values: a comma together
# with any whitespace immediately before or after it.
_COMMA_SPLIT_RE = re.compile(r'\s*,\s*')


def path(value):
    """ Normalize a filesystem path option.  A leading ``~`` is
    expanded with :func:`os.path.expanduser`, and the result is then
    made absolute with :func:`os.path.abspath`.  No check is made
    that the path actually exists. """
    expanded = os.path.expanduser(value)
    return os.path.abspath(expanded)


def comma_list(value):
    """ Break a comma-separated string into a list of items.  Any
    whitespace directly surrounding a comma is discarded along with
    the comma.  The empty string yields an empty list. """
    return _COMMA_SPLIT_RE.split(value) if value != '' else []


def colon_list(value):
    """ Break a colon-separated string into a list of items.  The
    split is on the bare ``:`` character, so whitespace next to a
    colon is kept as part of the neighbouring item.  The empty string
    yields an empty list. """
    if value != '':
        return value.split(':')
    return []


def comma_dict(value):
    """ Split an option string on commas, optionally surrounded by
    whitespace, and split the resulting items again on the first
    equals sign, returning a dict.

    Each ``key=val`` item is converted as follows:

    * ``true``, ``yes`` or ``on`` (any case) becomes ``True``;
    * ``false``, ``no`` or ``off`` (any case) becomes ``False``;
    * anything :func:`int` accepts becomes an integer;
    * everything else is kept as the original string.

    An item with no equals sign is treated as a flag and maps to
    ``True``.  An empty (or otherwise false) ``value`` gives an empty
    dict. """
    true_words = ("true", "yes", "on")
    false_words = ("false", "no", "off")

    result = dict()
    if not value:
        return result

    for item in comma_list(value):
        if '=' not in item:
            result[item] = True
            continue

        # Use a separate name for the right-hand side so the ``value``
        # parameter is not rebound while iterating.
        key, raw = item.split('=', 1)

        # bool() on a string never raises and is True for any
        # non-empty string (including "false" and "0"), so boolean
        # words have to be recognised explicitly.
        word = raw.lower()
        if word in true_words:
            result[key] = True
        elif word in false_words:
            result[key] = False
        else:
            try:
                result[key] = int(raw)
            except ValueError:
                result[key] = raw
    return result


def anchored_regex_list(value):
    """ Split an option string on whitespace and compile each element
    as a regex that must match the whole string.

    Leading and trailing whitespace is ignored, so an empty or
    all-whitespace ``value`` gives an empty list.  Raises
    :exc:`ValueError` if any element is not a valid regex. """
    compiled = []
    try:
        # str.split() with no argument drops empty fields, so padded
        # or empty input does not produce spurious '^$' regexes.
        for pattern in value.split():
            # Validate the element on its own first: once wrapped in a
            # group, an unbalanced pattern such as 'a)(b' would
            # otherwise compile successfully.
            re.compile(pattern)
            # The non-capturing group keeps the anchors applied to the
            # whole pattern; '^a|b$' would anchor each alternative on
            # one side only.  Group numbering is unaffected.
            compiled.append(re.compile('^(?:' + pattern + ')$'))
    except re.error:
        raise ValueError("Not a list of regexes", value)
    return compiled


def octal(value):
    """ Interpret ``value`` as a string of base-8 digits (such as
    ``'755'``) and return the integer it denotes. """
    return int(value, base=8)


def username(value):
    """ Resolve a user specification to a numeric UID.  A value that
    parses as an integer is returned as that integer without
    consulting the password database; any other value is looked up
    as a login name with :func:`pwd.getpwnam`, so a named user must
    exist. """
    try:
        uid = int(value)
    except ValueError:
        # Not numeric; index 2 of the passwd entry is the UID.
        entry = pwd.getpwnam(value)
        uid = int(entry[2])
    return uid


def groupname(value):
    """ Resolve a group specification to a numeric GID.  A value that
    parses as an integer is returned as that integer without
    consulting the group database; any other value is looked up as a
    group name with :func:`grp.getgrnam`, so a named group must
    exist. """
    try:
        gid = int(value)
    except ValueError:
        # Not numeric; index 2 of the group entry is the GID.
        entry = grp.getgrnam(value)
        gid = int(entry[2])
    return gid


def timeout(value):
    """ Turn a timeout option into a float, or None.  ``None`` is
    passed through unchanged, and a zero or negative number also
    gives ``None``.  Anything :func:`float` cannot parse raises
    :exc:`ValueError`. """
    if value is not None:
        seconds = float(value)  # ValueError propagates to the caller
        return None if seconds <= 0 else seconds
    return None


# pylint: disable=C0103
# Maps each (lower-case) unit suffix to the power of 1024 it stands for.
_bytes_multipliers = dict(k=1,
                          m=2,
                          g=3,
                          t=4)
# Every accepted suffix letter, in both lower and upper case.
_suffixes = "".join(_bytes_multipliers.keys()).lower()
_suffixes += _suffixes.upper()
# The pattern is anchored with \Z so the *entire* string has to be a
# size; without it, match() accepted any string that merely started
# with digits ('12abc' -> 12, '1.5g' -> 1).  A trailing 'b'/'B' is
# tolerated so spellings such as '512mb' keep working.
_bytes_re = re.compile(
    r'(?P<value>\d+)(?P<multiplier>[%s])?[bB]?\Z' % _suffixes)
# pylint: enable=C0103


def size(value):
    """ Given a number of bytes in a human-readable format (e.g.,
    '512m', '2g'), get the absolute number of bytes as an integer.

    The format is a run of decimal digits, optionally followed by one
    of the unit letters ``k``, ``m``, ``g`` or ``t`` (either case,
    meaning successive powers of 1024), optionally followed by ``b``.
    The integer ``-1`` is passed through unchanged.  Raises
    :exc:`ValueError` if the string is not in this format.
    """
    if value == -1:
        return value
    mat = _bytes_re.match(value)
    if not mat:
        raise ValueError("Not a valid size", value)
    rvalue = int(mat.group("value"))
    mult = mat.group("multiplier")
    if mult:
        return rvalue * (1024 ** _bytes_multipliers[mult.lower()])
    return rvalue