408 lines
16 KiB
Python
408 lines
16 KiB
Python
# library needed
|
|
import sys
|
|
import os
|
|
import subprocess
|
|
import platform
|
|
import json
|
|
import configparser
|
|
import getpass
|
|
import logging
|
|
import secrets
|
|
import base64
|
|
import optparse
|
|
import hmac as pyhmac
|
|
from kmip.core import enums
|
|
from kmip.demos import utils
|
|
from kmip.pie import client
|
|
|
|
# Name of the host operating system as reported by platform.system(),
# e.g. "Windows", "Linux" or "FreeBSD".
os_detected = platform.system()

# Every runtime file is resolved relative to the directory holding this
# script (symlinks resolved), not the current working directory.
script_directory = os.path.dirname(os.path.realpath(__file__))
secrets_ini_file = os.path.join(script_directory, "secrets.ini")
pykmip_client_config_file = os.path.join(script_directory, "conf", "client.conf")
log_file = os.path.join(script_directory, "log.log")

# Path of the Bitwarden CLI executable expected under "lib/".  Linux and
# FreeBSD share the same file name; any other platform aborts the script.
if os_detected == "Windows":
    bitwarden_cli_executable = os.path.join(script_directory, "lib", "bw.exe")
elif os_detected in ("Linux", "FreeBSD"):
    bitwarden_cli_executable = os.path.join(script_directory, "lib", "bw")
else:
    # The message lists FreeBSD as well, since it is accepted above.
    print("Your OS is not supported. Only Windows, Linux and FreeBSD are currently supported.\nDetected OS: {0}".format(os_detected))
    sys.exit(1)
|
|
|
|
# INI format; multiple accounts can be used, one section per account.
# Section names and values are stored encrypted.
#[encrypted account email address]
#encrypted_account_api_client_id =
#encrypted_account_api_secret =
#encrypted_account_vault_password =
|
|
|
|
def build_logger(level):
    """Configure the root logger and return it.

    The root logger is set to ``level`` and given two handlers sharing one
    "time - level - message" format: a file handler writing to the
    module-level ``log_file`` and a stream handler for the console.

    Each call adds a new pair of handlers, so it is meant to be called once.
    """
    root = logging.getLogger()
    root.setLevel(level)

    line_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    # File handler first, then console, so handler order is unchanged.
    for handler in (logging.FileHandler(log_file), logging.StreamHandler()):
        handler.setFormatter(line_format)
        root.addHandler(handler)

    return root
|
|
|
|
def read_config_file(config_file):
    """Read the secrets INI file and return its decrypted contents.

    Each section name and each option value found in ``config_file`` is
    passed through ``decrypt`` using the module-level ``client``.  Option
    names are used as-is; values are never logged.

    Returns a dict mapping every decrypted section name to a dict of
    ``{option name: decrypted value}``.
    """
    logger.debug("Starting to read config file and decrypt contents")
    logger.debug("Using config file: {}".format(config_file))

    parser = configparser.ConfigParser()
    parser.read(config_file)
    section_names = parser.sections()

    logger.debug("Found following sections...")
    accounts = {}
    for section_name in section_names:
        # The section name itself is an encrypted value.
        email = decrypt(client, section_name)
        logger.debug("Sections: {}".format(email))
        logger.debug("Found the following key under sectcion...")
        decrypted_options = accounts[email] = {}
        options = parser[section_name]
        for key in options:
            logger.debug("Found the following key under sectcion...(Values will not be displayed)")
            logger.debug("Key: {}".format(key))
            decrypted_options[key] = decrypt(client, options[key])

    logger.debug("Finished reading config file and decrypting contents")
    return accounts
|
|
|
|
def create_encryption_key(client):
    """Create and activate a new 256-bit AES key on the KMIP server.

    The key is created with the ENCRYPT and DECRYPT usage masks and then
    activated so it can be used straight away.

    Returns the ID of the new key.  If creating or activating the key
    raises, the error is logged and the process exits with status -1.
    """
    try:
        usage_mask = [
            enums.CryptographicUsageMask.ENCRYPT,
            enums.CryptographicUsageMask.DECRYPT,
        ]
        key_id = client.create(
            enums.CryptographicAlgorithm.AES,
            256,
            cryptographic_usage_mask=usage_mask,
        )
        logger.debug("Successfully created a new encryption key.")
        logger.debug("Encryption Key ID: {}".format(key_id))

        # A key must be activated before it can be used.
        client.activate(key_id)
        logger.debug("Successfully activated the encryption key.")
        return key_id
    except Exception as e:
        logger.error(e)
        sys.exit(-1)
