# library needed
import sys
import os
import subprocess
import platform
import json
import configparser
import getpass
import logging
import secrets
import base64
import optparse
import hmac as pyhmac
from kmip.core import enums
from kmip.demos import utils
from kmip.pie import client

# Platform name reported by platform.system(); selects the Bitwarden CLI binary below.
os_detected = platform.system()

# All paths are resolved relative to the directory containing this script.
script_directory = os.path.dirname(os.path.realpath(__file__))
secrets_config_file = os.path.join(script_directory, "secrets.config")
pykmip_client_config_file = os.path.join(script_directory, "conf", "client.conf")
log_file = os.path.join(script_directory, "log.log")

# The Bitwarden CLI is looked up under lib/: "bw.exe" on Windows, "bw" on
# Linux and FreeBSD. Any other platform aborts the script with exit status 1.
if os_detected == "Windows":
    bitwarden_cli_executable = os.path.join(script_directory, "lib", "bw.exe")
elif os_detected in ("Linux", "FreeBSD"):
    bitwarden_cli_executable = os.path.join(script_directory, "lib", "bw")
else:
    print("Your OS is not supported. Only Windows, Linux and FreeBSD are currently supported.\nDetected OS: {0}".format(os_detected))
    sys.exit(1)

# INI format, multiple accounts can be used.
#[Account Email Address]
#encrypted_api_client_id =
#encrypted_api_secret =
#encrypted_vault_password =
# NOTE(review): the INI-style layout sketched above does not match what
# write_config_file() below produces (a base64-encoded JSON envelope that holds
# the encrypted account mapping) -- confirm whether the sketch is still relevant.


def build_logger(level):
    """Configure the root logger to write to ``log_file`` and to the console.

    Args:
        level: logging level applied to the root logger (e.g. ``logging.DEBUG``).

    Returns:
        The configured root ``logging.Logger``.

    Note:
        Each call attaches a fresh pair of handlers, so calling this more than
        once would emit every record multiple times.
    """
    logger = logging.getLogger()
    logger.setLevel(level)

    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s'
    )

    # log to file
    fileHandler = logging.FileHandler(log_file)
    fileHandler.setFormatter(formatter)
    logger.addHandler(fileHandler)

    # log to console
    consoleHandler = logging.StreamHandler()
    consoleHandler.setFormatter(formatter)
    logger.addHandler(consoleHandler)

    return logger


def write_config_file(array, config_file):
    """Serialize ``array`` to JSON, encrypt it and write it to ``config_file``.

    Args:
        array: JSON-serializable account mapping.
        config_file: path of the file to (over)write.

    Exits the process with status -1 if encryption or writing fails. The file
    is only opened once encryption has succeeded, so a failed encryption never
    truncates an existing config file.
    """
    logger.debug("Starting to write config file and encrypt contents")
    logger.debug("Using config file: {}".format(config_file))

    logger.debug("Converting config from array to json")
    array_json = json.dumps(array)

    logger.debug("Encrypting config json")
    encrypted_array_json = encrypt(array_json)
    # encrypt() logs and returns None on failure; bail out BEFORE opening the
    # file in "w" mode, otherwise the existing config would be wiped.
    if not isinstance(encrypted_array_json, str):
        logger.error("Unable to encrypt config, existing config file left untouched.")
        sys.exit(-1)

    logger.debug("Attempting to write encrypted config to file")
    try:
        with open(config_file, "w") as f:
            f.write(encrypted_array_json)
        logger.debug("Successfully wrote encrypted config to file")
    except Exception as e:
        logger.error("Unable to write encrypted config to file. Error: {}".format(e))
        sys.exit(-1)

    logger.debug("Finshed writing config file and encrypting contents")


