Files
truenas-kmip-unlocker/truenas_kmip_unlock.py

545 lines
24 KiB
Python

#!/usr/bin/env python
import os
import binascii
import logging
import sys
import secrets
import base64
import argparse
import configparser
import getpass
import requests
import platform
import subprocess
import time
import simplejson as json
import hmac as pyhmac
import datetime
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from kmip.core import enums
from kmip.demos import utils
from kmip.pie import client
def request(resource, api_key, method='GET', data=None):
    """Call the local TrueNAS REST API (v2.0) and return the outcome.

    Args:
        resource: Path below ``/api/v2.0/`` (for example ``'pool'``).
        api_key: API key sent as a ``Bearer`` token.
        method: HTTP method name handed to ``requests.request``.
        data: Optional payload; serialised with ``json.dumps`` when not None,
            otherwise an empty body is sent.

    Returns:
        dict: ``{'ok', 'status_code', 'response'}`` where ``response`` is the
        decoded JSON body, or the raw text when the body is not JSON.

    Raises:
        ValueError: When the server answers with a non-success status. The
            ``requests`` response object is the exception's only argument.
    """
    def _redact(value):
        # Unlock payloads carry pool passphrases; never write them to the log.
        if isinstance(value, dict):
            return {
                key: '********' if 'passphrase' in str(key).lower() else _redact(item)
                for key, item in value.items()
            }
        if isinstance(value, (list, tuple)):
            return [_redact(item) for item in value]
        return value

    if data is None:
        body = ''
        loggable_body = ''
    else:
        body = json.dumps(data)
        loggable_body = json.dumps(_redact(data))
    url = 'https://127.0.0.1/api/v2.0/{}'.format(resource)
    logger.debug('Request URL: {}'.format(url))
    logger.debug('Request Data: {}'.format(loggable_body))
    r = requests.request(
        method,
        url,
        data=body,
        headers={'Content-Type': 'application/json', 'Authorization': 'Bearer {}'.format(api_key)},
        # NOTE(review): certificate verification is disabled for this loopback
        # call; confirm that is acceptable for the deployment.
        verify=False
    )
    logger.debug('Request Status Code: {}'.format(r.status_code))
    if not r.ok:
        # The exception itself only renders as "<Response [NNN]>", so record
        # the body for troubleshooting before raising.
        logger.debug('Request Returned Text: {}'.format(r.text))
        raise ValueError(r)
    try:
        # Parse once; every JSON decode error raised here is a ValueError.
        payload = r.json()
    except ValueError:
        logger.debug('Request Returned Text: {}'.format(r.text))
        return {'ok': r.ok, 'status_code': r.status_code, 'response': r.text}
    logger.debug('Request Returned JSON: {}'.format(payload))
    return {'ok': r.ok, 'status_code': r.status_code, 'response': payload}
def build_logger(level):
    """Configure the root logger for console output and return it.

    Safe to call more than once: the console handler is tagged with a name and
    reused, so repeated calls do not stack handlers and duplicate every line.

    Args:
        level: Logging level applied to the root logger (e.g. ``logging.INFO``).

    Returns:
        logging.Logger: The root logger.
    """
    handler_name = 'truenas_kmip_unlock_console'
    logger = logging.getLogger()
    logger.setLevel(level)
    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s'
    )
    # log to file (currently disabled)
    #fileHandler = logging.FileHandler(log_file)
    #fileHandler.setFormatter(formatter)
    #logger.addHandler(fileHandler)
    # log to console
    consoleHandler = None
    for existing_handler in logger.handlers:
        if existing_handler.get_name() == handler_name:
            consoleHandler = existing_handler
            break
    if consoleHandler is None:
        consoleHandler = logging.StreamHandler()
        consoleHandler.set_name(handler_name)
        logger.addHandler(consoleHandler)
    consoleHandler.setFormatter(formatter)
    return logger