|
|
|
|
def create_hmac_key(client):
    """Create and activate a new 256-bit key for MAC operations.

    The key is created with the MAC_GENERATE and MAC_VERIFY usage masks and
    then activated so it can be used straight away.

    NOTE(review): the key is created with ``CryptographicAlgorithm.AES``
    although it is only used for HMAC -- confirm this is intended.

    Returns the ID of the new key.  If creating or activating the key
    raises, the error is logged and the process exits with status -1.
    """
    try:
        usage_mask = [
            enums.CryptographicUsageMask.MAC_GENERATE,
            enums.CryptographicUsageMask.MAC_VERIFY,
        ]
        key_id = client.create(
            enums.CryptographicAlgorithm.AES,
            256,
            cryptographic_usage_mask=usage_mask,
        )
        logger.debug("Successfully created a new HMAC key.")
        logger.debug("HMAC Key ID: {}".format(key_id))

        # A key must be activated before it can be used.
        client.activate(key_id)
        logger.debug("Successfully activated the HMAC key.")
        return key_id
    except Exception as e:
        logger.error(e)
        sys.exit(-1)
|
|
|
|
def encrypt(client, data):
    """Encrypt ``data`` through the KMIP server and return a text envelope.

    A new encryption key and a new HMAC key are created for every call.
    The UTF-8 encoded text is encrypted with AES in CBC mode (ANSI X9.23
    padding) using a random 16-byte IV, then an HMAC-SHA512 is computed
    over ``key id + IV + cipher text``.

    Returns a base64 string wrapping a JSON object with the fields
    ``version``, ``cipher_key_id``, ``cipher_text``, ``iv``,
    ``hmac_key_id`` and ``hmac`` (binary fields base64-encoded).  If
    anything raises, the error is logged and ``None`` is returned.
    """
    try:
        data = data.encode('UTF-8')
        key_id = create_encryption_key(client)
        iv = secrets.token_bytes(16)

        cipher_parameters = {
            'cryptographic_algorithm': enums.CryptographicAlgorithm.AES,
            'block_cipher_mode': enums.BlockCipherMode.CBC,
            'padding_method': enums.PaddingMethod.ANSI_X923
        }
        # The second item returned by the server is not needed because the
        # IV is supplied explicitly.
        cipher_text, _ = client.encrypt(
            data,
            uid=key_id,
            cryptographic_parameters=cipher_parameters,
            iv_counter_nonce=iv
        )

        # Authenticate the key id and IV together with the cipher text.
        mac_input = key_id.encode() + iv + cipher_text
        mac_key = create_hmac_key(client)
        hmac_key_id, mac_value = client.mac(
            mac_input,
            uid=mac_key,
            algorithm=enums.CryptographicAlgorithm.HMAC_SHA512
        )
        logger.debug("Successfully encrypted the data.")

        envelope = {
            'version': 1,
            'cipher_key_id': key_id,
            'cipher_text': base64.b64encode(cipher_text).decode(),
            'iv': base64.b64encode(iv).decode(),
            'hmac_key_id': hmac_key_id,
            'hmac': base64.b64encode(mac_value).decode(),
        }
        logger.debug("Dict of info: {}".format(envelope))

        envelope_json = json.dumps(envelope)
        return base64.b64encode(envelope_json.encode('utf-8')).decode()
    except Exception as e:
        logger.error(e)
|
|
|
|
|
|
def decrypt(client, data):
    """Decode an encrypted envelope and return the decrypted text.

    ``data`` is a base64 string wrapping a JSON object, as produced by
    ``encrypt``.  The envelope's ``version`` field selects the decryption
    routine; only version 1 is known.

    Returns whatever the version-specific routine returns (the plain text
    for version 1), or ``None`` after logging an error when the version is
    not recognised.  Malformed base64/JSON and a missing ``version`` field
    raise the underlying exception.
    """
    array_json = base64.b64decode(data)
    array = json.loads(array_json)
    if array['version'] == 1:
        # Hand the result back to the caller: every caller uses the
        # return value of decrypt().
        return decrypt_v1(client, array)
    logger.error("Unable to detemine encryption version.")
    return None