def read_config_file(config_file):
    """Read ``config_file``, decrypt it and return the decoded account mapping.

    Args:
        config_file: path of the encrypted config file.

    Returns:
        The object produced by ``json.loads`` on the decrypted contents.

    Exits the process with status -1 if the file cannot be read, decrypted or
    parsed.
    """
    logger.debug("Starting to read config file and decrypt contents")
    logger.debug("Using config file: {}".format(config_file))

    logger.debug("Attempting to read encrypted config from file")
    try:
        with open(config_file) as f:
            config = f.read()
        logger.debug("Successfully read encrypted config from file")
    except Exception as e:
        logger.error("Unable to read encrypted config from file. Error: {}".format(e))
        sys.exit(-1)

    logger.debug("Decrypting config contents")
    decrypted_array_json = decrypt(config)
    # decrypt() signals failure with False/None; passing that to json.loads
    # would raise an unhandled TypeError.
    if not isinstance(decrypted_array_json, str):
        logger.error("Unable to decrypt config contents.")
        sys.exit(-1)

    logger.debug("Convert config from json to array")
    try:
        array = json.loads(decrypted_array_json)
    except ValueError as e:
        logger.error("Unable to parse decrypted config contents. Error: {}".format(e))
        sys.exit(-1)

    logger.debug("Finished reading config file and decrypting contents")
    return array


def ask_for_confirmation(question):
    """Print ``question`` and prompt until the user answers ``y`` or ``n``.

    Args:
        question: text shown to the user before the ``y/n>`` prompt.

    Returns:
        True for ``y``, False for ``n`` (compared case-insensitively).
    """
    logger.debug("Asking user for confirmation")
    logger.debug("Question: {}".format(question))
    print(question)
    while True:
        confirmation = input("y/n> ")
        logger.debug("User answered: {}".format(confirmation))
        answer = confirmation.casefold()
        if answer == "y":
            return True
        if answer == "n":
            return False
        print("This value must be one of the following characters: y, n.")


def _create_and_activate_key(usage_mask, label):
    """Create a 256-bit AES managed object with ``usage_mask`` and activate it.

    Args:
        usage_mask: list of ``enums.CryptographicUsageMask`` values.
        label: word used in the log messages ("encryption" or "HMAC").

    Returns:
        The ID of the newly created, activated key.

    Exits the process with status -1 if creation or activation fails.
    """
    try:
        key_id = client.create(
            enums.CryptographicAlgorithm.AES,
            256,
            cryptographic_usage_mask=usage_mask
        )
        logger.debug("Successfully created a new {} key.".format(label))
        logger.debug("{} Key ID: {}".format(label[:1].upper() + label[1:], key_id))
    except Exception as e:
        logger.error(e)
        sys.exit(-1)

    # Activate the key so that it can be used.
    try:
        client.activate(key_id)
        logger.debug("Successfully activated the {} key.".format(label))
        return key_id
    except Exception as e:
        logger.error(e)
        sys.exit(-1)


def create_encryption_key():
    """Create and activate a key usable for ENCRYPT and DECRYPT.

    Returns:
        The ID of the new key. Exits with status -1 on failure.
    """
    return _create_and_activate_key(
        [
            enums.CryptographicUsageMask.ENCRYPT,
            enums.CryptographicUsageMask.DECRYPT
        ],
        "encryption"
    )


def create_hmac_key():
    """Create and activate a key usable for MAC_GENERATE and MAC_VERIFY.

    Returns:
        The ID of the new key. Exits with status -1 on failure.
    """
    # NOTE(review): the key is created with the AES algorithm but is later used
    # with HMAC_SHA512 in encrypt()/decrypt_v1() -- confirm this is intended.
    return _create_and_activate_key(
        [
            enums.CryptographicUsageMask.MAC_GENERATE,
            enums.CryptographicUsageMask.MAC_VERIFY
        ],
        "HMAC"
    )


