Files
bitwardenbackup/bitwardenBackup.py

578 lines
26 KiB
Python

# library needed
import sys
import os
import subprocess
import platform
import json
import getpass
import logging
import secrets
import base64
import argparse
import hmac as pyhmac
import datetime
import time
import shutil
from kmip.core import enums
from kmip.pie import client
def build_logger(level):
logger = logging.getLogger()
logger.setLevel(level)
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
# log to file
fileHandler = logging.FileHandler(log_file)
fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler)
# log to console
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(formatter)
logger.addHandler(consoleHandler)
return logger
def write_config_file(array, config_file):
logger.debug("Starting to write config file and encrypt contents")
logger.debug("Using config file: {}".format(config_file))
logger.debug("Converting config from array to json")
array_json = json.dumps(array)
logger.debug("Encrypting config json")
encrypted_array_json = encrypt(array_json)
logger.debug("Attempting to write encrypted config to file")
try:
f = open(config_file, "w")
f.write(encrypted_array_json)
f.close()
logger.debug("Successfully wrote encrypted config to file")
except Exception as e:
logger.error("Unable to write encrypted config to file. Error: {}".format(e))
sys.exit(-1)
logger.debug("Finshed writing config file and encrypting contents")
def read_config_file(config_file):
logger.debug("Starting to read config file and decrypt contents")
logger.debug("Using config file: {}".format(config_file))
logger.debug("Attempting to read encrypted config from file")
try:
with open(config_file) as f:
config = f.read()
logger.debug("Successfully read encrypted config from file")
except Exception as e:
logger.error("Unable to read encrypted config from file. Error: {}".format(e))
sys.exit(-1)
logger.debug("Decrypting config contents")
decrypted_array_json = decrypt(config)
logger.debug("Convert config from json to array")
array = json.loads(decrypted_array_json)
logger.debug("Finished reading config file and decrypting contents")
return array
def ask_for_confirmation(question):
logger.debug("Asking user for confirmation")
logger.debug("Question: {}".format(question))
print(question)
while True:
confirmation = input("y/n> ")
logger.debug("User answered: {}".format(confirmation))
if confirmation.casefold() == "y":
return True
elif confirmation.casefold() == "n":
return False
else:
print("This value must be one of the following characters: y, n.")
def create_encryption_key():
# Create an encryption key.
try:
key_id = client.create(
enums.CryptographicAlgorithm.AES,
256,
cryptographic_usage_mask=[
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
]
)
logger.debug("Successfully created a new encryption key.")
logger.debug("Encryption Key ID: {}".format(key_id))
except Exception as e:
logger.error("Unable to create encryption key. Error: {}".format(e))
sys.exit(-1)
# Activate the encryption key so that it can be used.
try:
client.activate(key_id)
logger.debug("Successfully activated the encryption key.")
return key_id
except Exception as e:
logger.error("Unable to activate encryption key. Error: {}".format(e))
sys.exit(-1)
def create_hmac_key():
# Create an encryption key.
try:
key_id = client.create(
enums.CryptographicAlgorithm.AES,
256,
cryptographic_usage_mask=[
enums.CryptographicUsageMask.MAC_GENERATE,
enums.CryptographicUsageMask.MAC_VERIFY
]
)
logger.debug("Successfully created a new HMAC key.")
logger.debug("HMAC Key ID: {}".format(key_id))
except Exception as e:
logger.error("Unable to create hmac key. Error: {}".format(e))
sys.exit(-1)
# Activate the HMAC key so that it can be used.
try:
client.activate(key_id)
logger.debug("Successfully activated the HMAC key.")
return key_id
except Exception as e:
logger.error("Unable to activate hmac key. Error: {}".format(e))
sys.exit(-1)
def encrypt(data):
try:
data = data.encode('UTF-8')
key_id = create_encryption_key()
iv = secrets.token_bytes(16)
cipher_text, autogenerated_iv = client.encrypt(
data,
uid=key_id,
cryptographic_parameters={
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES,
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.ANSI_X923
},
iv_counter_nonce=(
iv
)
)
hmac_key_id, hmac = client.mac(
key_id.encode() + iv + cipher_text,
uid = create_hmac_key(),
algorithm = enums.CryptographicAlgorithm.HMAC_SHA512
)
logger.debug("Successfully encrypted the data.")
array = dict()
array['version'] = 1
array['cipher_key_id'] = key_id
array['cipher_text'] = base64.b64encode(cipher_text).decode()
array['iv'] = base64.b64encode(iv).decode()
array['hmac_key_id'] = hmac_key_id
array['hmac'] = base64.b64encode(hmac).decode()
logger.debug("Dict of info: {}".format(array))
array_json = json.dumps(array)
array_json_b64 = base64.b64encode(array_json.encode('utf-8')).decode()
return array_json_b64
except Exception as e:
logger.error("Unable to encrypt data. Error: {}".format(e))
sys.exit(-1)
def decrypt(data):
array_json = base64.b64decode(data)
array = json.loads(array_json)
if array['version'] == 1:
return decrypt_v1(array)
else:
logger.error("Unable to detemine encryption version.")
return False
def decrypt_v1(array):
try:
logger.debug("Dict of info: {}".format(array))
key_id = array['cipher_key_id']
iv = base64.b64decode(array['iv'])
cipher_text = base64.b64decode(array['cipher_text'])
hmac_key_id = array['hmac_key_id']
hmac = base64.b64decode(array['hmac'])
hmac_key_id_test, hmac_test = client.mac(
key_id.encode() + iv + cipher_text,
uid = hmac_key_id,
algorithm = enums.CryptographicAlgorithm.HMAC_SHA512
)
if pyhmac.compare_digest(hmac, hmac_test):
logger.debug("HMAC matches.")
else:
logger.error("HMAC does not match, data is corrupted/tampered.")
sys.exit(-1)
plain_text = client.decrypt(
cipher_text,
uid=key_id,
cryptographic_parameters={
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES,
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.ANSI_X923
},
iv_counter_nonce=(
iv
)
)
logger.debug("Successfully decrypted the data.")
plain_text = plain_text.decode('utf-8')
return plain_text
except Exception as e:
logger.error("Unable to decrypt data. Error: {}".format(e))
sys.exit(-1)
def new_account_details():
print("Requesting account details to add to config.")
account_email_address = input("Please enter Bitwarden account email address: ")
account_api_client_id = input("Please enter Bitwarden account API client ID: ")
while True:
account_api_secret = getpass.getpass("Please enter Bitwarden account API secret: ")
account_api_secret2 = getpass.getpass("Please confirm Bitwarden account API secret: ")
if account_api_secret == account_api_secret2:
break
else:
print("The Bitwarden account API secrets do not match, please try again.")
while True:
account_vault_password = getpass.getpass("Please enter Bitwarden account vault password: ")
account_vault_password2 = getpass.getpass("Please confirm Bitwarden account vault password: ")
if account_vault_password == account_vault_password2:
break
else:
print("The Bitwarden account vault passwords do not match, please try again.")
array = dict()
array[account_email_address] = dict()
array[account_email_address]["account_api_client_id"] = account_api_client_id
array[account_email_address]["account_api_secret"] = account_api_secret
array[account_email_address]["account_vault_password"] = account_vault_password
return array
def edit_account_details(accounts, email):
if ask_for_confirmation("Would you like to edit the Bitwarden account email address?\nCurrent Value: {}".format(email)):
account_email_address = input("Please enter Bitwarden account email address: ")
else:
account_email_address = email
if ask_for_confirmation("Would you like to edit the Bitwarden account API client ID?\nCurrent Value: {}".format(accounts[email]['account_api_client_id'])):
account_api_client_id = input("Please enter Bitwarden account API client ID: ")
else:
account_api_client_id = accounts[email]['account_api_client_id']
if ask_for_confirmation("Would you like to edit the Bitwarden account API secret?"):
while True:
account_api_secret = getpass.getpass("Please enter Bitwarden account API secret: ")
account_api_secret2 = getpass.getpass("Please confirm Bitwarden account API secret: ")
if account_api_secret == account_api_secret2:
break
else:
print("The Bitwarden account API secrets do not match, please try again.")
else:
account_api_secret = accounts[email]['account_api_secret']
if ask_for_confirmation("Would you like to edit the Bitwarden account vault password?"):
while True:
account_vault_password = getpass.getpass("Please enter Bitwarden account vault password: ")
account_vault_password2 = getpass.getpass("Please confirm Bitwarden account vault password: ")
if account_vault_password == account_vault_password2:
break
else:
print("The Bitwarden account vault passwords do not match, please try again.")
else:
account_vault_password = accounts[email]['account_vault_password']
array = dict()
array[account_email_address] = dict()
array[account_email_address]["account_api_client_id"] = account_api_client_id
array[account_email_address]["account_api_secret"] = account_api_secret
array[account_email_address]["account_vault_password"] = account_vault_password
return array
def select_account(accounts, wording = "edit"):
print("Which account would you like to {}:".format(wording))
print(" ")
emails = list(accounts)
for i in range(0, len(accounts)):
pretty_number = i + 1
print("{}) {}".format(pretty_number, emails[i]))
print(" ")
while True:
account_to_modify = int(input("Please enter number relating to the account you wish to {}: ".format(wording))) - 1
try:
return emails[account_to_modify]
except IndexError as error:
print("you entered a number out of range, please try again")
if __name__ == "__main__":
# Build and parse arguments
parser = argparse.ArgumentParser(
usage="{} [options]".format(os.path.basename(__file__)),
description="Run Bitwarden backup opteration. This will produce an encrypted zip/tar with exported CSV, JSON, and attachements.")
parser.add_argument (
"-a",
"--accounts",
nargs="+",
dest="accounts_to_backup",
help="Accounts to backup instead of all accounts."
)
parser.add_argument (
"-c",
"--config",
action="store_true",
dest="config",
help="Edit Bitwarden account configuration."
)
parser.add_argument (
"-t",
"--test",
action="store_true",
dest="test",
help="Test Bitwarden account login and unlock."
)
parser.add_argument (
"-v",
"--verbose",
action="store_true",
dest="debug",
help="Output debug/verbose info to the console for troubleshooting."
)
parser.add_argument (
"--no-encryption",
action="store_true",
dest="no_encrypt",
help="Will only zip up export and will NOT encrypt anything."
)
opts = parser.parse_args()
os_detected = platform.system()
script_directory = os.path.dirname(os.path.realpath(__file__))
working_directory = os.path.join(script_directory, "working")
exports_directory = os.path.join(script_directory, "exports")
script_name = os.path.basename(__file__)
secrets_config_file = os.path.join(script_directory, "secrets.config")
pykmip_client_config_file = os.path.join(script_directory, "conf", "client.conf")
log_file = os.path.join(script_directory, "log.log")
datetime_string = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
if opts.debug:
logger = build_logger(logging.DEBUG)
else:
logger = build_logger(logging.INFO)
if os_detected == "Windows":
bitwarden_cli_executable = "bw.exe"
gpg_executable = "gpg.exe"
secure_delete_executable = "sdelete.exe"
elif os_detected == "Linux":
bitwarden_cli_executable = "bw"
gpg_executable = "gpg"
secure_delete_executable = "srm"
#elif os_detected == "macOS":
# bitwarden_cli_executable = os.path.join(script_directory, "lib", "Bitwarden CLI", "bw_macOS")
else:
print("Your OS is not supported. Only Windows, Linux, and macOS are supported. Those are the only three supported OSes for the Bitwarden CLI.")
print("Detected OS: {0}".format(os_detected))
sys.exit(1)
# check if required programs are installed
if not shutil.which(bitwarden_cli_executable):
print("Bitwarden CLI ({}) cannot be found. Please make sure it is installed and executable.".format(bitwarden_cli_executable))
sys.exit(1)
elif not shutil.which(gpg_executable):
print("GPG ({}) cannot be found. Please make sure it is installed and executable.".format(gpg_executable))
sys.exit(1)
elif not shutil.which(secure_delete_executable):
print("SRM/sdelete.exe ({}) cannot be found. Please make sure it is installed and executable.".format(secure_delete_executable))
sys.exit(1)
client = client.ProxyKmipClient(config_file=pykmip_client_config_file)
client.open()
if opts.config:
while True:
if not os.path.exists(secrets_config_file):
print("No Bitwarden accounts found, do you want to make a new one?")
print(" ")
print("n) New account")
print("q) Quit config")
while True:
user_input = input("n/q> ")
if user_input.casefold() == "n":
account_details = new_account_details()
write_config_file(account_details, secrets_config_file)
break
elif user_input.casefold() == "q":
sys.exit(0)
else:
print("This value must be one of the following characters: n, q.")
accounts = read_config_file(secrets_config_file)
print("Current Bitwarden accounts:")
print(" ")
for account in accounts:
print(account)
print(" ")
print("e) Edit account")
print("n) New account")
print("d) Delete account")
print("q) Quit config")
while True:
user_input = input("e/n/d/q> ")
# Editing an account
if user_input.casefold() == "e":
account_to_edit = select_account(accounts)
account_details = edit_account_details(accounts, account_to_edit)
del accounts[account_to_edit]
accounts.update(account_details)
write_config_file(accounts, secrets_config_file)
break
# Createing a new account
elif user_input.casefold() == "n":
account_details = new_account_details()
accounts.update(account_details)
write_config_file(accounts, secrets_config_file)
break
# Deleting an account
elif user_input.casefold() == "d":
account_to_delete = select_account(accounts, "delete")
if not ask_for_confirmation("Are you sure you wish to delete {} account? ".format(account_to_delete)):
break
del accounts[account_to_delete]
if len(accounts) == 0:
# no more accounts, remove secrets file
os.remove(secrets_config_file)
else:
write_config_file(accounts, secrets_config_file)
break
# Quit the config
elif user_input.casefold() == "q":
sys.exit(0)
# Catch all for non-valid characters
else:
print("This value must be one of the following characters: e, n, d, q.")
#json.loads((subprocess.check_output(['bw.exe','status'])).decode())['status']
if not os.path.exists(secrets_config_file):
print("No configuration file found. Please run {} -c to configure your accounts.".format(script_name))
sys.exit(-1)
accounts = read_config_file(secrets_config_file)
emails_from_config_file = list(accounts)
if opts.accounts_to_backup:
emails_to_backup = list()
for email in opts.accounts_to_backup:
if email in emails_from_config_file:
emails_to_backup.append(email)
if len(emails_to_backup) > 0:
emails = emails_to_backup
else:
logger.error("None of the emails passed in are in the config file.")
sys.exit(1)
else:
emails = emails_from_config_file
logger.debug("Logging out of any Bitwarden account to start fresh")
logger.debug((subprocess.run([bitwarden_cli_executable, 'logout'], capture_output=True).stdout).decode())
for email in emails:
vault_password = accounts[email]['account_vault_password']
os.environ["BW_CLIENTID"] = accounts[email]['account_api_client_id']
os.environ["BW_CLIENTSECRET"] = accounts[email]['account_api_secret']
# login to Bitwarden
logger.info("Trying to login to Bitwarden as {}".format(email))
bitwarden_login_output = subprocess.run([bitwarden_cli_executable, 'login', '--apikey', '--raw'], capture_output=True)
logger.debug((bitwarden_login_output.stdout).decode())
bitwarden_status = json.loads(((subprocess.run([bitwarden_cli_executable, 'status'], capture_output=True)).stdout).decode())
logger.debug("Bitwarden Status: {}".format(bitwarden_status))
if bitwarden_status['status'] == "locked":
logger.info("Successfully Logged in")
bitwarden_unlock_output = subprocess.run([bitwarden_cli_executable, 'unlock', vault_password, '--raw', '--nointeraction'], capture_output=True)
bitwarden_session_key = (bitwarden_unlock_output.stdout).decode()
if bitwarden_session_key:
# logger.debug("Session key: {}".format(bitwarden_session_key))
logger.info("Successfully unlocked vault")
if not opts.test:
os.environ["BW_SESSION"] = bitwarden_session_key
# export to csv and json
logger.info("Exporting vault to both CSV and JSON files")
logger.debug("Exporting vault to CSV")
file_name = 'Bitwarden {} Export {}'.format(email, datetime_string)
logger.debug((subprocess.run([bitwarden_cli_executable, 'export', '--output', os.path.join(working_directory, '{}.csv'.format(file_name)) , '--format', 'csv'], capture_output=True).stdout).decode())
time.sleep(1)
logger.debug("Exporting vault to JSON")
logger.debug((subprocess.run([bitwarden_cli_executable, 'export', '--output', os.path.join(working_directory, '{}.json'.format(file_name)), '--format', 'json'], capture_output=True).stdout).decode())
time.sleep(1)
# looking for Organizations
# look for organizations
logger.info("Looking for Organizations")
bitwarden_organizations = json.loads(((subprocess.run([bitwarden_cli_executable, 'list', 'organizations'], capture_output=True)).stdout).decode())
logger.info("Found {} Organiztaions.".format(len(bitwarden_organizations)))
for organization in bitwarden_organizations:
logger.info("Exporting organization {} vault to both CSV and JSON files".format(organization['name']))
logger.debug("Exporting organization vault to CSV")
file_name = 'Bitwarden Organization {} Export {}'.format(organization['name'], datetime_string)
logger.debug((subprocess.run([bitwarden_cli_executable, 'export', '--organizationid', '{}'.format(organization['id']), '--output', os.path.join(working_directory, '{}.csv'.format(file_name)), '--format', 'csv'], capture_output=True).stdout).decode())
time.sleep(1)
logger.debug("Exporting organization vault to JSON")
logger.debug((subprocess.run([bitwarden_cli_executable, 'export', '--organizationid', '{}'.format(organization['id']), '--output', os.path.join(working_directory, '{}.json'.format(file_name)), '--format', 'json'], capture_output=True).stdout).decode())
time.sleep(1)
logger.info("Downlading attachments...")
bitwarden_items = json.loads(((subprocess.run([bitwarden_cli_executable, 'list', 'items'], capture_output=True)).stdout).decode())
for item in bitwarden_items:
logger.debug("Working on item {} ({})".format(item['name'], item['id']))
if "attachments" in item:
logger.debug("Found {} attachments".format(len(item['attachments'])))
attachment_folder_name = os.path.join(working_directory, "attachments", item['name'])
for attachment in item['attachments']:
logger.debug("Downloading attachment ({}) with name {} to folder {}".format(attachment['id'], attachment['fileName'], attachment_folder_name))
logger.info((subprocess.run([bitwarden_cli_executable, 'get', 'attachment', attachment['id'], '--itemid', item['id'], '--output', os.path.join(attachment_folder_name, attachment['fileName'])], capture_output=True).stdout).decode())
time.sleep(1)
else:
logger.debug("Item has no attachments")
logger.info("Done downloading attachments")
logger.info("Zipping everything together...")
zip_filename = os.path.join(exports_directory, email,"Bitwarden Backup {} {}".format(email, datetime_string))
shutil.make_archive(zip_filename, format="zip", root_dir=working_directory)
if not opts.no_encrypt:
logger.info("Encrypting zip file...")
logger.debug((subprocess.run([gpg_executable, '--no-options', '--batch', '--passphrase', vault_password, '--symmetric', '--cipher-algo', 'AES256', '--digest-algo', 'SHA512', '--compression-algo', 'Uncompressed', '--output', zip_filename + '.zip.gpg', zip_filename + '.zip'], capture_output=True).stdout).decode())
else:
logger.warning("You passed in --no-encryption option, not encrypting zip file")
else:
logger.error((bitwarden_unlock_output.stderr).decode())
else:
logger.error("Unable to login to account, please check API credentials")
#logger.error((bitwarden_login_output.stderr).decode())
if not opts.test:
logger.info("Securely deleting files")
if os_detected == "Windows":
# sdelete.exe .\working\ -p 5 -s
logger.debug(
(subprocess.run([secure_delete_executable, '-p', '5', '-s', working_directory], capture_output=True).stdout).decode())
if not opts.no_encrypt:
logger.debug((subprocess.run([secure_delete_executable, '-p', '5', zip_filename + ".zip"], capture_output=True).stdout).decode())
elif os_detected == "Linux":
logger.debug((subprocess.run([secure_delete_executable, '-r', '-l', '-z', '-v', working_directory], capture_output=True).stdout).decode())
if not opts.no_encrypt:
logger.debug((subprocess.run([secure_delete_executable, '-r', '-l', '-z', '-v', zip_filename + '.zip'], capture_output=True).stdout).decode())
del vault_password
del os.environ['BW_CLIENTID']
del os.environ['BW_CLIENTSECRET']
if "BW_SESSION" in os.environ:
del os.environ["BW_SESSION"]
logger.info("Logging out of your Bitwarden account")
logger.debug((subprocess.run([bitwarden_cli_executable, 'logout'], capture_output=True).stdout).decode())
del accounts
del emails_from_config_file
client.close()
sys.exit(0)