|
|
|
|
def decrypt_v1(client, array):
    """Verify and decrypt a version-1 envelope.

    ``array`` is the decoded envelope dict with the keys ``cipher_key_id``,
    ``iv``, ``cipher_text``, ``hmac_key_id`` and ``hmac`` (binary fields
    base64-encoded).  The HMAC-SHA512 over ``key id + IV + cipher text`` is
    recomputed on the KMIP server and compared in constant time with the
    stored one before anything is decrypted.

    Returns the plain text as ``str``.  On an HMAC mismatch the process
    exits with status -1.  If anything else raises, the error is logged
    and ``None`` is returned.
    """
    try:
        logger.debug("Dict of info: {}".format(array))
        key_id = array['cipher_key_id']
        iv = base64.b64decode(array['iv'])
        cipher_text = base64.b64decode(array['cipher_text'])
        hmac_key_id = array['hmac_key_id']
        stored_mac = base64.b64decode(array['hmac'])

        mac_input = key_id.encode() + iv + cipher_text
        _, computed_mac = client.mac(
            mac_input,
            uid=hmac_key_id,
            algorithm=enums.CryptographicAlgorithm.HMAC_SHA512
        )

        # Refuse to decrypt anything that fails authentication.  SystemExit
        # is not an Exception subclass, so it passes through the handler
        # below.
        if not pyhmac.compare_digest(stored_mac, computed_mac):
            logger.error("HMAC does not match, data is corrupted/tampered.")
            sys.exit(-1)
        logger.debug("HMAC matches.")

        cipher_parameters = {
            'cryptographic_algorithm': enums.CryptographicAlgorithm.AES,
            'block_cipher_mode': enums.BlockCipherMode.CBC,
            'padding_method': enums.PaddingMethod.ANSI_X923
        }
        decrypted = client.decrypt(
            cipher_text,
            uid=key_id,
            cryptographic_parameters=cipher_parameters,
            iv_counter_nonce=iv
        )
        logger.debug("Successfully decrypted the data.")
        return decrypted.decode('utf-8')
    except Exception as e:
        logger.error(e)
|
|
|
|
def does_file_exist(filepath):
    """Return True when ``filepath`` refers to an existing path."""
    path_exists = os.path.exists(filepath)
    return path_exists
|
|
|
|
def ask_for_account_details():
    """Prompt for one Bitwarden account and return its encrypted details.

    The email address and API client ID are read with ``input``; the API
    secret and vault password are read without echo and must be typed
    twice identically.  Each value is encrypted with ``encrypt`` (using
    the module-level ``client``) as soon as it has been entered.

    Returns a dict with the keys ``encrypted_account_email_address``,
    ``encrypted_account_api_client_id``, ``encrypted_account_api_secret``
    and ``encrypted_account_vault_password``.
    """
    def _ask_hidden_twice(enter_prompt, confirm_prompt, mismatch_message):
        # Keep asking until both hidden entries agree, then encrypt once.
        while True:
            first_entry = getpass.getpass(enter_prompt)
            second_entry = getpass.getpass(confirm_prompt)
            if first_entry == second_entry:
                return encrypt(client, first_entry)
            print(mismatch_message)

    print("Requesting account details to build the ini file.")

    email_address = input("Please enter Bitwarden account email address: ")
    encrypted_email_address = encrypt(client, email_address)

    api_client_id = input("Please enter Bitwarden account API client ID: ")
    encrypted_api_client_id = encrypt(client, api_client_id)

    encrypted_api_secret = _ask_hidden_twice(
        "Please enter Bitwarden account API secret: ",
        "Please confirm Bitwarden account API secret: ",
        "The Bitwarden account API secrets do not match, please try again.",
    )
    encrypted_vault_password = _ask_hidden_twice(
        "Please enter Bitwarden account vault password: ",
        "Please confirm Bitwarden account vault password: ",
        "The Bitwarden account vault passwords do not match, please try again.",
    )

    # Only encrypted values leave this function.
    return {
        "encrypted_account_email_address": encrypted_email_address,
        "encrypted_account_api_client_id": encrypted_api_client_id,
        "encrypted_account_api_secret": encrypted_api_secret,
        "encrypted_account_vault_password": encrypted_vault_password,
    }
|
|
|
|
def select_account(accounts, wording="edit"):
    """Ask the user to pick one account from a numbered list.

    Args:
        accounts: the accounts to choose from.  Either a sequence of
            account names or a mapping keyed by account name (only the
            keys are used), so the dict built by ``read_config_file`` can
            be passed directly.
        wording: verb inserted into the prompts, e.g. "edit" or "delete".

    Returns:
        The selected account name, or ``None`` when ``accounts`` is empty
        (there is nothing to choose and prompting would never end).
    """
    # list() yields the keys of a mapping and the items of a sequence.
    names = list(accounts)
    if not names:
        print("There are no accounts to {}.".format(wording))
        return None

    print("Which account would you like to {}:".format(wording))
    for pretty_number, name in enumerate(names, start=1):
        print("{}) {}".format(pretty_number, name))

    while True:
        answer = input("Please enter number relating to the account you wish to {}: ".format(wording))
        try:
            choice = int(answer)
        except ValueError:
            print("you did not enter a number, please try again")
            continue
        # Explicit range check: 0 or a negative number must not wrap
        # around to the end of the list.
        if 1 <= choice <= len(names):
            return names[choice - 1]
        print("you entered a number out of range, please try again")