def write_config_file(array, config_file):
    """Serialise *array* to JSON, encrypt it and write it to *config_file*.

    Args:
        array: JSON-serialisable configuration object.
        config_file: Path of the file to (over)write.

    Exits the process with status -1 when the file cannot be written.
    """
    logger.debug("Starting to write config file and encrypt contents")
    logger.debug("Using config file: {}".format(config_file))
    logger.debug("Converting config from array to json")
    array_json = json.dumps(array)
    logger.debug("Encrypting config json")
    encrypted_array_json = encrypt(array_json)
    logger.debug("Attempting to write encrypted config to file")
    try:
        # The context manager closes the handle even if write() fails.
        with open(config_file, "w") as f:
            f.write(encrypted_array_json)
    except OSError as e:
        logger.error("Unable to write encrypted config to file. Error: {}".format(e))
        sys.exit(-1)
    logger.debug("Successfully wrote encrypted config to file")
    logger.debug("Finished writing config file and encrypting contents")
def read_config_file(config_file):
    """Read *config_file*, decrypt its contents and return the parsed config.

    Args:
        config_file: Path of the encrypted configuration file.

    Returns:
        The object decoded from the decrypted JSON document.

    Exits the process with status -1 when the file cannot be read, cannot be
    decrypted, or does not contain valid JSON after decryption.
    """
    logger.debug("Starting to read config file and decrypt contents")
    logger.debug("Using config file: {}".format(config_file))
    logger.debug("Attempting to read encrypted config from file")
    try:
        with open(config_file) as f:
            config = f.read()
    except (OSError, UnicodeError) as e:
        logger.error("Unable to read encrypted config from file. Error: {}".format(e))
        sys.exit(-1)
    logger.debug("Successfully read encrypted config from file")
    logger.debug("Decrypting config contents")
    decrypted_array_json = decrypt(config)
    # decrypt() reports an unsupported envelope version by returning False;
    # handing that to json.loads would only produce a confusing TypeError.
    if decrypted_array_json is False:
        logger.error("Unable to decrypt config contents.")
        sys.exit(-1)
    logger.debug("Convert config from json to array")
    try:
        array = json.loads(decrypted_array_json)
    except ValueError as e:
        logger.error("Decrypted config is not valid JSON. Error: {}".format(e))
        sys.exit(-1)
    logger.debug("Finished reading config file and decrypting contents")
    return array
def ask_for_confirmation(question):
    """Print *question* and prompt until the user answers ``y`` or ``n``.

    The answer is compared case-insensitively. Every answer is written to the
    debug log.

    Returns:
        bool: True for ``y``, False for ``n``.
    """
    logger.debug("Asking user for confirmation")
    logger.debug("Question: {}".format(question))
    print(question)
    verdicts = {"y": True, "n": False}
    while True:
        reply = input("y/n> ")
        logger.debug("User answered: {}".format(reply))
        verdict = verdicts.get(reply.casefold())
        if verdict is not None:
            return verdict
        print("This value must be one of the following characters: y, n.")
def create_encryption_key():
    """Create and activate a 256-bit AES key through the KMIP client.

    The key is created with the ENCRYPT and DECRYPT usage masks.

    Returns:
        The ID of the new key, as returned by ``client.create``.

    Exits the process with status -1 if creation or activation fails.
    """
    # Step 1: ask the KMIP server for a fresh key.
    try:
        new_key_id = client.create(
            enums.CryptographicAlgorithm.AES,
            256,
            cryptographic_usage_mask=[
                enums.CryptographicUsageMask.ENCRYPT,
                enums.CryptographicUsageMask.DECRYPT,
            ],
        )
        logger.debug("Successfully created a new encryption key.")
        logger.debug("Encryption Key ID: {}".format(new_key_id))
    except Exception as err:
        logger.error("Unable to create encryption key. Error: {}".format(err))
        sys.exit(-1)

    # Step 2: activate the key so that it can be used.
    try:
        client.activate(new_key_id)
        logger.debug("Successfully activated the encryption key.")
    except Exception as err:
        logger.error("Unable to activate encryption key. Error: {}".format(err))
        sys.exit(-1)
    return new_key_id
