summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/Plugins/ACL.py
blob: 37f51a2a161b121962f54de968c8791b04c8101e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
""" Support for client ACLs based on IP address and client metadata """

import os
import struct
import socket
import Bcfg2.Server.Plugin


def rmi_names_equal(first, second):
    """ Compare two XML-RPC method names and see if they match.
    Resolves some limited wildcards; see
    :ref:`server-plugins-misc-acl-wildcards` for details.

    :param first: One of the ACLs to compare
    :type first: string
    :param second: The other ACL to compare
    :type second: string
    :returns: bool """
    if first == second:
        # single wildcard is special, and matches everything
        return True
    if first is None or second is None:
        return False
    if '*' not in first + second:
        # no wildcards, and not exactly equal
        return False
    first_parts = first.split('.')
    second_parts = second.split('.')
    if len(first_parts) != len(second_parts):
        return False
    for i in range(len(first_parts)):
        if (first_parts[i] != second_parts[i] and first_parts[i] != '*' and
                second_parts[i] != '*'):
            return False
    return True


def ip2int(ip):
    """ convert a dotted-quad IP address into an integer
    representation of the same """
    return struct.unpack('>L', socket.inet_pton(socket.AF_INET, ip))[0]


def ip_matches(ip, entry):
    """ Return True if the given IP matches the IP or IP and netmask
    in the given ACL entry; False otherwise """
    if entry.get("netmask"):
        try:
            mask = int("1" * int(entry.get("netmask")) +
                       "0" * (32 - int(entry.get("netmask"))), 2)
        except ValueError:
            mask = ip2int(entry.get("netmask"))
        return ip2int(ip) & mask == ip2int(entry.get("address")) & mask
    elif entry.get("address") is None:
        # no address, no netmask -- match all
        return True
    elif ip == entry.get("address"):
        # just a plain ip address
        return True
    return False


class IPACLFile(Bcfg2.Server.Plugin.XMLFileBacked):
    """ representation of ACL ip.xml, for IP-based ACLs """
    __identifier__ = None
    actions = dict(Allow=True,
                   Deny=False,
                   Defer=None)

    def check_acl(self, address, rmi):
        """ Check a client address against the ACL list """
        if not len(self.entries):
            # default defer if no ACLs are defined.
            self.debug_log("ACL: %s requests %s: No IP ACLs, defer" %
                           (address, rmi))
            return self.actions["Defer"]
        for entry in self.entries:
            if (ip_matches(address, entry) and
                    rmi_names_equal(entry.get("method"), rmi)):
                self.debug_log("ACL: %s requests %s: Found matching IP ACL, "
                               "%s" % (address, rmi, entry.tag.lower()))
                return self.actions[entry.tag]
        if address == "127.0.0.1":
            self.debug_log("ACL: %s requests %s: No matching IP ACLs, "
                           "localhost allowed" % (address, rmi))
            return self.actions['Allow']  # default allow for localhost

        self.debug_log("ACL: %s requests %s: No matching IP ACLs, defer" %
                       (address, rmi))
        return self.actions["Defer"]  # default defer for other machines


class MetadataACLFile(Bcfg2.Server.Plugin.StructFile):
    """ representation of ACL metadata.xml, for metadata-based ACLs """
    def check_acl(self, metadata, rmi):
        """ check client metadata against the ACL list """
        if not len(self.entries):
            # default allow if no ACLs are defined.
            self.debug_log("ACL: %s requests %s: No metadata ACLs, allow" %
                           (metadata.hostname, rmi))
            return True
        for el in self.Match(metadata):
            if rmi_names_equal(el.get("method"), rmi):
                self.debug_log("ACL: %s requests %s: Found matching metadata "
                               "ACL, %s" % (metadata.hostname, rmi,
                                            el.tag.lower()))
                return el.tag == "Allow"
        if metadata.hostname in ['localhost', 'localhost.localdomain']:
            # default allow for localhost
            self.debug_log("ACL: %s requests %s: No matching metadata ACLs, "
                           "localhost allowed" % (metadata.hostname, rmi))
            return True
        self.debug_log("ACL: %s requests %s: No matching metadata ACLs, deny" %
                       (metadata.hostname, rmi))
        return False  # default deny for other machines


class ACL(Bcfg2.Server.Plugin.Plugin,
          Bcfg2.Server.Plugin.ClientACLs):
    """ allow connections to bcfg-server based on IP address """

    def __init__(self, core):
        Bcfg2.Server.Plugin.Plugin.__init__(self, core)
        Bcfg2.Server.Plugin.ClientACLs.__init__(self)
        self.ip_acls = IPACLFile(os.path.join(self.data, 'ip.xml'),
                                 should_monitor=True)
        self.metadata_acls = MetadataACLFile(os.path.join(self.data,
                                                          'metadata.xml'),
                                             should_monitor=True)

    def check_acl_ip(self, address, rmi):
        self.debug_log("ACL: %s requests %s: Checking IP ACLs" %
                       (address[0], rmi))
        return self.ip_acls.check_acl(address[0], rmi)

    def check_acl_metadata(self, metadata, rmi):
        self.debug_log("ACL: %s requests %s: Checking metadata ACLs" %
                       (metadata.hostname, rmi))
        return self.metadata_acls.check_acl(metadata, rmi)

    def set_debug(self, debug):
        rv = Bcfg2.Server.Plugin.Plugin.set_debug(self, debug)
        self.ip_acls.set_debug(debug)
        self.metadata_acls.set_debug(debug)
        return rv
    set_debug.__doc__ = Bcfg2.Server.Plugin.Plugin.set_debug.__doc__