summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Encryption.py
blob: 0a2d486bfa1bf7fe8f801d1cb664ac9e879da3ff (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/python -Ott

import os
import base64
from M2Crypto import Rand
from M2Crypto.EVP import Cipher, EVPError
from Bcfg2.Compat import StringIO

try:
    from hashlib import md5
except ImportError:
    from md5 import md5

ENCRYPT = 1
DECRYPT = 0
ALGORITHM = "aes_256_cbc"
IV = '\0' * 16

Rand.rand_seed(os.urandom(1024))

def _cipher_filter(cipher, instr):
    inbuf = StringIO(instr)
    outbuf = StringIO()
    while 1:
        buf = inbuf.read()
        if not buf:
            break
        outbuf.write(cipher.update(buf))
    outbuf.write(cipher.final())
    rv = outbuf.getvalue()
    inbuf.close()
    outbuf.close()
    return rv

def str_encrypt(plaintext, key, iv=IV, algorithm=ALGORITHM, salt=None):
    """ encrypt a string """
    cipher = Cipher(alg=algorithm, key=key, iv=iv, op=ENCRYPT, salt=salt)
    return _cipher_filter(cipher, plaintext)
    
def str_decrypt(crypted, key, iv=IV, algorithm=ALGORITHM):
    """ decrypt a string """
    cipher = Cipher(alg=algorithm, key=key, iv=iv, op=DECRYPT)
    return _cipher_filter(cipher, crypted)
    
def ssl_decrypt(data, passwd, algorithm=ALGORITHM):
    """ decrypt openssl-encrypted data """
    # base64-decode the data if necessary
    try:
        data = base64.b64decode(data)
    except TypeError:
        # already decoded
        pass
    
    salt = data[8:16]
    hashes = [md5(passwd + salt).digest()]
    for i in range(1,3):
        hashes.append(md5(hashes[i-1] + passwd + salt).digest())
    key = hashes[0] + hashes[1]
    iv = hashes[2]
    
    return str_decrypt(data[16:], key=key, iv=iv)

def ssl_encrypt(plaintext, passwd, algorithm=ALGORITHM, salt=None):
    """ encrypt data in a format that is openssl compatible """
    if salt is None:
        salt = Rand.rand_bytes(8)
    
    hashes = [md5(passwd + salt).digest()]
    for i in range(1,3):
        hashes.append(md5(hashes[i-1] + passwd + salt).digest())
    key = hashes[0] + hashes[1]
    iv = hashes[2]
    
    crypted = str_encrypt(plaintext, key=key, salt=salt, iv=iv)
    return base64.b64encode("Salted__" + salt + crypted) + "\n"

def get_passphrases(setup):
    """ Get all candidate encryption passphrases from the config file.

    :param setup: The Bcfg2 option set to extract passphrases from
    :type setup: Bcfg2.Options.OptionParser
    returns: dict - a dict of <passphrase name>: <passphrase>
    """
    section = "encryption"
    if setup.cfp.has_section(section):
        return dict([(o, setup.cfp.get(section, o))
                     for o in setup.cfp.options(section)])
    else:
        return dict()

def bruteforce_decrypt(crypted, passphrases=None, setup=None):
    """ Convenience method to decrypt the given encrypted string by
    trying the given passphrases or all passphrases (as returned by
    :func:`Bcfg2.Encryption.passphrases`) sequentially until one is
    found that works.

    Either ``passphrases`` or ``setup`` must be provided.

    :param crypted: The data to decrypt
    :type crypted: string
    :param passphrases: The passphrases to try.
    :type passphrases: list
    :param setup: A Bcfg2 option set to extract passphrases from
    :type setup: Bcfg2.Options.OptionParser
    :returns: string - The decrypted data
    :raises: :class:`M2Crypto.EVP.EVPError`, if the data cannot be decrypted
    """
    if not passphrases:
        passphrases = get_passphrases(setup).values()
    for passwd in passphrases:
        try:
            return ssl_decrypt(crypted, passwd)
        except EVPError:
            pass
    raise EVPError("Failed to decrypt")