def create_hmac_key():
    """Create and activate a 256-bit key for MAC operations via the KMIP client.

    The key is requested with the AES algorithm identifier but with the
    MAC_GENERATE and MAC_VERIFY usage masks.

    Returns:
        The ID of the new key, as returned by ``client.create``.

    Exits the process with status -1 if creation or activation fails.
    """
    # Step 1: ask the KMIP server for a fresh MAC key.
    try:
        new_key_id = client.create(
            enums.CryptographicAlgorithm.AES,
            256,
            cryptographic_usage_mask=[
                enums.CryptographicUsageMask.MAC_GENERATE,
                enums.CryptographicUsageMask.MAC_VERIFY,
            ],
        )
        logger.debug("Successfully created a new HMAC key.")
        logger.debug("HMAC Key ID: {}".format(new_key_id))
    except Exception as err:
        logger.error("Unable to create hmac key. Error: {}".format(err))
        sys.exit(-1)

    # Step 2: activate the HMAC key so that it can be used.
    try:
        client.activate(new_key_id)
        logger.debug("Successfully activated the HMAC key.")
    except Exception as err:
        logger.error("Unable to activate hmac key. Error: {}".format(err))
        sys.exit(-1)
    return new_key_id
def encrypt(data):
    """Encrypt *data* through the KMIP client and return a base64 envelope.

    A new encryption key and a new HMAC key are created on every call. The
    text is encrypted with AES-CBC (ANSI X9.23 padding) under a random 16-byte
    IV, then an HMAC-SHA512 is computed over ``key id + IV + cipher text``.

    Args:
        data: Text to encrypt.

    Returns:
        str: Base64 of a JSON document with the keys ``version``,
        ``cipher_key_id``, ``cipher_text``, ``iv``, ``hmac_key_id``, ``hmac``.

    Exits the process with status -1 if any step raises.
    """
    try:
        plain_bytes = data.encode('UTF-8')
        key_id = create_encryption_key()
        iv = secrets.token_bytes(16)
        # The IV is supplied explicitly, so the one echoed back is not needed.
        cipher_text, _ = client.encrypt(
            plain_bytes,
            uid=key_id,
            cryptographic_parameters={
                'cryptographic_algorithm': enums.CryptographicAlgorithm.AES,
                'block_cipher_mode': enums.BlockCipherMode.CBC,
                'padding_method': enums.PaddingMethod.ANSI_X923,
            },
            iv_counter_nonce=iv,
        )
        hmac_key_id, digest = client.mac(
            key_id.encode() + iv + cipher_text,
            uid=create_hmac_key(),
            algorithm=enums.CryptographicAlgorithm.HMAC_SHA512,
        )
        logger.debug("Successfully encrypted the data.")
        envelope = {
            'version': 1,
            'cipher_key_id': key_id,
            'cipher_text': base64.b64encode(cipher_text).decode(),
            'iv': base64.b64encode(iv).decode(),
            'hmac_key_id': hmac_key_id,
            'hmac': base64.b64encode(digest).decode(),
        }
        logger.debug("Dict of info: {}".format(envelope))
        envelope_json = json.dumps(envelope)
        return base64.b64encode(envelope_json.encode('utf-8')).decode()
    except Exception as err:
        logger.error("Unable to encrypt data. Error: {}".format(err))
        sys.exit(-1)
def decrypt(data):
    """Decode an envelope produced by ``encrypt`` and decrypt its payload.

    Args:
        data: Base64 text wrapping the JSON envelope.

    Returns:
        The decrypted text for a version 1 envelope, or ``False`` when the
        envelope version is missing or not supported.

    Exits the process with status -1 when *data* is not valid base64-encoded
    JSON, matching how the other crypto helpers report failures.
    """
    try:
        # binascii.Error, UnicodeDecodeError and JSON decode errors are all
        # ValueError subclasses, so one handler covers a corrupt file.
        array = json.loads(base64.b64decode(data))
    except ValueError as e:
        logger.error("Unable to decode encrypted data. Error: {}".format(e))
        sys.exit(-1)
    # Tolerate an envelope without a 'version' key (or a non-object document)
    # instead of failing with KeyError/TypeError.
    version = array.get('version') if isinstance(array, dict) else None
    if version == 1:
        return decrypt_v1(array)
    logger.error("Unable to determine encryption version.")
    return False
