Files
truenas-kmip-unlocker/truenas_kmip_unlock.py
2025-10-24 01:41:46 +00:00

460 lines
18 KiB
Python

#!/usr/bin/env python
import os
import binascii
import logging
import sys
import secrets
import base64
import argparse
import configparser
import getpass
from truenas_api_client import Client
import platform
import subprocess
import time
try:
import simplejson as json
except ImportError:
import json
import hmac as pyhmac
import datetime
from kmip.core import enums
from kmip.demos import utils
from kmip.pie import client
def build_logger(level):
logger = logging.getLogger()
logger.setLevel(level)
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
# log to file
#fileHandler = logging.FileHandler(log_file)
#fileHandler.setFormatter(formatter)
#logger.addHandler(fileHandler)
# log to console
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(formatter)
logger.addHandler(consoleHandler)
return logger
def write_config_file(array, config_file):
logger.debug("Starting to write config file and encrypt contents")
logger.debug("Using config file: {}".format(config_file))
logger.debug("Converting config from array to json")
array_json = json.dumps(array)
logger.debug("Encrypting config json")
encrypted_array_json = encrypt(array_json)
logger.debug("Attempting to write encrypted config to file")
try:
f = open(config_file, "w")
#f.write(array_json)
f.write(encrypted_array_json)
f.close()
logger.debug("Successfully wrote encrypted config to file")
except Exception as e:
logger.error("Unable to write encrypted config to file. Error: {}".format(e))
sys.exit(-1)
logger.debug("Finshed writing config file and encrypting contents")
def read_config_file(config_file):
logger.debug("Starting to read config file and decrypt contents")
logger.debug("Using config file: {}".format(config_file))
logger.debug("Attempting to read encrypted config from file")
try:
with open(config_file) as f:
config = f.read()
logger.debug("Successfully read encrypted config from file")
except Exception as e:
logger.error("Unable to read encrypted config from file. Error: {}".format(e))
sys.exit(-1)
logger.debug("Decrypting config contents")
decrypted_array_json = decrypt(config)
logger.debug("Convert config from json to array")
#array = json.loads(config)
array = json.loads(decrypted_array_json)
logger.debug("Finished reading config file and decrypting contents")
return array
def ask_for_confirmation(question):
logger.debug("Asking user for confirmation")
logger.debug("Question: {}".format(question))
print(question)
while True:
confirmation = input("y/n> ")
logger.debug("User answered: {}".format(confirmation))
if confirmation.casefold() == "y":
return True
elif confirmation.casefold() == "n":
return False
else:
print("This value must be one of the following characters: y, n.")
def create_encryption_key():
# Create an encryption key.
try:
key_id = client.create(
enums.CryptographicAlgorithm.AES,
256,
cryptographic_usage_mask=[
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
]
)
logger.debug("Successfully created a new encryption key.")
logger.debug("Encryption Key ID: {}".format(key_id))
except Exception as e:
logger.error("Unable to create encryption key. Error: {}".format(e))
sys.exit(-1)
# Activate the encryption key so that it can be used.
try:
client.activate(key_id)
logger.debug("Successfully activated the encryption key.")
return key_id
except Exception as e:
logger.error("Unable to activate encryption key. Error: {}".format(e))
sys.exit(-1)
def create_hmac_key():
# Create an encryption key.
try:
key_id = client.create(
enums.CryptographicAlgorithm.AES,
256,
cryptographic_usage_mask=[
enums.CryptographicUsageMask.MAC_GENERATE,
enums.CryptographicUsageMask.MAC_VERIFY
]
)
logger.debug("Successfully created a new HMAC key.")
logger.debug("HMAC Key ID: {}".format(key_id))
except Exception as e:
logger.error("Unable to create hmac key. Error: {}".format(e))
sys.exit(-1)
# Activate the HMAC key so that it can be used.
try:
client.activate(key_id)
logger.debug("Successfully activated the HMAC key.")
return key_id
except Exception as e:
logger.error("Unable to activate hmac key. Error: {}".format(e))
sys.exit(-1)
def encrypt(data):
try:
data = data.encode('UTF-8')
key_id = create_encryption_key()
iv = secrets.token_bytes(16)
cipher_text, autogenerated_iv = client.encrypt(
data,
uid=key_id,
cryptographic_parameters={
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES,
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.ANSI_X923
},
iv_counter_nonce=(
iv
)
)
hmac_key_id, hmac = client.mac(
key_id.encode() + iv + cipher_text,
uid = create_hmac_key(),
algorithm = enums.CryptographicAlgorithm.HMAC_SHA512
)
logger.debug("Successfully encrypted the data.")
array = dict()
array['version'] = 1
array['cipher_key_id'] = key_id
array['cipher_text'] = base64.b64encode(cipher_text).decode()
array['iv'] = base64.b64encode(iv).decode()
array['hmac_key_id'] = hmac_key_id
array['hmac'] = base64.b64encode(hmac).decode()
logger.debug("Dict of info: {}".format(array))
array_json = json.dumps(array)
array_json_b64 = base64.b64encode(array_json.encode('utf-8')).decode()
return array_json_b64
except Exception as e:
logger.error("Unable to encrypt data. Error: {}".format(e))
sys.exit(-1)
def decrypt(data):
array_json = base64.b64decode(data)
array = json.loads(array_json)
if array['version'] == 1:
return decrypt_v1(array)
else:
logger.error("Unable to detemine encryption version.")
return False
def decrypt_v1(array):
try:
logger.debug("Dict of info: {}".format(array))
key_id = array['cipher_key_id']
iv = base64.b64decode(array['iv'])
cipher_text = base64.b64decode(array['cipher_text'])
hmac_key_id = array['hmac_key_id']
hmac = base64.b64decode(array['hmac'])
hmac_key_id_test, hmac_test = client.mac(
key_id.encode() + iv + cipher_text,
uid = hmac_key_id,
algorithm = enums.CryptographicAlgorithm.HMAC_SHA512
)
if pyhmac.compare_digest(hmac, hmac_test):
logger.debug("HMAC matches.")
else:
logger.error("HMAC does not match, data is corrupted/tampered.")
sys.exit(-1)
plain_text = client.decrypt(
cipher_text,
uid=key_id,
cryptographic_parameters={
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES,
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.ANSI_X923
},
iv_counter_nonce=(
iv
)
)
logger.debug("Successfully decrypted the data.")
plain_text = plain_text.decode('utf-8')
return plain_text
except Exception as e:
logger.error("Unable to decrypt data. Error: {}".format(e))
sys.exit(-1)
def new_pool_details():
print("Requesting pool details to add to config.")
pool_name = input("Please enter pool name: ")
while True:
pool_passphrase = getpass.getpass("Please enter pool encryption passphrase: ")
pool_passphrase_confirm = getpass.getpass("Please confirm pool encryption passphrase: ")
if pyhmac.compare_digest(pool_passphrase, pool_passphrase_confirm):
break
else:
print("The pool encryption passphrases do not match, please try again.")
array = dict()
array[pool_name] = dict()
array[pool_name]["pool_passphrase"] = pool_passphrase
return array
def edit_pool_details(pools, pool_to_edit):
if ask_for_confirmation("Would you like to edit the pool name?\nCurrent Value: {}".format(pool_to_edit)):
pool_name = input("Please enter pool name: ")
else:
pool_name = pool_to_edit
if ask_for_confirmation("Would you like to edit the pool encryption passphraset?"):
while True:
pool_passphrase = getpass.getpass("Please enter pool encryption passphrase: ")
pool_passphrase_confirm = getpass.getpass("Please confirm pool encryption passphrase: ")
if pyhmac.compare_digest(pool_passphrase, pool_passphrase_confirm):
break
else:
print("The pool encryption passphrases do not match, please try again.")
else:
pool_passphrase = pools[pool_to_edit]['pool_passphrase']
array = dict()
array[pool_name] = dict()
array[pool_name]["pool_passphrase"] = pool_passphrase
return array
def select_pool(pools, wording = "edit"):
print("Which pool would you like to {}:".format(wording))
print(" ")
pool_names = list(pools)
for i in range(0, len(pools)):
pretty_number = i + 1
print("{}) {}".format(pretty_number, pool_names[i]))
print(" ")
while True:
pool_to_modify = int(input("Please enter number relating to the pool you wish to {}: ".format(wording))) - 1
try:
return pool_names[pool_to_modify]
except IndexError as error:
print("you entered a number out of range, please try again")
if __name__ == '__main__':
# Build and parse arguments
parser = argparse.ArgumentParser(
usage="{} [options]".format(os.path.basename(__file__)),
description="Run Truenas pool unlock operation. This will unlock encrypted pools on truenas.")
parser.add_argument(
"-c",
"--config",
action="store_true",
dest="config",
help="Edit pools configuration."
)
#parser.add_argument(
# "-r",
# "--restartAppsVMs",
# action="store_true",
# dest="restartAppsVMs",
# help="Restarts apps and VMs if running"
#)
parser.add_argument (
"-v",
"--verbose",
action="store_true",
dest="debug",
help="Output debug/verbose info to the console for troubleshooting."
)
opts = parser.parse_args()
script_directory = os.path.dirname(os.path.realpath(__file__))
script_name = os.path.basename(__file__)
secrets_config_file = os.path.join(script_directory, "secrets.config")
pykmip_client_config_file = os.path.join(script_directory, "conf", "client.conf")
log_file = os.path.join(script_directory, "log.log")
datetime_string = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
if opts.debug:
logger = build_logger(logging.DEBUG)
else:
logger = build_logger(logging.INFO)
client = client.ProxyKmipClient(config_file=pykmip_client_config_file)
client.open()
if opts.config:
while True:
if not os.path.exists(secrets_config_file):
print("No pools found, do you want to add a new one?")
print(" ")
print("n) New config")
print("q) Quit config")
while True:
user_input = input("n/q> ")
if user_input.casefold() == "n":
config = dict()
config['URI'] = input("Please enter your hostname or IP address (default: localhost): ") or "localhost"
if ask_for_confirmation("Would you like to verify the TLS cert?"):
config['Verify TLS'] = True
else:
config['Verify TLS'] = False
config['API Key'] = input("Please enter your API key: ")
config['Pools'] = new_pool_details()
write_config_file(config, secrets_config_file)
break
elif user_input.casefold() == "q":
sys.exit(0)
else:
print("This value must be one of the following characters: n, q.")
while True:
config = read_config_file(secrets_config_file)
print("Current pools:")
print(" ")
for pool in config['Pools']:
print(pool)
print(" ")
print("URI: {}".format(config['URI']))
print("Verify TLS Cert: {}".format(config['Verify TLS']))
print(" ")
print("h) Edit Host, TLS, and API Key")
print("e) Edit pool")
print("n) New pool")
print("d) Delete pool")
print("q) Quit config")
user_input = input("h/e/n/d/q> ")
# Edit Host, TLS, and API Key
if user_input.casefold() == "h":
if ask_for_confirmation("Would you like to edit the hostname or IP address?\nCurrent Value: {}".format(config['URI'])):
config['URI'] = input("Please enter your hostname or IP address: ")
if ask_for_confirmation("Would you like to edit verify TLS cert?\nCurrent Value: {}".format(config['Verify TLS'])):
if ask_for_confirmation("Would you like to verify the TLS cert?"):
config['Verify TLS'] = True
else:
config['Verify TLS'] = False
if ask_for_confirmation("Would you like to edit the API key?"):
config['API Key'] = input("Please enter your API key: ")
write_config_file(config, secrets_config_file)
# Editing a pool
elif user_input.casefold() == "e":
pool_to_edit = select_pool(config['Pools'])
pool_details = edit_pool_details(config['Pools'], pool_to_edit)
del config['Pools'][pool_to_edit]
config['Pools'].update(pool_details)
write_config_file(config, secrets_config_file)
break
# Createing a new pool
elif user_input.casefold() == "n":
pool_details = new_pool_details()
config['Pools'].update(pool_details)
write_config_file(config, secrets_config_file)
break
# Deleting a pool
elif user_input.casefold() == "d":
pool_to_delete = select_pool(config['Pools'], "delete")
if not ask_for_confirmation("Are you sure you wish to delete {} pool? ".format(pool_to_delete)):
break
del config['Pools'][pool_to_delete]
if len(config['Pools']) == 0:
if ask_for_confirmation("There are no pools left in the config, do you want to remove the secrets file?"):
# no more accounts, remove secrets file if requests
os.remove(secrets_config_file)
else:
write_config_file(config, secrets_config_file)
break
# Quit the config
elif user_input.casefold() == "q":
sys.exit(0)
# Catch all for non-valid characters
else:
print("This value must be one of the following characters: h, e, n, d, q")
if not os.path.exists(secrets_config_file):
print("No configuration file found. Please run {} -c/--config for configuration".format(script_name))
sys.exit(-1)
# run the decryption of the keys and unlock the pool
config = read_config_file(secrets_config_file)
URI = config['URI']
VERIFY_TLS = config['Verify TLS']
API_KEY = config['API Key']
with Client(uri="wss://{}/api/current".format(URI), verify_ssl=VERIFY_TLS) as c:
if c.call("auth.login_with_api_key", API_KEY):
logger.debug('Successfully logged into {}'.format(URI))
else:
logger.error('Unable to login to {}'.format(URI))
SYSTEM_INFORMATION = c.call("system.info")
logger.debug('System Info: {}'.format(SYSTEM_INFORMATION))
POOL_DATASETS = c.call("pool.dataset.details")
logger.debug('Pool Datasets: {}'.format(POOL_DATASETS))
for config_pool in config['Pools']:
config_pool_name = config_pool
pool_passphrase = config['Pools'][config_pool_name]['pool_passphrase']
logger.info("Attempting to unlock {} pool".format(config_pool_name))
for pool_dataset in POOL_DATASETS:
pool_name = pool_dataset['name']
if pool_name == config_pool_name:
pool_encrypted = pool_dataset['encrypted']
pool_unlocked = pool_dataset['key_loaded']
if pool_encrypted == True and pool_unlocked == False:
logger.info('Dataset {} is locked'.format(pool_name))
unlock_args = {"datasets": [{"name": pool_name, "passphrase": pool_passphrase}]}
unlock_status = c.call("pool.dataset.unlock", pool_name, unlock_args, job=True)
# return will look like this: {'unlocked': ['test'], 'failed': {}}
if pool_name in unlock_status.get('unlocked', []):
logger.info('Dataset {} was unlocked successfully'.format(pool_name))
else:
logger.error('Dataset {} was NOT unlocked'.format(pool_name))
else:
logger.info('Dataset {} is already unlocked'.format(pool_name))
client.close()
sys.exit(0)