mirror of
https://github.com/openkmip/pykmip
synced 2025-12-15 15:53:36 +00:00
Add Decrypt support to the clients
This change adds Decrypt operation support to the KMIPProxy and ProxyKmipClient clients, including unit tests to cover the new functionality.
This commit is contained in:
@@ -785,6 +785,140 @@ class ProxyKmipClient(api.KmipClient):
|
||||
result.get('result_message')
|
||||
)
|
||||
|
||||
def decrypt(self, data, uid=None, cryptographic_parameters=None,
|
||||
iv_counter_nonce=None):
|
||||
"""
|
||||
Decrypt data using the specified decryption key and parameters.
|
||||
|
||||
Args:
|
||||
data (bytes): The bytes to decrypt. Required.
|
||||
uid (string): The unique ID of the decryption key to use.
|
||||
Optional, defaults to None.
|
||||
cryptographic_parameters (dict): A dictionary containing various
|
||||
cryptographic settings to be used for the decryption.
|
||||
Optional, defaults to None.
|
||||
iv_counter_nonce (bytes): The bytes to use for the IV/counter/
|
||||
nonce, if needed by the decryption algorithm and/or cipher
|
||||
mode. Optional, defaults to None.
|
||||
|
||||
Returns:
|
||||
bytes: The decrypted data.
|
||||
|
||||
Raises:
|
||||
ClientConnectionNotOpen: if the client connection is unusable
|
||||
KmipOperationFailure: if the operation result is a failure
|
||||
TypeError: if the input arguments are invalid
|
||||
|
||||
Notes:
|
||||
The cryptographic_parameters argument is a dictionary that can
|
||||
contain the following key/value pairs:
|
||||
|
||||
Keys | Value
|
||||
------------------------------|-----------------------------------
|
||||
'block_cipher_mode' | A BlockCipherMode enumeration
|
||||
| indicating the cipher mode to use
|
||||
| with the decryption algorithm.
|
||||
'padding_method' | A PaddingMethod enumeration
|
||||
| indicating which padding method to
|
||||
| use with the decryption algorithm.
|
||||
'hashing_algorithm' | A HashingAlgorithm enumeration
|
||||
| indicating which hashing algorithm
|
||||
| to use.
|
||||
'key_role_type' | A KeyRoleType enumeration
|
||||
| indicating the intended use of the
|
||||
| associated cryptographic key.
|
||||
'digital_signature_algorithm' | A DigitalSignatureAlgorithm
|
||||
| enumeration indicating which
|
||||
| digital signature algorithm to
|
||||
| use.
|
||||
'cryptographic_algorithm' | A CryptographicAlgorithm
|
||||
| enumeration indicating which
|
||||
| decryption algorithm to use.
|
||||
'random_iv' | A boolean indicating whether the
|
||||
| server should autogenerate an IV.
|
||||
'iv_length' | An integer representing the length
|
||||
| of the initialization vector (IV)
|
||||
| in bits.
|
||||
'tag_length' | An integer representing the length
|
||||
| of the authenticator tag in bytes.
|
||||
'fixed_field_length' | An integer representing the length
|
||||
| of the fixed field portion of the
|
||||
| IV in bits.
|
||||
'invocation_field_length' | An integer representing the length
|
||||
| of the invocation field portion of
|
||||
| the IV in bits.
|
||||
'counter_length' | An integer representing the length
|
||||
| of the counter portion of the IV
|
||||
| in bits.
|
||||
'initial_counter_value' | An integer representing the
|
||||
| starting counter value for CTR
|
||||
| mode (typically 1).
|
||||
"""
|
||||
# Check input
|
||||
if not isinstance(data, six.binary_type):
|
||||
raise TypeError("data must be bytes")
|
||||
if uid is not None:
|
||||
if not isinstance(uid, six.string_types):
|
||||
raise TypeError("uid must be a string")
|
||||
if cryptographic_parameters is not None:
|
||||
if not isinstance(cryptographic_parameters, dict):
|
||||
raise TypeError("cryptographic_parameters must be a dict")
|
||||
if iv_counter_nonce is not None:
|
||||
if not isinstance(iv_counter_nonce, six.binary_type):
|
||||
raise TypeError("iv_counter_nonce must be bytes")
|
||||
|
||||
# Verify that operations can be given at this time
|
||||
if not self._is_open:
|
||||
raise exceptions.ClientConnectionNotOpen()
|
||||
|
||||
cryptographic_parameters = CryptographicParameters(
|
||||
block_cipher_mode=cryptographic_parameters.get(
|
||||
'block_cipher_mode'
|
||||
),
|
||||
padding_method=cryptographic_parameters.get('padding_method'),
|
||||
hashing_algorithm=cryptographic_parameters.get(
|
||||
'hashing_algorithm'
|
||||
),
|
||||
key_role_type=cryptographic_parameters.get('key_role_type'),
|
||||
digital_signature_algorithm=cryptographic_parameters.get(
|
||||
'digital_signature_algorithm'
|
||||
),
|
||||
cryptographic_algorithm=cryptographic_parameters.get(
|
||||
'cryptographic_algorithm'
|
||||
),
|
||||
random_iv=cryptographic_parameters.get('random_iv'),
|
||||
iv_length=cryptographic_parameters.get('iv_length'),
|
||||
tag_length=cryptographic_parameters.get('tag_length'),
|
||||
fixed_field_length=cryptographic_parameters.get(
|
||||
'fixed_field_length'
|
||||
),
|
||||
invocation_field_length=cryptographic_parameters.get(
|
||||
'invocation_field_length'
|
||||
),
|
||||
counter_length=cryptographic_parameters.get('counter_length'),
|
||||
initial_counter_value=cryptographic_parameters.get(
|
||||
'initial_counter_value'
|
||||
)
|
||||
)
|
||||
|
||||
# Decrypt the provided data and handle the results
|
||||
result = self.proxy.decrypt(
|
||||
data,
|
||||
uid,
|
||||
cryptographic_parameters,
|
||||
iv_counter_nonce
|
||||
)
|
||||
|
||||
status = result.get('result_status')
|
||||
if status == enums.ResultStatus.SUCCESS:
|
||||
return result.get('data')
|
||||
else:
|
||||
raise exceptions.KmipOperationFailure(
|
||||
status,
|
||||
result.get('result_reason'),
|
||||
result.get('result_message')
|
||||
)
|
||||
|
||||
def mac(self, data, uid=None, algorithm=None):
|
||||
"""
|
||||
Get the message authentication code for data.
|
||||
|
||||
Reference in New Issue
Block a user