def decrypt_v1(array):
    """Verify and decrypt a version 1 envelope.

    The HMAC-SHA512 over ``key id + IV + cipher text`` is recomputed through
    the KMIP client and compared in constant time with the stored value before
    the cipher text is decrypted (AES-CBC, ANSI X9.23 padding).

    Args:
        array: Envelope dict with ``cipher_key_id``, ``iv``, ``cipher_text``,
            ``hmac_key_id`` and ``hmac`` (the last four base64-encoded except
            the key ids).

    Returns:
        str: The decrypted text, decoded as UTF-8.

    Exits the process with status -1 on an HMAC mismatch or any other error.
    """
    try:
        logger.debug("Dict of info: {}".format(array))
        key_id = array['cipher_key_id']
        iv = base64.b64decode(array['iv'])
        cipher_text = base64.b64decode(array['cipher_text'])
        hmac_key_id = array['hmac_key_id']
        stored_digest = base64.b64decode(array['hmac'])

        authenticated_bytes = key_id.encode() + iv + cipher_text
        _, computed_digest = client.mac(
            authenticated_bytes,
            uid=hmac_key_id,
            algorithm=enums.CryptographicAlgorithm.HMAC_SHA512,
        )
        if not pyhmac.compare_digest(stored_digest, computed_digest):
            logger.error("HMAC does not match, data is corrupted/tampered.")
            sys.exit(-1)
        logger.debug("HMAC matches.")

        plain_bytes = client.decrypt(
            cipher_text,
            uid=key_id,
            cryptographic_parameters={
                'cryptographic_algorithm': enums.CryptographicAlgorithm.AES,
                'block_cipher_mode': enums.BlockCipherMode.CBC,
                'padding_method': enums.PaddingMethod.ANSI_X923,
            },
            iv_counter_nonce=iv,
        )
        logger.debug("Successfully decrypted the data.")
        return plain_bytes.decode('utf-8')
    except Exception as err:
        logger.error("Unable to decrypt data. Error: {}".format(err))
        sys.exit(-1)
def new_pool_details():
    """Interactively collect the name and passphrase of a new pool.

    The passphrase is requested twice without echo and must match.

    Returns:
        dict: ``{pool_name: {"pool_passphrase": passphrase}}``.
    """
    print("Requesting pool details to add to config.")
    pool_name = input("Please enter pool name: ")
    while True:
        pool_passphrase = getpass.getpass("Please enter pool encryption passphrase: ")
        pool_passphrase_confirm = getpass.getpass("Please confirm pool encryption passphrase: ")
        # compare_digest() only accepts str arguments that are pure ASCII and
        # raises TypeError otherwise, so compare the encoded bytes instead.
        if pyhmac.compare_digest(
            pool_passphrase.encode('utf-8', 'surrogatepass'),
            pool_passphrase_confirm.encode('utf-8', 'surrogatepass'),
        ):
            break
        print("The pool encryption passphrases do not match, please try again.")
    return {pool_name: {"pool_passphrase": pool_passphrase}}
def edit_pool_details(pools, pool_to_edit):
    """Interactively edit the name and/or passphrase of an existing pool.

    Args:
        pools: Mapping of pool name to ``{"pool_passphrase": ...}``.
        pool_to_edit: Name of the entry in *pools* being edited.

    Returns:
        dict: ``{pool_name: {"pool_passphrase": passphrase}}`` holding the
        edited values; values the user chose not to edit are carried over.
        *pools* itself is not modified.
    """
    if ask_for_confirmation("Would you like to edit the pool name?\nCurrent Value: {}".format(pool_to_edit)):
        pool_name = input("Please enter pool name: ")
    else:
        pool_name = pool_to_edit
    if ask_for_confirmation("Would you like to edit the pool encryption passphrase?"):
        while True:
            pool_passphrase = getpass.getpass("Please enter pool encryption passphrase: ")
            pool_passphrase_confirm = getpass.getpass("Please confirm pool encryption passphrase: ")
            # compare_digest() only accepts str arguments that are pure ASCII
            # and raises TypeError otherwise, so compare the encoded bytes.
            if pyhmac.compare_digest(
                pool_passphrase.encode('utf-8', 'surrogatepass'),
                pool_passphrase_confirm.encode('utf-8', 'surrogatepass'),
            ):
                break
            print("The pool encryption passphrases do not match, please try again.")
    else:
        pool_passphrase = pools[pool_to_edit]['pool_passphrase']
    return {pool_name: {"pool_passphrase": pool_passphrase}}
