Files
truenas-kmip-unlocker/truenas_kmip_unlock.py

251 lines
10 KiB
Python

#!/usr/bin/env python
import os
import binascii
import logging
import sys
import secrets
import base64
import optparse
import configparser
import requests
import platform
import subprocess
import simplejson as json
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from getpass import getpass
from kmip.core import enums
from kmip.demos import utils
from kmip.pie import client
def request(resource, api_key, method='GET', data=None):
if data is None:
data = ''
else:
data = json.dumps(data)
url = 'https://127.0.0.1/api/v2.0/{}'.format(resource)
logger.debug('Request URL: {}'.format(url))
logger.debug('Request Data: {}'.format(data))
r = requests.request(
method,
url,
data=data,
headers={'Content-Type': 'application/json', 'Authorization': 'Bearer {}'.format(api_key)},
verify=False
)
logger.debug('Request Status Code: {}'.format(r.status_code))
if r.ok:
try:
logger.debug('Request Returned JSON: {}'.format(r.json()))
return {'ok': r.ok, 'status_code': r.status_code, 'response': r.json()}
except:
logger.debug('Request Returned Text: {}'.format(r.text))
return {'ok': r.ok, 'status_code': r.status_code, 'response': r.text}
raise ValueError(r)
def create_key(client):
# Create an encryption key.
try:
key_id = client.create(
enums.CryptographicAlgorithm.AES,
256,
cryptographic_usage_mask=[
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
]
)
logger.debug("Successfully created a new encryption key.")
logger.debug("Secret ID: {}".format(key_id))
except Exception as e:
logger.error(e)
sys.exit(-1)
# Activate the encryption key so that it can be used.
try:
client.activate(key_id)
logger.debug("Successfully activated the encryption key.")
return key_id
except Exception as e:
logger.error(e)
sys.exit(-1)
def encrypt(client, data):
try:
data = data.encode('UTF-8')
key_id = create_key(client)
iv = secrets.token_bytes(16)
cipher_text, autogenerated_iv = client.encrypt(
data,
uid=key_id,
cryptographic_parameters={
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES,
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.ANSI_X923
},
iv_counter_nonce=(
iv
)
)
logger.debug("Successfully encrypted the data: {}".format(data))
cipher_text_base64_bytes = base64.b64encode(cipher_text)
cipher_text_base64 = cipher_text_base64_bytes.decode('ascii')
logger.debug("Cipher text (raw): {}".format(cipher_text))
logger.debug("Cipher txt (encoded): {}".format(cipher_text_base64))
logger.debug("IV (raw): {}".format(iv))
iv_base64_bytes = base64.b64encode(iv)
iv_base64 = iv_base64_bytes.decode('ascii')
logger.debug("IV (encoded): {}".format(iv_base64))
padded_key_id = str(key_id).zfill(9)
logger.debug("Padding Key ID {} with zeros: {}".format(key_id,padded_key_id))
cipher_data = base64.b64encode(padded_key_id.encode('UTF-8') + iv + cipher_text).decode()
logger.debug("padded_key_id + iv + cipher_text (encoded): {}".format(cipher_data))
return cipher_data
except Exception as e:
logger.error(e)
def decrypt(client, data):
try:
cipher_data = base64.b64decode(data)
padded_key_id = cipher_data[:9].decode('UTF-8')
iv = cipher_data[9:25]
cipher_text = cipher_data[25:]
logger.debug("Removing padding from Key ID: {}".format(padded_key_id))
key_id = padded_key_id.lstrip('0')
logger.debug("Decrypting with Key ID: {}".format(key_id))
logger.debug("Decrypting with IV (raw): {}".format(iv))
logger.debug("Decrypting cipher text (raw): {}".format(cipher_text))
plain_text = client.decrypt(
cipher_text,
uid=key_id,
cryptographic_parameters={
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES,
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.ANSI_X923
},
iv_counter_nonce=(
iv
)
)
logger.debug("Successfully decrypted the data.")
plain_text = plain_text.decode('utf-8')
logger.debug("Plain text: '{}'".format(plain_text))
return plain_text
except Exception as e:
logger.error(e)
if __name__ == '__main__':
# get directory of script
cwd = os.path.dirname(os.path.realpath(__file__))
# Build and parse arguments
parser = utils.build_cli_parser(enums.Operation.DECRYPT)
parser.add_option(
"-e",
"--encrypt",
action="store_true",
dest="encrypt",
help="Encrypt a passphrase/password."
)
parser.add_option(
"-d",
"--decrypt",
action="store_true",
dest="decrypt",
help="Decrypt a passphrase/password."
)
parser.add_option (
"-v",
"--verbose",
action="store_true",
dest="debug",
help="Output debug/verbose info to the console for troubleshooting."
)
opts, args = parser.parse_args(sys.argv[1:])
if opts.debug:
logger = utils.build_console_logger(logging.DEBUG)
else:
logger = utils.build_console_logger(logging.INFO)
config = opts.config
passphrase = opts.message
client = client.ProxyKmipClient(config=config, config_file=cwd + '/pykmip/client.conf')
client.open()
if opts.encrypt:
if not passphrase:
while True:
passphrase = getpass("Please enter your pool/dataset unlock passphrase to encrypt: ")
passphrase2 = getpass("Please enter the passphrase again: ")
if passphrase == passphrase2:
break
else:
print("The passphrases do not match, please try again.")
encrypted_passphrase = encrypt(client, passphrase)
print("Your encrypted passphrase: {}".format(encrypted_passphrase))
sys.exit(0)
elif opts.decrypt:
if not passphrase:
passphrase = input("Please enter the encryted passphrase to decrypt: ")
decrypted_passphrase = decrypt(client, passphrase)
print("Your decrypted passphrase: {}".format(decrypted_passphrase))
sys.exit(0)
else:
# run the decryption of the keys and unlock the pool
parser = configparser.ConfigParser()
parser.read(cwd + '/secrets.ini')
encrypted_api_key = parser.get('DEFAULT', 'encrypted_api_key')
api_key = decrypt(client, encrypted_api_key)
logger.debug("API Key: {}".format(api_key))
POOLS = request('pool', api_key)['response']
DATASETS = request('pool/dataset', api_key)['response']
for pool_dataset_name in parser.sections():
if pool_dataset_name != 'DEFAULT':
name = pool_dataset_name
encrypted_key = parser.get(name, 'encrypted_key')
logger.debug("Attempting to decrypt {} pool with encrypted key: {}".format(name,encrypted_key))
decrypted_key = decrypt(client, encrypted_key)
for pool in POOLS:
if pool['name'] == name:
# GELI encrypted pool
if pool['encrypt'] == 2:
if pool['is_decrypted'] == False:
logger.info('Pool {} is locked'.format(pool['name']))
response = request('pool/id/{}/unlock'.format(pool['id']), api_key, 'POST', {'passphrase': '{}'.format(decrypted_key), 'services_restart': ['cifs', 'nfs']})
if response['ok']:
logger.info('Pool {} was unlocked successfully'.format(pool['name']))
else:
logger.error('Pool {} was NOT unlocked successfully'.format(pool['name']))
else:
logger.info('Pool {} is already unlocked'.format(pool['name']))
# start detection of locked zfs dataset
elif pool['encrypt'] == 0:
# loop through the datasets to find the one we are looking for
for dataset in DATASETS:
if dataset['name'] == name:
if dataset['locked'] == True:
logger.info('Dataset {} is locked'.format(dataset['name']))
response = request('pool/dataset/unlock',api_key, 'POST', {'id': '{}'.format(dataset['id']), 'unlock_options': {'recursive': True, 'datasets': [{'name': '{}'.format(dataset['name']), 'passphrase': '{}'.format(decrypted_key)}]}})
if response['ok']:
logger.info('Dataset {} was unlocked successfully'.format(dataset['name']))
# restart services since the dataset was unlocked
logger.info('Restarting the NFS service')
response = request('service/restart', api_key, 'POST', {'service': 'nfs'})
logger.info('Restarting the CIFS/SMB service')
response = request('service/restart', api_key, 'POST', {'service': 'cifs'})
else:
logger.error('Dataset {} was NOT unlocked successfully'.format(dataset['name']))
else:
logger.info('Dataset {} is already unlocked'.format(dataset['name']))
#print('pool {0} is decrypted: {1}'.format(pool['name'],pool['is_decrypted']))
# this line should be after the actual unlock on the disk
#logger.debug("Decrypted {0} pool key.".format(name))
sys.exit(0)
client.close()