Files
truenas-kmip-unlocker/truenas_kmip_unlock.py
2023-04-01 21:56:16 -04:00

488 lines
20 KiB
Python

#!/usr/bin/env python
import os
import binascii
import logging
import sys
import secrets
import base64
import argparse
import configparser
import getpass
import requests
import platform
import subprocess
import time
import simplejson as json
import hmac as pyhmac
import datetime
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from kmip.core import enums
from kmip.demos import utils
from kmip.pie import client
def request(resource, api_key, method='GET', data=None):
if data is None:
data = ''
else:
data = json.dumps(data)
url = 'https://127.0.0.1/api/v2.0/{}'.format(resource)
logger.debug('Request URL: {}'.format(url))
logger.debug('Request Data: {}'.format(data))
r = requests.request(
method,
url,
data=data,
headers={'Content-Type': 'application/json', 'Authorization': 'Bearer {}'.format(api_key)},
verify=False
)
logger.debug('Request Status Code: {}'.format(r.status_code))
if r.ok:
try:
logger.debug('Request Returned JSON: {}'.format(r.json()))
return {'ok': r.ok, 'status_code': r.status_code, 'response': r.json()}
except:
logger.debug('Request Returned Text: {}'.format(r.text))
return {'ok': r.ok, 'status_code': r.status_code, 'response': r.text}
raise ValueError(r)
def build_logger(level):
logger = logging.getLogger()
logger.setLevel(level)
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
# log to file
#fileHandler = logging.FileHandler(log_file)
#fileHandler.setFormatter(formatter)
#logger.addHandler(fileHandler)
# log to console
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(formatter)
logger.addHandler(consoleHandler)
return logger
def write_config_file(array, config_file):
logger.debug("Starting to write config file and encrypt contents")
logger.debug("Using config file: {}".format(config_file))
logger.debug("Converting config from array to json")
array_json = json.dumps(array)
logger.debug("Encrypting config json")
encrypted_array_json = encrypt(array_json)
logger.debug("Attempting to write encrypted config to file")
try:
f = open(config_file, "w")
#f.write(array_json)
f.write(encrypted_array_json)
f.close()
logger.debug("Successfully wrote encrypted config to file")
except Exception as e:
logger.error("Unable to write encrypted config to file. Error: {}".format(e))
sys.exit(-1)
logger.debug("Finshed writing config file and encrypting contents")
def read_config_file(config_file):
logger.debug("Starting to read config file and decrypt contents")
logger.debug("Using config file: {}".format(config_file))
logger.debug("Attempting to read encrypted config from file")
try:
with open(config_file) as f:
config = f.read()
logger.debug("Successfully read encrypted config from file")
except Exception as e:
logger.error("Unable to read encrypted config from file. Error: {}".format(e))
sys.exit(-1)
logger.debug("Decrypting config contents")
decrypted_array_json = decrypt(config)
logger.debug("Convert config from json to array")
#array = json.loads(config)
array = json.loads(decrypted_array_json)
logger.debug("Finished reading config file and decrypting contents")
return array
def ask_for_confirmation(question):
logger.debug("Asking user for confirmation")
logger.debug("Question: {}".format(question))
print(question)
while True:
confirmation = input("y/n> ")
logger.debug("User answered: {}".format(confirmation))
if confirmation.casefold() == "y":
return True
elif confirmation.casefold() == "n":
return False
else:
print("This value must be one of the following characters: y, n.")
def create_encryption_key():
# Create an encryption key.
try:
key_id = client.create(
enums.CryptographicAlgorithm.AES,
256,
cryptographic_usage_mask=[
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
]
)
logger.debug("Successfully created a new encryption key.")
logger.debug("Encryption Key ID: {}".format(key_id))
except Exception as e:
logger.error("Unable to create encryption key. Error: {}".format(e))
sys.exit(-1)
# Activate the encryption key so that it can be used.
try:
client.activate(key_id)
logger.debug("Successfully activated the encryption key.")
return key_id
except Exception as e:
logger.error("Unable to activate encryption key. Error: {}".format(e))
sys.exit(-1)
def create_hmac_key():
# Create an encryption key.
try:
key_id = client.create(
enums.CryptographicAlgorithm.AES,
256,
cryptographic_usage_mask=[
enums.CryptographicUsageMask.MAC_GENERATE,
enums.CryptographicUsageMask.MAC_VERIFY
]
)
logger.debug("Successfully created a new HMAC key.")
logger.debug("HMAC Key ID: {}".format(key_id))
except Exception as e:
logger.error("Unable to create hmac key. Error: {}".format(e))
sys.exit(-1)
# Activate the HMAC key so that it can be used.
try:
client.activate(key_id)
logger.debug("Successfully activated the HMAC key.")
return key_id
except Exception as e:
logger.error("Unable to activate hmac key. Error: {}".format(e))
sys.exit(-1)
def encrypt(data):
try:
data = data.encode('UTF-8')
key_id = create_encryption_key()
iv = secrets.token_bytes(16)
cipher_text, autogenerated_iv = client.encrypt(
data,
uid=key_id,
cryptographic_parameters={
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES,
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.ANSI_X923
},
iv_counter_nonce=(
iv
)
)
hmac_key_id, hmac = client.mac(
key_id.encode() + iv + cipher_text,
uid = create_hmac_key(),
algorithm = enums.CryptographicAlgorithm.HMAC_SHA512
)
logger.debug("Successfully encrypted the data.")
array = dict()
array['version'] = 1
array['cipher_key_id'] = key_id
array['cipher_text'] = base64.b64encode(cipher_text).decode()
array['iv'] = base64.b64encode(iv).decode()
array['hmac_key_id'] = hmac_key_id
array['hmac'] = base64.b64encode(hmac).decode()
logger.debug("Dict of info: {}".format(array))
array_json = json.dumps(array)
array_json_b64 = base64.b64encode(array_json.encode('utf-8')).decode()
return array_json_b64
except Exception as e:
logger.error("Unable to encrypt data. Error: {}".format(e))
sys.exit(-1)
def decrypt(data):
array_json = base64.b64decode(data)
array = json.loads(array_json)
if array['version'] == 1:
return decrypt_v1(array)
else:
logger.error("Unable to detemine encryption version.")
return False
def decrypt_v1(array):
try:
logger.debug("Dict of info: {}".format(array))
key_id = array['cipher_key_id']
iv = base64.b64decode(array['iv'])
cipher_text = base64.b64decode(array['cipher_text'])
hmac_key_id = array['hmac_key_id']
hmac = base64.b64decode(array['hmac'])
hmac_key_id_test, hmac_test = client.mac(
key_id.encode() + iv + cipher_text,
uid = hmac_key_id,
algorithm = enums.CryptographicAlgorithm.HMAC_SHA512
)
if pyhmac.compare_digest(hmac, hmac_test):
logger.debug("HMAC matches.")
else:
logger.error("HMAC does not match, data is corrupted/tampered.")
sys.exit(-1)
plain_text = client.decrypt(
cipher_text,
uid=key_id,
cryptographic_parameters={
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES,
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.ANSI_X923
},
iv_counter_nonce=(
iv
)
)
logger.debug("Successfully decrypted the data.")
plain_text = plain_text.decode('utf-8')
return plain_text
except Exception as e:
logger.error("Unable to decrypt data. Error: {}".format(e))
sys.exit(-1)
def new_pool_details():
print("Requesting pool details to add to config.")
pool_name = input("Please enter pool name: ")
while True:
pool_passphrase = getpass.getpass("Please enter pool encryption passphrase: ")
pool_passphrase_confirm = getpass.getpass("Please confirm pool encryption passphrase: ")
if pool_passphrase == pool_passphrase_confirm:
break
else:
print("The pool encryption passphrases do not match, please try again.")
array = dict()
array[pool_name] = dict()
array[pool_name]["pool_passphrase"] = pool_passphrase
return array
def edit_pool_details(pools, pool_to_edit):
if ask_for_confirmation("Would you like to edit the pool name?\nCurrent Value: {}".format(pool_to_edit)):
pool_name = input("Please enter pool name: ")
else:
pool_name = pool_to_edit
if ask_for_confirmation("Would you like to edit the pool encryption passphraset?"):
while True:
pool_passphrase = getpass.getpass("Please enter pool encryption passphrase: ")
pool_passphrase_confirm = getpass.getpass("Please confirm pool encryption passphrase: ")
if pool_passphrase == pool_passphrase_confirm:
break
else:
print("The pool encryption passphrases do not match, please try again.")
else:
pool_passphrase = pools[pool_to_edit]['pool_passphrase']
array = dict()
array[pool_name] = dict()
array[pool_name]["pool_passphrase"] = pool_passphrase
return array
def select_pool(pools, wording = "edit"):
print("Which pool would you like to {}:".format(wording))
print(" ")
pool_names = list(pools)
for i in range(0, len(pools)):
pretty_number = i + 1
print("{}) {}".format(pretty_number, pool_names[i]))
print(" ")
while True:
pool_to_modify = int(input("Please enter number relating to the pool you wish to {}: ".format(wording))) - 1
try:
return pool_names[pool_to_modify]
except IndexError as error:
print("you entered a number out of range, please try again")
if __name__ == '__main__':
# Build and parse arguments
parser = argparse.ArgumentParser(
usage="%prog [options]",
description="Run Truenas Pool Unlcok opteration. This will unlock encrypted pools on truenas.")
parser.add_argument(
"-c",
"--config",
action="store_true",
dest="config",
help="Edit pools configuration."
)
parser.add_argument (
"-v",
"--verbose",
action="store_true",
dest="debug",
help="Output debug/verbose info to the console for troubleshooting."
)
opts = parser.parse_args()
script_directory = os.path.dirname(os.path.realpath(__file__))
script_name = os.path.basename(__file__)
secrets_config_file = os.path.join(script_directory, "secrets.config")
pykmip_client_config_file = os.path.join(script_directory, "conf", "client.conf")
log_file = os.path.join(script_directory, "log.log")
datetime_string = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
if opts.debug:
logger = build_logger(logging.DEBUG)
else:
logger = build_logger(logging.INFO)
client = client.ProxyKmipClient(config_file=pykmip_client_config_file)
client.open()
if opts.config:
while True:
if not os.path.exists(secrets_config_file):
print("No pools found, do you want to add a new one?")
print(" ")
print("n) New account")
print("q) Quit config")
while True:
user_input = input("n/q> ")
if user_input.casefold() == "n":
pool_details = dict()
api_key = input("Please enter your API key: ")
pool_details['API Key'] = api_key
pool_details['Pools'] = new_pool_details()
write_config_file(pool_details, secrets_config_file)
break
elif user_input.casefold() == "q":
sys.exit(0)
else:
print("This value must be one of the following characters: n, q.")
pools = read_config_file(secrets_config_file)
print("Current pools:")
print(" ")
for pool in pools['Pools']:
print(pool)
print(" ")
print("a) Edit API Key")
print("e) Edit pool")
print("n) New pool")
print("d) Delete pool")
print("q) Quit config")
while True:
user_input = input("a/e/n/d/q> ")
# Edit API Key
if user_input.casefold() == "a":
api_key = input("Please enter your API key: ")
pools['API Key'] = api_key
write_config_file(pools, secrets_config_file)
# Editing an account
elif user_input.casefold() == "e":
pool_to_edit = select_pool(pools['Pools'])
pool_details = edit_pool_details(pools['Pools'], pool_to_edit)
del pools['Pools'][pool_to_edit]
pools['Pools'].update(pool_details)
write_config_file(pools, secrets_config_file)
break
# Createing a new account
elif user_input.casefold() == "n":
pool_details = new_pool_details()
pools['Pools'].update(pool_details)
write_config_file(pools, secrets_config_file)
break
# Deleting an account
elif user_input.casefold() == "d":
pool_to_delete = select_pool(pools['Pools'], "delete")
if not ask_for_confirmation("Are you sure you wish to delete {} pool? ".format(pool_to_delete)):
break
del pools['Pools'][pool_to_delete]
if len(pools['Pools']) == 0:
# no more accounts, remove secrets file
os.remove(secrets_config_file)
else:
write_config_file(pools, secrets_config_file)
break
# Quit the config
elif user_input.casefold() == "q":
sys.exit(0)
# Catch all for non-valid characters
else:
print("This value must be one of the following characters: a, e, n, d, q.")
if not os.path.exists(secrets_config_file):
print("No configuration file found. Please run {} -c to configure your accounts.".format(script_name))
sys.exit(-1)
# run the decryption of the keys and unlock the pool
pools = read_config_file(secrets_config_file)
api_key = pools['API Key']
logger.debug("API Key: {}".format(api_key))
API_POOLS = request('pool', api_key)['response']
API_DATASETS = request('pool/dataset', api_key)['response']
for pool_dataset_name in pools['Pools']:
if pool_dataset_name != 'DEFAULT':
name = pool_dataset_name
pool_passphrase = pools['Pools'][name]['pool_passphrase']
logger.info("Attempting to unlock {} pool.".format(name))
for pool in API_POOLS:
if pool['name'] == name:
# GELI encrypted pool
if pool['encrypt'] == 2:
if pool['is_decrypted'] == False:
logger.info('Pool {} is locked'.format(pool['name']))
response = request('pool/id/{}/unlock'.format(pool['id']), api_key, 'POST', {'passphrase': '{}'.format(pool_passphrase), 'services_restart': ['cifs', 'nfs']})
if response['ok']:
logger.info('Pool {} was unlocked successfully'.format(pool['name']))
else:
logger.error('Pool {} was NOT unlocked successfully'.format(pool['name']))
else:
logger.info('Pool {} is already unlocked'.format(pool['name']))
# start detection of locked zfs dataset
elif pool['encrypt'] == 0:
# loop through the datasets to find the one we are looking for
for dataset in API_DATASETS:
if dataset['name'] == name:
if dataset['locked'] == True:
logger.info('Dataset {} is locked'.format(dataset['name']))
response = request('pool/dataset/unlock',api_key, 'POST', {'id': '{}'.format(dataset['id']), 'unlock_options': {'recursive': True, 'datasets': [{'name': '{}'.format(dataset['name']), 'passphrase': '{}'.format(pool_passphrase)}]}})
if response['ok']:
logger.info('Dataset {} was unlocked successfully'.format(dataset['name']))
# restart services since the dataset was unlocked
logger.info('Restarting the NFS service')
response = request('service/restart', api_key, 'POST', {'service': 'nfs'})
logger.info('Restarting the CIFS/SMB service')
response = request('service/restart', api_key, 'POST', {'service': 'cifs'})
else:
logger.error('Dataset {} was NOT unlocked successfully'.format(dataset['name']))
else:
logger.info('Dataset {} is already unlocked'.format(dataset['name']))
#print('pool {0} is decrypted: {1}'.format(pool['name'],pool['is_decrypted']))
# this line should be after the actual unlock on the disk
#logger.debug("Decrypted {0} pool key.".format(name))
# start any jails that are set to start on boot but are most likely on an encrypted pool
API_JAILS = request('jail', api_key)['response']
for jail in API_JAILS:
if jail['boot'] == 1:
logger.info('Jail {} is set to start on boot'.format(jail['id']))
if jail['state'] == 'down':
logger.info('Jail {} is stopped'.format(jail['id']))
logger.info('Starting jail {}'.format(jail['id']))
response = request('jail/start', api_key, 'POST', '{}'.format(jail['id']))
if response['ok']:
logger.info('Started jail {} successfully'.format(jail['id']))
else:
logger.error('Jail {} was NOT started successfully'.format(jail['id']))
else:
logger.info('Jail {} is already started'.format(jail['id']))
client.close()
sys.exit(0)