def encrypt(data):
    """Encrypt ``data`` with a freshly created key and wrap it in a v1 envelope.

    A new encryption key and a new HMAC key are created for every call. The
    MAC is computed over ``key_id + iv + cipher_text``.

    Args:
        data: text to encrypt (encoded as UTF-8 before encryption).

    Returns:
        A base64 string of the JSON envelope (version, key IDs, IV, cipher
        text, HMAC), or None if any step raised (the error is logged).
    """
    try:
        data = data.encode('UTF-8')
        key_id = create_encryption_key()
        iv = secrets.token_bytes(16)
        # The second element of the client's result is not used; the IV
        # generated above is what gets stored in the envelope.
        cipher_text, autogenerated_iv = client.encrypt(
            data,
            uid=key_id,
            cryptographic_parameters={
                'cryptographic_algorithm': enums.CryptographicAlgorithm.AES,
                'block_cipher_mode': enums.BlockCipherMode.CBC,
                'padding_method': enums.PaddingMethod.ANSI_X923
            },
            iv_counter_nonce=(
                iv
            )
        )

        hmac_key_id, hmac = client.mac(
            key_id.encode() + iv + cipher_text,
            uid = create_hmac_key(),
            algorithm = enums.CryptographicAlgorithm.HMAC_SHA512
        )
        logger.debug("Successfully encrypted the data.")

        array = dict()
        array['version'] = 1
        array['cipher_key_id'] = key_id
        array['cipher_text'] = base64.b64encode(cipher_text).decode()
        array['iv'] = base64.b64encode(iv).decode()
        array['hmac_key_id'] = hmac_key_id
        array['hmac'] = base64.b64encode(hmac).decode()
        logger.debug("Dict of info: {}".format(array))

        array_json = json.dumps(array)
        return base64.b64encode(array_json.encode('utf-8')).decode()
    except Exception as e:
        logger.error(e)
        return None


def decrypt(data):
    """Decode an envelope produced by ``encrypt`` and dispatch on its version.

    Args:
        data: base64 string containing the JSON envelope.

    Returns:
        The decrypted text, False if the envelope is malformed or its version
        is unknown, or None if the version-specific decryption failed.
    """
    try:
        array = json.loads(base64.b64decode(data))
        version = array['version']
    except (ValueError, TypeError, KeyError) as e:
        # Corrupt base64/JSON or a missing version field: report it the same
        # way as an unknown version instead of crashing with a traceback.
        logger.error("Unable to parse encrypted data. Error: {}".format(e))
        return False

    if version == 1:
        return decrypt_v1(array)

    logger.error("Unable to detemine encryption version.")
    return False


def decrypt_v1(array):
    """Verify the HMAC of a version 1 envelope and decrypt its cipher text.

    Args:
        array: decoded envelope dict with the keys written by ``encrypt``.

    Returns:
        The decrypted UTF-8 text, or None if any step raised (the error is
        logged).

    Exits the process with status -1 when the HMAC does not match.
    """
    try:
        logger.debug("Dict of info: {}".format(array))
        key_id = array['cipher_key_id']
        iv = base64.b64decode(array['iv'])
        cipher_text = base64.b64decode(array['cipher_text'])
        hmac_key_id = array['hmac_key_id']
        hmac = base64.b64decode(array['hmac'])

        # Recompute the MAC over the same bytes encrypt() authenticated.
        hmac_key_id_test, hmac_test = client.mac(
            key_id.encode() + iv + cipher_text,
            uid = hmac_key_id,
            algorithm = enums.CryptographicAlgorithm.HMAC_SHA512
        )

        if pyhmac.compare_digest(hmac, hmac_test):
            logger.debug("HMAC matches.")
        else:
            logger.error("HMAC does not match, data is corrupted/tampered.")
            # SystemExit is not an Exception subclass, so the handler below
            # does not intercept it.
            sys.exit(-1)

        plain_text = client.decrypt(
            cipher_text,
            uid=key_id,
            cryptographic_parameters={
                'cryptographic_algorithm': enums.CryptographicAlgorithm.AES,
                'block_cipher_mode': enums.BlockCipherMode.CBC,
                'padding_method': enums.PaddingMethod.ANSI_X923
            },
            iv_counter_nonce=(
                iv
            )
        )
        logger.debug("Successfully decrypted the data.")
        return plain_text.decode('utf-8')
    except Exception as e:
        logger.error(e)
        return None