def select_pool(pools, wording = "edit"):
    """Print a numbered list of pools and return the name the user picks.

    Prompts again when the answer is not a number or is outside ``1..len``.
    Zero and negative numbers are rejected rather than being treated as
    Python indexes counted from the end of the list.

    Args:
        pools: Iterable of pool names (a dict keyed by pool name works).
        wording: Verb inserted into the prompts, e.g. ``"delete"``.

    Returns:
        The selected pool name.
    """
    print("Which pool would you like to {}:".format(wording))
    print(" ")
    pool_names = list(pools)
    for pretty_number, pool_name in enumerate(pool_names, start=1):
        print("{}) {}".format(pretty_number, pool_name))
    print(" ")
    prompt = "Please enter number relating to the pool you wish to {}: ".format(wording)
    while True:
        answer = input(prompt)
        try:
            chosen_number = int(answer)
        except ValueError:
            print("you must enter a number, please try again")
            continue
        if 1 <= chosen_number <= len(pool_names):
            return pool_names[chosen_number - 1]
        print("you entered a number out of range, please try again")
def _try_request(resource, api_key, method='GET', data=None):
    """Call request() and report a failed call as ``{'ok': False, ...}``.

    request() raises ValueError for every non-success status, which would
    otherwise make the "was NOT ..." error branches below unreachable and let
    a single failed call abort the whole run.
    """
    try:
        return request(resource, api_key, method, data)
    except ValueError as error:
        logger.debug('Request to {} failed: {}'.format(resource, error))
        failed = error.args[0] if error.args else None
        return {
            'ok': False,
            'status_code': getattr(failed, 'status_code', None),
            'response': getattr(failed, 'text', None),
        }


def _run_config_menu(secrets_config_file):
    """Run the interactive configuration editor; only leaves via sys.exit()."""
    while True:
        if not os.path.exists(secrets_config_file):
            print("No pools found, do you want to add a new one?")
            print(" ")
            print("n) New config")
            print("q) Quit config")
            while True:
                user_input = input("n/q> ")
                if user_input.casefold() == "n":
                    config = dict()
                    config['API Key'] = input("Please enter your API key: ")
                    config['Pools'] = new_pool_details()
                    config['jailStoragePool'] = select_pool(config['Pools'], "select for jail storage")
                    write_config_file(config, secrets_config_file)
                    break
                elif user_input.casefold() == "q":
                    sys.exit(0)
                else:
                    print("This value must be one of the following characters: n, q.")
        while True:
            config = read_config_file(secrets_config_file)
            print("Current pools:")
            print(" ")
            for pool in config['Pools']:
                print(pool)
            print(" ")
            print("Jail storage pool: {}".format(config['jailStoragePool']))
            print(" ")
            print("a) Edit API Key")
            print("e) Edit pool")
            print("j) Edit jail storage pool")
            print("n) New pool")
            print("d) Delete pool")
            print("q) Quit config")
            user_input = input("a/e/j/n/d/q> ")
            # Edit API Key
            if user_input.casefold() == "a":
                config['API Key'] = input("Please enter your API key: ")
                write_config_file(config, secrets_config_file)
            # Editing a pool
            elif user_input.casefold() == "e":
                pool_to_edit = select_pool(config['Pools'])
                pool_details = edit_pool_details(config['Pools'], pool_to_edit)
                del config['Pools'][pool_to_edit]
                config['Pools'].update(pool_details)
                # Keep the jail storage pool pointing at the entry if it was renamed.
                if config['jailStoragePool'] == pool_to_edit:
                    config['jailStoragePool'] = next(iter(pool_details))
                write_config_file(config, secrets_config_file)
                break
            # Editing jail storage pool
            elif user_input.casefold() == "j":
                config['jailStoragePool'] = select_pool(config['Pools'], "select for jail storage")
                write_config_file(config, secrets_config_file)
                break
            # Creating a new pool
            elif user_input.casefold() == "n":
                pool_details = new_pool_details()
                config['Pools'].update(pool_details)
                write_config_file(config, secrets_config_file)
                break
            # Deleting a pool
            elif user_input.casefold() == "d":
                pool_to_delete = select_pool(config['Pools'], "delete")
                if not ask_for_confirmation("Are you sure you wish to delete {} pool? ".format(pool_to_delete)):
                    break
                del config['Pools'][pool_to_delete]
                if not config['Pools']:
                    # no more pools, remove secrets file
                    os.remove(secrets_config_file)
                else:
                    # Do not leave the jail storage pool pointing at a removed entry.
                    if config['jailStoragePool'] == pool_to_delete:
                        print("The deleted pool was the jail storage pool, please choose a new one.")
                        config['jailStoragePool'] = select_pool(config['Pools'], "select for jail storage")
                    write_config_file(config, secrets_config_file)
                break
            # Quit the config
            elif user_input.casefold() == "q":
                sys.exit(0)
            # Catch all for non-valid characters
            else:
                print("This value must be one of the following characters: a, e, j, n, d, q.")


