Pools variable name doesn't make sense since the API key is also in there. Better to call it config as it contains the config
488 lines
20 KiB
Python
488 lines
20 KiB
Python
#!/usr/bin/env python
|
|
|
|
import os
|
|
import binascii
|
|
import logging
|
|
import sys
|
|
import secrets
|
|
import base64
|
|
import argparse
|
|
import configparser
|
|
import getpass
|
|
import requests
|
|
import platform
|
|
import subprocess
|
|
import time
|
|
import simplejson as json
|
|
import hmac as pyhmac
|
|
import datetime
|
|
import urllib3
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
from kmip.core import enums
|
|
from kmip.demos import utils
|
|
from kmip.pie import client
|
|
|
|
|
|
def request(resource, api_key, method='GET', data=None):
|
|
if data is None:
|
|
data = ''
|
|
else:
|
|
data = json.dumps(data)
|
|
url = 'https://127.0.0.1/api/v2.0/{}'.format(resource)
|
|
logger.debug('Request URL: {}'.format(url))
|
|
logger.debug('Request Data: {}'.format(data))
|
|
r = requests.request(
|
|
method,
|
|
url,
|
|
data=data,
|
|
headers={'Content-Type': 'application/json', 'Authorization': 'Bearer {}'.format(api_key)},
|
|
verify=False
|
|
)
|
|
logger.debug('Request Status Code: {}'.format(r.status_code))
|
|
if r.ok:
|
|
try:
|
|
logger.debug('Request Returned JSON: {}'.format(r.json()))
|
|
return {'ok': r.ok, 'status_code': r.status_code, 'response': r.json()}
|
|
except:
|
|
logger.debug('Request Returned Text: {}'.format(r.text))
|
|
return {'ok': r.ok, 'status_code': r.status_code, 'response': r.text}
|
|
raise ValueError(r)
|
|
|
|
def build_logger(level):
|
|
logger = logging.getLogger()
|
|
logger.setLevel(level)
|
|
formatter = logging.Formatter(
|
|
'%(asctime)s - %(levelname)s - %(message)s'
|
|
)
|
|
# log to file
|
|
#fileHandler = logging.FileHandler(log_file)
|
|
#fileHandler.setFormatter(formatter)
|
|
#logger.addHandler(fileHandler)
|
|
# log to console
|
|
consoleHandler = logging.StreamHandler()
|
|
consoleHandler.setFormatter(formatter)
|
|
logger.addHandler(consoleHandler)
|
|
|
|
return logger
|
|
|
|
def write_config_file(array, config_file):
|
|
logger.debug("Starting to write config file and encrypt contents")
|
|
logger.debug("Using config file: {}".format(config_file))
|
|
logger.debug("Converting config from array to json")
|
|
array_json = json.dumps(array)
|
|
logger.debug("Encrypting config json")
|
|
encrypted_array_json = encrypt(array_json)
|
|
logger.debug("Attempting to write encrypted config to file")
|
|
try:
|
|
f = open(config_file, "w")
|
|
#f.write(array_json)
|
|
f.write(encrypted_array_json)
|
|
f.close()
|
|
logger.debug("Successfully wrote encrypted config to file")
|
|
except Exception as e:
|
|
logger.error("Unable to write encrypted config to file. Error: {}".format(e))
|
|
sys.exit(-1)
|
|
logger.debug("Finshed writing config file and encrypting contents")
|
|
|
|
def read_config_file(config_file):
|
|
logger.debug("Starting to read config file and decrypt contents")
|
|
logger.debug("Using config file: {}".format(config_file))
|
|
logger.debug("Attempting to read encrypted config from file")
|
|
try:
|
|
with open(config_file) as f:
|
|
config = f.read()
|
|
logger.debug("Successfully read encrypted config from file")
|
|
except Exception as e:
|
|
logger.error("Unable to read encrypted config from file. Error: {}".format(e))
|
|
sys.exit(-1)
|
|
logger.debug("Decrypting config contents")
|
|
decrypted_array_json = decrypt(config)
|
|
logger.debug("Convert config from json to array")
|
|
#array = json.loads(config)
|
|
array = json.loads(decrypted_array_json)
|
|
logger.debug("Finished reading config file and decrypting contents")
|
|
return array
|
|
|
|
def ask_for_confirmation(question):
|
|
logger.debug("Asking user for confirmation")
|
|
logger.debug("Question: {}".format(question))
|
|
print(question)
|
|
while True:
|
|
confirmation = input("y/n> ")
|
|
logger.debug("User answered: {}".format(confirmation))
|
|
if confirmation.casefold() == "y":
|
|
return True
|
|
elif confirmation.casefold() == "n":
|
|
return False
|
|
else:
|
|
print("This value must be one of the following characters: y, n.")
|
|
|
|
def create_encryption_key():
|
|
# Create an encryption key.
|
|
try:
|
|
key_id = client.create(
|
|
enums.CryptographicAlgorithm.AES,
|
|
256,
|
|
cryptographic_usage_mask=[
|
|
enums.CryptographicUsageMask.ENCRYPT,
|
|
enums.CryptographicUsageMask.DECRYPT
|
|
]
|
|
)
|
|
logger.debug("Successfully created a new encryption key.")
|
|
logger.debug("Encryption Key ID: {}".format(key_id))
|
|
except Exception as e:
|
|
logger.error("Unable to create encryption key. Error: {}".format(e))
|
|
sys.exit(-1)
|
|
|
|
# Activate the encryption key so that it can be used.
|
|
try:
|
|
client.activate(key_id)
|
|
logger.debug("Successfully activated the encryption key.")
|
|
return key_id
|
|
except Exception as e:
|
|
logger.error("Unable to activate encryption key. Error: {}".format(e))
|
|
sys.exit(-1)
|
|
|
|
def create_hmac_key():
|
|
# Create an encryption key.
|
|
try:
|
|
key_id = client.create(
|
|
enums.CryptographicAlgorithm.AES,
|
|
256,
|
|
cryptographic_usage_mask=[
|
|
enums.CryptographicUsageMask.MAC_GENERATE,
|
|
enums.CryptographicUsageMask.MAC_VERIFY
|
|
]
|
|
)
|
|
logger.debug("Successfully created a new HMAC key.")
|
|
logger.debug("HMAC Key ID: {}".format(key_id))
|
|
except Exception as e:
|
|
logger.error("Unable to create hmac key. Error: {}".format(e))
|
|
sys.exit(-1)
|
|
|
|
# Activate the HMAC key so that it can be used.
|
|
try:
|
|
client.activate(key_id)
|
|
logger.debug("Successfully activated the HMAC key.")
|
|
return key_id
|
|
except Exception as e:
|
|
logger.error("Unable to activate hmac key. Error: {}".format(e))
|
|
sys.exit(-1)
|
|
|
|
def encrypt(data):
|
|
try:
|
|
data = data.encode('UTF-8')
|
|
key_id = create_encryption_key()
|
|
iv = secrets.token_bytes(16)
|
|
cipher_text, autogenerated_iv = client.encrypt(
|
|
data,
|
|
uid=key_id,
|
|
cryptographic_parameters={
|
|
'cryptographic_algorithm':
|
|
enums.CryptographicAlgorithm.AES,
|
|
'block_cipher_mode': enums.BlockCipherMode.CBC,
|
|
'padding_method': enums.PaddingMethod.ANSI_X923
|
|
},
|
|
iv_counter_nonce=(
|
|
iv
|
|
)
|
|
)
|
|
hmac_key_id, hmac = client.mac(
|
|
key_id.encode() + iv + cipher_text,
|
|
uid = create_hmac_key(),
|
|
algorithm = enums.CryptographicAlgorithm.HMAC_SHA512
|
|
)
|
|
logger.debug("Successfully encrypted the data.")
|
|
array = dict()
|
|
array['version'] = 1
|
|
array['cipher_key_id'] = key_id
|
|
array['cipher_text'] = base64.b64encode(cipher_text).decode()
|
|
array['iv'] = base64.b64encode(iv).decode()
|
|
array['hmac_key_id'] = hmac_key_id
|
|
array['hmac'] = base64.b64encode(hmac).decode()
|
|
logger.debug("Dict of info: {}".format(array))
|
|
array_json = json.dumps(array)
|
|
array_json_b64 = base64.b64encode(array_json.encode('utf-8')).decode()
|
|
return array_json_b64
|
|
except Exception as e:
|
|
logger.error("Unable to encrypt data. Error: {}".format(e))
|
|
sys.exit(-1)
|
|
|
|
def decrypt(data):
|
|
array_json = base64.b64decode(data)
|
|
array = json.loads(array_json)
|
|
if array['version'] == 1:
|
|
return decrypt_v1(array)
|
|
else:
|
|
logger.error("Unable to detemine encryption version.")
|
|
return False
|
|
|
|
def decrypt_v1(array):
|
|
try:
|
|
logger.debug("Dict of info: {}".format(array))
|
|
key_id = array['cipher_key_id']
|
|
iv = base64.b64decode(array['iv'])
|
|
cipher_text = base64.b64decode(array['cipher_text'])
|
|
hmac_key_id = array['hmac_key_id']
|
|
hmac = base64.b64decode(array['hmac'])
|
|
hmac_key_id_test, hmac_test = client.mac(
|
|
key_id.encode() + iv + cipher_text,
|
|
uid = hmac_key_id,
|
|
algorithm = enums.CryptographicAlgorithm.HMAC_SHA512
|
|
)
|
|
if pyhmac.compare_digest(hmac, hmac_test):
|
|
logger.debug("HMAC matches.")
|
|
else:
|
|
logger.error("HMAC does not match, data is corrupted/tampered.")
|
|
sys.exit(-1)
|
|
plain_text = client.decrypt(
|
|
cipher_text,
|
|
uid=key_id,
|
|
cryptographic_parameters={
|
|
'cryptographic_algorithm':
|
|
enums.CryptographicAlgorithm.AES,
|
|
'block_cipher_mode': enums.BlockCipherMode.CBC,
|
|
'padding_method': enums.PaddingMethod.ANSI_X923
|
|
},
|
|
iv_counter_nonce=(
|
|
iv
|
|
)
|
|
)
|
|
logger.debug("Successfully decrypted the data.")
|
|
plain_text = plain_text.decode('utf-8')
|
|
return plain_text
|
|
except Exception as e:
|
|
logger.error("Unable to decrypt data. Error: {}".format(e))
|
|
sys.exit(-1)
|
|
|
|
def new_pool_details():
|
|
print("Requesting pool details to add to config.")
|
|
pool_name = input("Please enter pool name: ")
|
|
while True:
|
|
pool_passphrase = getpass.getpass("Please enter pool encryption passphrase: ")
|
|
pool_passphrase_confirm = getpass.getpass("Please confirm pool encryption passphrase: ")
|
|
if pool_passphrase == pool_passphrase_confirm:
|
|
break
|
|
else:
|
|
print("The pool encryption passphrases do not match, please try again.")
|
|
array = dict()
|
|
array[pool_name] = dict()
|
|
array[pool_name]["pool_passphrase"] = pool_passphrase
|
|
return array
|
|
|
|
def edit_pool_details(pools, pool_to_edit):
|
|
if ask_for_confirmation("Would you like to edit the pool name?\nCurrent Value: {}".format(pool_to_edit)):
|
|
pool_name = input("Please enter pool name: ")
|
|
else:
|
|
pool_name = pool_to_edit
|
|
|
|
if ask_for_confirmation("Would you like to edit the pool encryption passphraset?"):
|
|
while True:
|
|
pool_passphrase = getpass.getpass("Please enter pool encryption passphrase: ")
|
|
pool_passphrase_confirm = getpass.getpass("Please confirm pool encryption passphrase: ")
|
|
if pool_passphrase == pool_passphrase_confirm:
|
|
break
|
|
else:
|
|
print("The pool encryption passphrases do not match, please try again.")
|
|
else:
|
|
pool_passphrase = pools[pool_to_edit]['pool_passphrase']
|
|
|
|
array = dict()
|
|
array[pool_name] = dict()
|
|
array[pool_name]["pool_passphrase"] = pool_passphrase
|
|
return array
|
|
|
|
def select_pool(pools, wording = "edit"):
|
|
print("Which pool would you like to {}:".format(wording))
|
|
print(" ")
|
|
pool_names = list(pools)
|
|
for i in range(0, len(pools)):
|
|
pretty_number = i + 1
|
|
print("{}) {}".format(pretty_number, pool_names[i]))
|
|
print(" ")
|
|
while True:
|
|
pool_to_modify = int(input("Please enter number relating to the pool you wish to {}: ".format(wording))) - 1
|
|
try:
|
|
return pool_names[pool_to_modify]
|
|
except IndexError as error:
|
|
print("you entered a number out of range, please try again")
|
|
|
|
if __name__ == '__main__':
|
|
# Build and parse arguments
|
|
parser = argparse.ArgumentParser(
|
|
usage="%prog [options]",
|
|
description="Run Truenas Pool Unlcok opteration. This will unlock encrypted pools on truenas.")
|
|
parser.add_argument(
|
|
"-c",
|
|
"--config",
|
|
action="store_true",
|
|
dest="config",
|
|
help="Edit pools configuration."
|
|
)
|
|
parser.add_argument (
|
|
"-v",
|
|
"--verbose",
|
|
action="store_true",
|
|
dest="debug",
|
|
help="Output debug/verbose info to the console for troubleshooting."
|
|
)
|
|
|
|
opts = parser.parse_args()
|
|
|
|
script_directory = os.path.dirname(os.path.realpath(__file__))
|
|
script_name = os.path.basename(__file__)
|
|
secrets_config_file = os.path.join(script_directory, "secrets.config")
|
|
pykmip_client_config_file = os.path.join(script_directory, "conf", "client.conf")
|
|
log_file = os.path.join(script_directory, "log.log")
|
|
datetime_string = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
|
|
|
|
if opts.debug:
|
|
logger = build_logger(logging.DEBUG)
|
|
else:
|
|
logger = build_logger(logging.INFO)
|
|
|
|
client = client.ProxyKmipClient(config_file=pykmip_client_config_file)
|
|
client.open()
|
|
|
|
if opts.config:
|
|
while True:
|
|
if not os.path.exists(secrets_config_file):
|
|
print("No pools found, do you want to add a new one?")
|
|
print(" ")
|
|
print("n) New config")
|
|
print("q) Quit config")
|
|
while True:
|
|
user_input = input("n/q> ")
|
|
if user_input.casefold() == "n":
|
|
config = dict()
|
|
api_key = input("Please enter your API key: ")
|
|
config['API Key'] = api_key
|
|
config['Pools'] = new_pool_details()
|
|
write_config_file(config, secrets_config_file)
|
|
break
|
|
elif user_input.casefold() == "q":
|
|
sys.exit(0)
|
|
else:
|
|
print("This value must be one of the following characters: n, q.")
|
|
config = read_config_file(secrets_config_file)
|
|
print("Current pools:")
|
|
print(" ")
|
|
for pool in config['Pools']:
|
|
print(pool)
|
|
print(" ")
|
|
print("a) Edit API Key")
|
|
print("e) Edit pool")
|
|
print("n) New pool")
|
|
print("d) Delete pool")
|
|
print("q) Quit config")
|
|
while True:
|
|
user_input = input("a/e/n/d/q> ")
|
|
# Edit API Key
|
|
if user_input.casefold() == "a":
|
|
api_key = input("Please enter your API key: ")
|
|
config['API Key'] = api_key
|
|
write_config_file(pools, secrets_config_file)
|
|
# Editing an account
|
|
elif user_input.casefold() == "e":
|
|
pool_to_edit = select_pool(config['Pools'])
|
|
pool_details = edit_pool_details(config['Pools'], pool_to_edit)
|
|
del config['Pools'][pool_to_edit]
|
|
config['Pools'].update(pool_details)
|
|
write_config_file(config, secrets_config_file)
|
|
break
|
|
# Createing a new account
|
|
elif user_input.casefold() == "n":
|
|
pool_details = new_pool_details()
|
|
config['Pools'].update(pool_details)
|
|
write_config_fileconfig, secrets_config_file)
|
|
break
|
|
|
|
# Deleting an account
|
|
elif user_input.casefold() == "d":
|
|
pool_to_delete = select_pool(config['Pools'], "delete")
|
|
if not ask_for_confirmation("Are you sure you wish to delete {} pool? ".format(pool_to_delete)):
|
|
break
|
|
del config['Pools'][pool_to_delete]
|
|
if len(config['Pools']) == 0:
|
|
# no more accounts, remove secrets file
|
|
os.remove(secrets_config_file)
|
|
else:
|
|
write_config_file(config, secrets_config_file)
|
|
break
|
|
|
|
# Quit the config
|
|
elif user_input.casefold() == "q":
|
|
sys.exit(0)
|
|
|
|
# Catch all for non-valid characters
|
|
else:
|
|
print("This value must be one of the following characters: a, e, n, d, q.")
|
|
|
|
if not os.path.exists(secrets_config_file):
|
|
print("No configuration file found. Please run {} -c to configure your accounts.".format(script_name))
|
|
sys.exit(-1)
|
|
# run the decryption of the keys and unlock the pool
|
|
config = read_config_file(secrets_config_file)
|
|
api_key = config['API Key']
|
|
logger.debug("API Key: {}".format(api_key))
|
|
API_POOLS = request('pool', api_key)['response']
|
|
API_DATASETS = request('pool/dataset', api_key)['response']
|
|
for pool_dataset_name in config['Pools']:
|
|
if pool_dataset_name != 'DEFAULT':
|
|
name = pool_dataset_name
|
|
pool_passphrase = config['Pools'][name]['pool_passphrase']
|
|
logger.info("Attempting to unlock {} pool.".format(name))
|
|
for pool in API_POOLS:
|
|
if pool['name'] == name:
|
|
# GELI encrypted pool
|
|
if pool['encrypt'] == 2:
|
|
if pool['is_decrypted'] == False:
|
|
logger.info('Pool {} is locked'.format(pool['name']))
|
|
response = request('pool/id/{}/unlock'.format(pool['id']), api_key, 'POST', {'passphrase': '{}'.format(pool_passphrase), 'services_restart': ['cifs', 'nfs']})
|
|
if response['ok']:
|
|
logger.info('Pool {} was unlocked successfully'.format(pool['name']))
|
|
else:
|
|
logger.error('Pool {} was NOT unlocked successfully'.format(pool['name']))
|
|
else:
|
|
logger.info('Pool {} is already unlocked'.format(pool['name']))
|
|
# start detection of locked zfs dataset
|
|
elif pool['encrypt'] == 0:
|
|
# loop through the datasets to find the one we are looking for
|
|
for dataset in API_DATASETS:
|
|
if dataset['name'] == name:
|
|
if dataset['locked'] == True:
|
|
logger.info('Dataset {} is locked'.format(dataset['name']))
|
|
response = request('pool/dataset/unlock',api_key, 'POST', {'id': '{}'.format(dataset['id']), 'unlock_options': {'recursive': True, 'datasets': [{'name': '{}'.format(dataset['name']), 'passphrase': '{}'.format(pool_passphrase)}]}})
|
|
if response['ok']:
|
|
logger.info('Dataset {} was unlocked successfully'.format(dataset['name']))
|
|
# restart services since the dataset was unlocked
|
|
logger.info('Restarting the NFS service')
|
|
response = request('service/restart', api_key, 'POST', {'service': 'nfs'})
|
|
logger.info('Restarting the CIFS/SMB service')
|
|
response = request('service/restart', api_key, 'POST', {'service': 'cifs'})
|
|
else:
|
|
logger.error('Dataset {} was NOT unlocked successfully'.format(dataset['name']))
|
|
else:
|
|
logger.info('Dataset {} is already unlocked'.format(dataset['name']))
|
|
|
|
#print('pool {0} is decrypted: {1}'.format(pool['name'],pool['is_decrypted']))
|
|
# this line should be after the actual unlock on the disk
|
|
#logger.debug("Decrypted {0} pool key.".format(name))
|
|
# start any jails that are set to start on boot but are most likely on an encrypted pool
|
|
API_JAILS = request('jail', api_key)['response']
|
|
for jail in API_JAILS:
|
|
if jail['boot'] == 1:
|
|
logger.info('Jail {} is set to start on boot'.format(jail['id']))
|
|
if jail['state'] == 'down':
|
|
logger.info('Jail {} is stopped'.format(jail['id']))
|
|
logger.info('Starting jail {}'.format(jail['id']))
|
|
response = request('jail/start', api_key, 'POST', '{}'.format(jail['id']))
|
|
if response['ok']:
|
|
logger.info('Started jail {} successfully'.format(jail['id']))
|
|
else:
|
|
logger.error('Jail {} was NOT started successfully'.format(jail['id']))
|
|
else:
|
|
logger.info('Jail {} is already started'.format(jail['id']))
|
|
client.close()
|
|
sys.exit(0) |