def does_file_exist(filepath):
    """Return True if ``filepath`` exists according to ``os.path.exists``."""
    return os.path.exists(filepath)


def _prompt_secret_twice(label):
    """Prompt (without echo) for a secret twice until both entries match.

    Args:
        label: name of the secret as shown in the prompts, e.g. "API secret".

    Returns:
        The confirmed secret.
    """
    while True:
        first = getpass.getpass("Please enter Bitwarden account {}: ".format(label))
        second = getpass.getpass("Please confirm Bitwarden account {}: ".format(label))
        if first == second:
            return first
        print("The Bitwarden account {}s do not match, please try again.".format(label))


def new_account_details():
    """Interactively collect the details of a new Bitwarden account.

    Returns:
        A dict ``{email: {"account_api_client_id": ..., "account_api_secret":
        ..., "account_vault_password": ...}}`` with a single entry.
    """
    print("Requesting account details to add to config.")
    account_email_address = input("Please enter Bitwarden account email address: ")
    account_api_client_id = input("Please enter Bitwarden account API client ID: ")
    account_api_secret = _prompt_secret_twice("API secret")
    account_vault_password = _prompt_secret_twice("vault password")

    return {
        account_email_address: {
            "account_api_client_id": account_api_client_id,
            "account_api_secret": account_api_secret,
            "account_vault_password": account_vault_password,
        }
    }


def edit_account_details(accounts, email):
    """Interactively edit the stored details of the account ``email``.

    Each field is only re-prompted if the user confirms they want to change
    it; otherwise the current value from ``accounts[email]`` is kept.
    ``accounts`` itself is not modified.

    Args:
        accounts: mapping of email address to account detail dict.
        email: key of the account being edited.

    Returns:
        A single-entry dict in the same shape as ``new_account_details``
        returns, keyed by the (possibly changed) email address.
    """
    current = accounts[email]

    if ask_for_confirmation("Would you like to edit the Bitwarden account email address?\nCurrent Value: {}".format(email)):
        account_email_address = input("Please enter Bitwarden account email address: ")
    else:
        account_email_address = email

    if ask_for_confirmation("Would you like to edit the Bitwarden account API client ID?\nCurrent Value: {}".format(current['account_api_client_id'])):
        account_api_client_id = input("Please enter Bitwarden account API client ID: ")
    else:
        account_api_client_id = current['account_api_client_id']

    # Current secret values are deliberately not shown in the questions.
    if ask_for_confirmation("Would you like to edit the Bitwarden account API secret?"):
        account_api_secret = _prompt_secret_twice("API secret")
    else:
        account_api_secret = current['account_api_secret']

    if ask_for_confirmation("Would you like to edit the Bitwarden account vault password?"):
        account_vault_password = _prompt_secret_twice("vault password")
    else:
        account_vault_password = current['account_vault_password']

    return {
        account_email_address: {
            "account_api_client_id": account_api_client_id,
            "account_api_secret": account_api_secret,
            "account_vault_password": account_vault_password,
        }
    }


def select_account(accounts, wording = "edit"):
    """List the accounts and prompt until the user picks a valid number.

    Args:
        accounts: mapping whose keys are the account email addresses.
        wording: verb inserted into the prompts (e.g. "edit", "delete").

    Returns:
        The email address (key of ``accounts``) the user selected.

    Non-numeric input and numbers outside ``1..len(accounts)`` are rejected
    and the prompt is repeated. In particular ``0`` and negative numbers no
    longer wrap around to an account at the end of the list. With an empty
    mapping no answer can be accepted, so callers must pass at least one
    account.
    """
    print("Which account would you like to {}:".format(wording))
    print(" ")
    emails = list(accounts)
    for pretty_number, email in enumerate(emails, start=1):
        print("{}) {}".format(pretty_number, email))
    print(" ")

    prompt = "Please enter number relating to the account you wish to {}: ".format(wording)
    while True:
        try:
            chosen = int(input(prompt))
        except ValueError:
            print("you did not enter a number, please try again")
            continue
        if 1 <= chosen <= len(emails):
            return emails[chosen - 1]
        print("you entered a number out of range, please try again")


