added pykmip functions
This commit is contained in:
@@ -6,6 +6,9 @@ import platform
|
||||
import json
|
||||
import configparser
|
||||
import getpass
|
||||
from kmip.core import enums
|
||||
from kmip.demos import utils
|
||||
from kmip.pie import client
|
||||
|
||||
os_detected = platform.system()
|
||||
script_directory = os.path.dirname(os.path.realpath(__file__))
|
||||
@@ -25,6 +28,137 @@ else:
|
||||
#encrypted_api_secret =
|
||||
#encrypted_vault_password =
|
||||
|
||||
def create_encryption_key(client):
|
||||
# Create an encryption key.
|
||||
try:
|
||||
key_id = client.create(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
256,
|
||||
cryptographic_usage_mask=[
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
]
|
||||
)
|
||||
logger.debug("Successfully created a new encryption key.")
|
||||
logger.debug("Encryption Key ID: {}".format(key_id))
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
sys.exit(-1)
|
||||
|
||||
# Activate the encryption key so that it can be used.
|
||||
try:
|
||||
client.activate(key_id)
|
||||
logger.debug("Successfully activated the encryption key.")
|
||||
return key_id
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
sys.exit(-1)
|
||||
|
||||
def create_hmac_key(client):
|
||||
# Create an encryption key.
|
||||
try:
|
||||
key_id = client.create(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
256,
|
||||
cryptographic_usage_mask=[
|
||||
enums.CryptographicUsageMask.MAC_GENERATE,
|
||||
enums.CryptographicUsageMask.MAC_VERIFY
|
||||
]
|
||||
)
|
||||
logger.debug("Successfully created a new HMAC key.")
|
||||
logger.debug("HMAC Key ID: {}".format(key_id))
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
sys.exit(-1)
|
||||
|
||||
# Activate the HMAC key so that it can be used.
|
||||
try:
|
||||
client.activate(key_id)
|
||||
logger.debug("Successfully activated the HMAC key.")
|
||||
return key_id
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
sys.exit(-1)
|
||||
|
||||
def encrypt(client, data):
|
||||
try:
|
||||
data = data.encode('UTF-8')
|
||||
key_id = create_encryption_key(client)
|
||||
iv = secrets.token_bytes(16)
|
||||
cipher_text, autogenerated_iv = client.encrypt(
|
||||
data,
|
||||
uid=key_id,
|
||||
cryptographic_parameters={
|
||||
'cryptographic_algorithm':
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
'block_cipher_mode': enums.BlockCipherMode.CBC,
|
||||
'padding_method': enums.PaddingMethod.ANSI_X923
|
||||
},
|
||||
iv_counter_nonce=(
|
||||
iv
|
||||
)
|
||||
)
|
||||
hmac_key_id, hmac = client.mac(
|
||||
key_id.encode() + iv + cipher_text,
|
||||
uid = create_hmac_key(client),
|
||||
algorithm = enums.CryptographicAlgorithm.HMAC_SHA512
|
||||
)
|
||||
logger.debug("Successfully encrypted the data.")
|
||||
array = dict()
|
||||
array['version'] = 1
|
||||
array['cipher_key_id'] = key_id
|
||||
array['cipher_text'] = base64.b64encode(cipher_text).decode()
|
||||
array['iv'] = base64.b64encode(iv).decode()
|
||||
array['hmac_key_id'] = hmac_key_id
|
||||
array['hmac'] = base64.b64encode(hmac).decode()
|
||||
logger.debug("Dict of info: {}".format(array))
|
||||
array_json = json.dumps(array)
|
||||
array_json_b64 = base64.b64encode(array_json.encode('utf-8')).decode()
|
||||
return array_json_b64
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
|
||||
|
||||
|
||||
def decrypt(client, data):
|
||||
try:
|
||||
array_json = base64.b64decode(data)
|
||||
array = json.loads(array_json)
|
||||
logger.debug("Dict of info: {}".format(array))
|
||||
key_id = array['cipher_key_id']
|
||||
iv = base64.b64decode(array['iv'])
|
||||
cipher_text = base64.b64decode(array['cipher_text'])
|
||||
hmac_key_id = array['hmac_key_id']
|
||||
hmac = base64.b64decode(array['hmac'])
|
||||
hmac_key_id_test, hmac_test = client.mac(
|
||||
key_id.encode() + iv + cipher_text,
|
||||
uid = hmac_key_id,
|
||||
algorithm = enums.CryptographicAlgorithm.HMAC_SHA512
|
||||
)
|
||||
if pyhmac.compare_digest(hmac, hmac_test):
|
||||
logger.debug("HMAC matches.")
|
||||
else:
|
||||
logger.error("HMAC does not match, data is corrupted/tampered.")
|
||||
sys.exit(-1)
|
||||
plain_text = client.decrypt(
|
||||
cipher_text,
|
||||
uid=key_id,
|
||||
cryptographic_parameters={
|
||||
'cryptographic_algorithm':
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
'block_cipher_mode': enums.BlockCipherMode.CBC,
|
||||
'padding_method': enums.PaddingMethod.ANSI_X923
|
||||
},
|
||||
iv_counter_nonce=(
|
||||
iv
|
||||
)
|
||||
)
|
||||
logger.debug("Successfully decrypted the data.")
|
||||
plain_text = plain_text.decode('utf-8')
|
||||
return plain_text
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
|
||||
def does_file_exist(filepath):
|
||||
return os.path.exists(filepath)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user