|
|
|
|
def _load_secrets_config():
    """Return a ConfigParser populated from ``secrets_ini_file``.

    ``ConfigParser.read`` ignores a missing file, so the result is simply
    empty when no secrets file has been written yet.
    """
    config = configparser.ConfigParser()
    config.read(secrets_ini_file)
    return config


def _save_secrets_config(config):
    """Write ``config`` to ``secrets_ini_file``, replacing its contents."""
    with open(secrets_ini_file, "w") as configfile:
        config.write(configfile)


def _add_account():
    """Prompt for a new account and add it to the secrets file."""
    account_details = ask_for_account_details()
    section = account_details["encrypted_account_email_address"]
    # Start from what is already on disk so existing accounts are kept.
    config = _load_secrets_config()
    config.add_section(section)
    for key, value in account_details.items():
        if key != "encrypted_account_email_address":
            config.set(section, key, value)
    _save_secrets_config(config)


def _delete_account(accounts):
    """Ask which account to delete and remove it from the secrets file.

    ``accounts`` is the dict returned by ``read_config_file`` (keyed by
    decrypted section name).  Nothing is written unless the user confirms.
    """
    selected = select_account(list(accounts), "delete")
    print("Are you sure you wish to delete {} account? ".format(selected))
    confirmation = input("y/n> ")
    if confirmation.casefold() not in ("y", "yes"):
        return
    config = _load_secrets_config()
    # Section names are stored encrypted, so each one is decrypted to find
    # the section that belongs to the selected account.
    for section in config.sections():
        if decrypt(client, section) == selected:
            config.remove_section(section)
    _save_secrets_config(config)


if __name__ == "__main__":
    # Build and parse arguments
    parser = optparse.OptionParser(
        usage="%prog [options]",
        description="Run KMIP client operation")
    parser.add_option(
        "-c",
        "--config",
        action="store_true",
        dest="config",
        help="Edit configuration."
    )
    parser.add_option(
        "-d",
        "--decrypt",
        action="store_true",
        dest="decrypt",
        help="Decrypt the value given with --message and print it."
    )
    parser.add_option(
        "-m",
        "--message",
        action="store",
        type="str",
        default="This is a secret message.",
        dest="message",
        help="The message to encrypt."
    )
    parser.add_option(
        "-v",
        "--verbose",
        action="store_true",
        dest="debug",
        help="Output debug/verbose info to the console for troubleshooting."
    )
    opts, args = parser.parse_args(sys.argv[1:])

    logger = build_logger(logging.DEBUG if opts.debug else logging.INFO)

    print(pykmip_client_config_file)
    # NOTE: this deliberately rebinds the module-level name ``client`` from
    # the imported kmip module to the open connection; read_config_file()
    # and ask_for_account_details() read that global.
    client = client.ProxyKmipClient(config_file=pykmip_client_config_file)
    client.open()

    # sys.exit() raises SystemExit, so the finally clause closes the KMIP
    # connection on every exit path, including errors.
    try:
        if opts.decrypt:
            print(decrypt(client, opts.message))
            sys.exit(0)

        if opts.config:
            if not does_file_exist(secrets_ini_file):
                print("No Bitwarden accounts found, do you want to make a new one?")
                print("n) New account")
                print("q) Quit config")
                while True:
                    user_input = input("n/q> ").casefold()
                    if user_input == "n":
                        _add_account()
                        break
                    elif user_input == "q":
                        sys.exit(0)
                    else:
                        print("This value must be one of the following characters: n, q.")

            # Main menu: re-read the file after every change; only "q"
            # leaves the loop.
            while True:
                accounts = read_config_file(secrets_ini_file)
                # Only the account names are shown; the decrypted secrets
                # held in ``accounts`` are never printed.
                print("Current Bitwarden accounts:")
                print(" ")
                for account in accounts:
                    print(account)
                print(" ")
                print("e) Edit account")
                print("n) New account")
                print("d) Delete account")
                print("q) Quit config")
                while True:
                    user_input = input("e/n/d/q> ").casefold()
                    if user_input == "e":
                        if not accounts:
                            print("There are no accounts to edit.")
                            continue
                        # TODO: editing is not implemented yet; the
                        # selection is made but not acted upon.
                        account_section_to_edit = select_account(list(accounts))
                    elif user_input == "n":
                        _add_account()
                        break
                    elif user_input == "d":
                        if not accounts:
                            print("There are no accounts to delete.")
                            continue
                        _delete_account(accounts)
                        break
                    elif user_input == "q":
                        sys.exit(0)
                    else:
                        print("This value must be one of the following characters: e, n, d, q.")
    finally:
        client.close()

    sys.exit(0)
|