def _config_menu():
    """Run the interactive account configuration menu.

    Loops until the user quits; the only ways out are the ``q`` options (exit
    status 0) or a failure inside the config read/write helpers.
    """
    while True:
        if not does_file_exist(secrets_config_file):
            print("No Bitwarden accounts found, do you want to make a new one?")
            print(" ")
            print("n) New account")
            print("q) Quit config")
            while True:
                user_input = input("n/q> ").casefold()
                if user_input == "n":
                    account_details = new_account_details()
                    write_config_file(account_details, secrets_config_file)
                    break
                elif user_input == "q":
                    sys.exit(0)
                else:
                    print("This value must be one of the following characters: n, q.")

        accounts = read_config_file(secrets_config_file)
        # Only the account names are listed; the decrypted mapping contains
        # API secrets and vault passwords and must not be echoed.
        print("Current Bitwarden accounts:")
        print(" ")
        for account in accounts:
            print(account)
        print(" ")
        print("e) Edit account")
        print("n) New account")
        print("d) Delete account")
        print("q) Quit config")
        while True:
            user_input = input("e/n/d/q> ").casefold()

            # Editing an account
            if user_input == "e":
                account_to_edit = select_account(accounts)
                account_details = edit_account_details(accounts, account_to_edit)
                # Drop the old entry first: the email (the key) may have changed.
                del accounts[account_to_edit]
                accounts.update(account_details)
                write_config_file(accounts, secrets_config_file)
                break

            # Creating a new account
            elif user_input == "n":
                account_details = new_account_details()
                accounts.update(account_details)
                write_config_file(accounts, secrets_config_file)
                break

            # Deleting an account
            elif user_input == "d":
                account_to_delete = select_account(accounts, "delete")
                if not ask_for_confirmation("Are you sure you wish to delete {} account? ".format(account_to_delete)):
                    break
                del accounts[account_to_delete]
                if len(accounts) == 0:
                    # no more accounts, remove secrets file
                    os.remove(secrets_config_file)
                else:
                    write_config_file(accounts, secrets_config_file)
                break

            # Quit the config
            elif user_input == "q":
                sys.exit(0)

            # Catch all for non-valid characters
            else:
                print("This value must be one of the following characters: e, n, d, q.")


def main():
    """Parse the command line, open the KMIP client and run the requested action.

    Sets the module-level ``logger`` and rebinds the module-level ``client``
    name from the imported module to an opened ``ProxyKmipClient`` instance,
    because the helper functions in this file use both as globals. The client
    is closed on every exit path, including ``sys.exit`` calls made from the
    menu or the helpers.
    """
    global logger, client

    # Build and parse arguments
    parser = optparse.OptionParser(
        usage="%prog [options]",
        description="Run KMIP client operation")

    parser.add_option(
        "-c",
        "--config",
        action="store_true",
        dest="config",
        help="Edit configuration."
    )
    parser.add_option(
        "-v",
        "--verbose",
        action="store_true",
        dest="debug",
        help="Output debug/verbose info to the console for troubleshooting."
    )
    opts, args = parser.parse_args(sys.argv[1:])

    logger = build_logger(logging.DEBUG if opts.debug else logging.INFO)

    client = client.ProxyKmipClient(config_file=pykmip_client_config_file)
    client.open()
    try:
        if opts.config:
            _config_menu()
    finally:
        # SystemExit raised by sys.exit() also passes through here, so the
        # connection is always released.
        client.close()

    sys.exit(0)


if __name__ == "__main__":
    main()