def _restartable_services(pool, api_key, skip_wording):
    """Return ``{service: label}`` offered for restart on *pool*, minus "jails"."""
    response = _try_request('pool/unlock_services_restart_choices', api_key, "POST", pool['id'])
    if not response['ok']:
        logger.error('Unable to get the services to restart for pool {}'.format(pool['name']))
        return {}
    services = {}
    for service_to_restart, label in response['response'].items():
        if service_to_restart == "jails":
            logger.debug('Skipping {} the {} service'.format(skip_wording, label))
            continue
        services[service_to_restart] = label
    return services


def _unlock_geli_pool(pool, pool_passphrase, api_key):
    """Unlock a GELI encrypted pool (``encrypt == 2``) if it is locked."""
    if pool['is_decrypted']:
        logger.info('Pool {} is already unlocked'.format(pool['name']))
        return
    logger.info('Pool {} is locked'.format(pool['name']))
    services = _restartable_services(pool, api_key, 'adding')
    # Joining also covers the case of no services at all, which previously
    # left pool_services unbound and raised NameError.
    pool_services = ", ".join("'{}'".format(service) for service in services)
    logger.debug('Restarting {} services when unlocking pool.'.format(pool_services))
    response = _try_request(
        'pool/id/{}/unlock'.format(pool['id']),
        api_key,
        'POST',
        {
            'passphrase': '{}'.format(pool_passphrase),
            # NOTE(review): this sends the list rendered as a single string,
            # exactly as before; confirm against the API reference whether
            # 'services_restart' should be a JSON array instead.
            'services_restart': "[" + pool_services + "]",
        },
    )
    if response['ok']:
        logger.info('Pool {} was unlocked and services restarted successfully'.format(pool['name']))
    else:
        logger.error('Pool {} was NOT unlocked and services NOT restarted'.format(pool['name']))


def _unlock_dataset(pool, dataset, pool_passphrase, api_key):
    """Unlock a locked dataset, then restart the services offered for *pool*."""
    if not dataset['locked']:
        logger.info('Dataset {} is already unlocked'.format(dataset['name']))
        return
    logger.info('Dataset {} is locked'.format(dataset['name']))
    response = _try_request(
        'pool/dataset/unlock',
        api_key,
        'POST',
        {
            'id': '{}'.format(dataset['id']),
            'unlock_options': {
                'recursive': True,
                'datasets': [
                    {'name': '{}'.format(dataset['name']), 'passphrase': '{}'.format(pool_passphrase)}
                ],
            },
        },
    )
    if not response['ok']:
        logger.error('Dataset {} was NOT unlocked'.format(dataset['name']))
        return
    logger.info('Dataset {} was unlocked successfully'.format(dataset['name']))
    # restart services since the dataset was unlocked
    logger.info('Restarting services')
    for service_to_restart, label in _restartable_services(pool, api_key, 'restarting').items():
        logger.debug('Restarting the {} service'.format(label))
        restart = _try_request('service/restart', api_key, 'POST', {'service': service_to_restart})
        if restart['ok']:
            logger.debug('Service {} was restarted successfully'.format(label))
        else:
            logger.error('Service {} was NOT restarted'.format(label))


def _start_boot_jails(api_key, restart_running):
    """Start stopped start-on-boot jails; restart running ones if requested."""
    listing = _try_request('jail', api_key)
    if not listing['ok']:
        logger.error('Unable to get the list of jails')
        return
    for jail in listing['response']:
        if jail['boot'] != 1:
            continue
        logger.info('Jail {} is set to start on boot'.format(jail['id']))
        if jail['state'] == 'down':
            logger.info('Jail {} is stopped'.format(jail['id']))
            logger.info('Starting jail {}'.format(jail['id']))
            response = _try_request('jail/start', api_key, 'POST', '{}'.format(jail['id']))
            if response['ok']:
                logger.info('Started jail {} successfully'.format(jail['id']))
            else:
                logger.error('Jail {} was NOT started'.format(jail['id']))
        elif restart_running:
            logger.info('Jail {} is already started but --restartJails was passed in. Restarting jail.'.format(jail['id']))
            response = _try_request('jail/restart', api_key, 'POST', '{}'.format(jail['id']))
            if response['ok']:
                logger.info('Restarted jail {} successfully'.format(jail['id']))
            else:
                logger.error('Jail {} was NOT restarted'.format(jail['id']))
        else:
            logger.info('Jail {} is already started'.format(jail['id']))


if __name__ == '__main__':
    # Build and parse arguments
    parser = argparse.ArgumentParser(
        usage="{} [options]".format(os.path.basename(__file__)),
        description="Run Truenas pool unlock operation. This will unlock encrypted pools on truenas.")
    parser.add_argument(
        "-c",
        "--config",
        action="store_true",
        dest="config",
        help="Edit pools configuration."
    )
    parser.add_argument(
        "-r",
        "--restartJails",
        action="store_true",
        dest="restartJails",
        help="Restarts jails if running"
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        dest="debug",
        help="Output debug/verbose info to the console for troubleshooting."
    )
    opts = parser.parse_args()
    script_directory = os.path.dirname(os.path.realpath(__file__))
    script_name = os.path.basename(__file__)
    secrets_config_file = os.path.join(script_directory, "secrets.config")
    pykmip_client_config_file = os.path.join(script_directory, "conf", "client.conf")
    log_file = os.path.join(script_directory, "log.log")
    datetime_string = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    # The helper functions in this file read `logger` and `client` as module
    # globals, so both names must be bound here at module level.
    logger = build_logger(logging.DEBUG if opts.debug else logging.INFO)
    # Rebinds the imported `client` module name to the connected KMIP client.
    client = client.ProxyKmipClient(config_file=pykmip_client_config_file)
    client.open()
    try:
        if opts.config:
            _run_config_menu(secrets_config_file)
        if not os.path.exists(secrets_config_file):
            print("No configuration file found. Please run {} -c for configuration.".format(script_name))
            sys.exit(-1)
        # run the decryption of the keys and unlock the pool
        config = read_config_file(secrets_config_file)
        api_key = config['API Key']
        jail_storage_pool = config['jailStoragePool']
        pools_listing = _try_request('pool', api_key)
        datasets_listing = _try_request('pool/dataset', api_key)
        if not (pools_listing['ok'] and datasets_listing['ok']):
            logger.error('Unable to get the pools and datasets from the API')
            sys.exit(-1)
        API_POOLS = pools_listing['response']
        API_DATASETS = datasets_listing['response']
        for name, pool_settings in config['Pools'].items():
            if name == 'DEFAULT':
                continue
            pool_passphrase = pool_settings['pool_passphrase']
            logger.info("Attempting to unlock {} pool.".format(name))
            pool_found = False
            for pool in API_POOLS:
                if pool['name'] != name:
                    continue
                pool_found = True
                # GELI encrypted pool
                if pool['encrypt'] == 2:
                    _unlock_geli_pool(pool, pool_passphrase, api_key)
                # start detection of locked zfs dataset
                elif pool['encrypt'] == 0:
                    # loop through the datasets to find the one we are looking for
                    for dataset in API_DATASETS:
                        if dataset['name'] == name:
                            _unlock_dataset(pool, dataset, pool_passphrase, api_key)
            if not pool_found:
                logger.warning('Pool {} was not found on this system'.format(name))
        # make sure jail storage pool is set
        logger.info('Setting jail storage pool to {}'.format(jail_storage_pool))
        response = _try_request('jail/activate', api_key, 'POST', '{}'.format(jail_storage_pool))
        if response['ok']:
            logger.info('Successfully set jail storage pool to {}'.format(jail_storage_pool))
            logger.debug("Sleeping for 60 seconds")
            time.sleep(60)
            # Restarting the 'jails' service as a whole is disabled here;
            # start any jails that are set to start on boot but are most
            # likely on an encrypted pool.
            _start_boot_jails(api_key, opts.restartJails)
        else:
            logger.error('Unable to set jail storage pool to {}'.format(jail_storage_pool))
    finally:
        # Runs on normal completion, on sys.exit() and on errors alike, so the
        # KMIP connection is always released.
        client.close()
    sys.exit(0)