2
0
mirror of https://github.com/openkmip/pykmip synced 2025-12-24 04:04:20 +00:00

Add encryption support to the server cryptography engine

This change adds encryption functionality to the cryptographic
engine used by the server. It supports a variety of symmetric
encryption algorithms and block cipher modes. Asymmetric encryption
support will be added in a future patch.

Unit tests and minor updates to surrounding core code are included.
This commit is contained in:
Peter Hamilton
2017-06-16 16:03:30 -04:00
parent 66f80922f3
commit 7bc613417b
5 changed files with 657 additions and 23 deletions

View File

@@ -18,8 +18,12 @@ import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes, hmac, cmac
from cryptography.hazmat.primitives import padding as symmetric_padding
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.ciphers import algorithms
from cryptography.hazmat.primitives.asymmetric import padding as \
asymmetric_padding
from cryptography.hazmat.primitives import ciphers
from cryptography.hazmat.primitives.ciphers import algorithms, modes
from kmip.core import enums
from kmip.core import exceptions
@@ -38,16 +42,18 @@ class CryptographyEngine(api.CryptographicEngine):
"""
self.logger = logging.getLogger('kmip.server.engine.cryptography')
# The IDEA algorithm is supported by cryptography but may not be
# supported by certain backends, like OpenSSL.
self._symmetric_key_algorithms = {
enums.CryptographicAlgorithm.TRIPLE_DES: algorithms.TripleDES,
enums.CryptographicAlgorithm.AES: algorithms.AES,
enums.CryptographicAlgorithm.BLOWFISH: algorithms.Blowfish,
enums.CryptographicAlgorithm.CAMELLIA: algorithms.Camellia,
enums.CryptographicAlgorithm.CAST5: algorithms.CAST5,
enums.CryptographicAlgorithm.IDEA: algorithms.IDEA,
enums.CryptographicAlgorithm.RC4: algorithms.ARC4
enums.CryptographicAlgorithm.AES: algorithms.AES,
enums.CryptographicAlgorithm.BLOWFISH: algorithms.Blowfish,
enums.CryptographicAlgorithm.CAMELLIA: algorithms.Camellia,
enums.CryptographicAlgorithm.CAST5: algorithms.CAST5,
enums.CryptographicAlgorithm.IDEA: algorithms.IDEA,
enums.CryptographicAlgorithm.RC4: algorithms.ARC4
}
self._asymetric_key_algorithms = {
self._asymmetric_key_algorithms = {
enums.CryptographicAlgorithm.RSA: self._create_rsa_key_pair
}
self._hash_algorithms = {
@@ -59,6 +65,43 @@ class CryptographyEngine(api.CryptographicEngine):
enums.CryptographicAlgorithm.HMAC_MD5: hashes.MD5
}
# TODO(peter-hamilton): Consider merging above hash dict and this one
self._encryption_hash_algorithms = {
enums.HashingAlgorithm.MD5: hashes.MD5,
enums.HashingAlgorithm.SHA_1: hashes.SHA1,
enums.HashingAlgorithm.SHA_224: hashes.SHA224,
enums.HashingAlgorithm.SHA_256: hashes.SHA256,
enums.HashingAlgorithm.SHA_384: hashes.SHA384,
enums.HashingAlgorithm.SHA_512: hashes.SHA512
}
# GCM is supported by cryptography but requires inputs that are not
# supported by the KMIP spec. It is excluded for now.
self._modes = {
enums.BlockCipherMode.CBC: modes.CBC,
enums.BlockCipherMode.ECB: modes.ECB,
enums.BlockCipherMode.OFB: modes.OFB,
enums.BlockCipherMode.CFB: modes.CFB,
enums.BlockCipherMode.CTR: modes.CTR
}
self._asymmetric_padding_methods = {
enums.PaddingMethod.OAEP: asymmetric_padding.OAEP,
enums.PaddingMethod.PKCS1v15: asymmetric_padding.PKCS1v15
}
self._symmetric_padding_methods = {
enums.PaddingMethod.ANSI_X923: symmetric_padding.ANSIX923,
enums.PaddingMethod.PKCS5: symmetric_padding.PKCS7
}
self._no_mode_needed = [
enums.CryptographicAlgorithm.RC4
]
self._no_padding_needed = [
enums.BlockCipherMode.CTR,
enums.BlockCipherMode.OFB,
enums.BlockCipherMode.CFB,
enums.BlockCipherMode.GCM
]
def create_symmetric_key(self, algorithm, length):
"""
Create a symmetric key.
@@ -147,13 +190,13 @@ class CryptographyEngine(api.CryptographicEngine):
>>> key = engine.create_asymmetric_key(
... CryptographicAlgorithm.RSA, 2048)
"""
if algorithm not in self._asymetric_key_algorithms.keys():
if algorithm not in self._asymmetric_key_algorithms.keys():
raise exceptions.InvalidField(
"The cryptographic algorithm ({0}) is not a supported "
"asymmetric key algorithm.".format(algorithm)
)
engine_method = self._asymetric_key_algorithms.get(algorithm)
engine_method = self._asymmetric_key_algorithms.get(algorithm)
return engine_method(length)
def mac(self, algorithm, key, data):
@@ -224,6 +267,169 @@ class CryptographyEngine(api.CryptographicEngine):
)
return mac_data
def encrypt(self,
encryption_algorithm,
encryption_key,
plain_text,
cipher_mode=None,
padding_method=None,
iv_nonce=None):
"""
Encrypt data using symmetric encryption.
Args:
encryption_algorithm (CryptographicAlgorithm): An enumeration
specifying the symmetric encryption algorithm to use for
encryption.
encryption_key (bytes): The bytes of the symmetric key to use for
encryption.
plain_text (bytes): The bytes to be encrypted.
cipher_mode (BlockCipherMode): An enumeration specifying the
block cipher mode to use with the encryption algorithm.
Required in the general case. Optional if the encryption
algorithm is RC4 (aka ARC4). If optional, defaults to None.
padding_method (PaddingMethod): An enumeration specifying the
padding method to use on the data before encryption. Required
if the cipher mode is for block ciphers (e.g., CBC, ECB).
Optional otherwise, defaults to None.
iv_nonce (bytes): The IV/nonce value to use to initialize the mode
of the encryption algorithm. Optional, defaults to None. If
required and not provided, it will be autogenerated and
returned with the cipher text.
Returns:
dict: A dictionary containing the encrypted data, with at least
the following key/value fields:
* cipher_text - the bytes of the encrypted data
* iv_nonce - the bytes of the IV/counter/nonce used if it
was needed by the encryption scheme and if it was
automatically generated for the encryption
Raises:
InvalidField: Raised when the algorithm is unsupported or the
length is incompatible with the algorithm.
CryptographicFailure: Raised when the key generation process
fails.
Example:
>>> engine = CryptographyEngine()
>>> result = engine.encrypt(
... encryption_algorithm=CryptographicAlgorithm.AES,
... encryption_key=(
... b'\xF3\x96\xE7\x1C\xCF\xCD\xEC\x1F'
... b'\xFC\xE2\x8E\xA6\xF8\x74\x28\xB0'
... ),
... plain_text=(
... b'\x00\x01\x02\x03\x04\x05\x06\x07'
... b'\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F'
... ),
... cipher_mode=BlockCipherMode.CBC,
... padding_method=PaddingMethod.ANSI_X923,
... )
>>> result.get('cipher_text')
b'\x18[\xb9y\x1bL\xd1\x8f\x9a\xa0e\x02b\xa3=c'
>>> result.iv_counter_nonce
b'8qA\x05\xc4\x86\x03\xd9=\xef\xdf\xb8ke\x9a\xa2'
"""
# Set up the algorithm
if encryption_algorithm is None:
raise exceptions.InvalidField("Encryption algorithm is required.")
algorithm = self._symmetric_key_algorithms.get(
encryption_algorithm,
None
)
if algorithm is None:
raise exceptions.InvalidField(
"Encryption algorithm '{0}' is not a supported symmetric "
"encryption algorithm.".format(encryption_algorithm)
)
try:
algorithm = algorithm(encryption_key)
except Exception as e:
self.logger.exception(e)
raise exceptions.CryptographicFailure(
"Invalid key bytes for the specified encryption algorithm."
)
# Set up the cipher mode if needed
return_iv_nonce = False
if encryption_algorithm == enums.CryptographicAlgorithm.RC4:
mode = None
else:
if cipher_mode is None:
raise exceptions.InvalidField("Cipher mode is required.")
mode = self._modes.get(cipher_mode, None)
if mode is None:
raise exceptions.InvalidField(
"Cipher mode '{0}' is not a supported mode.".format(
cipher_mode
)
)
if hasattr(mode, 'initialization_vector') or \
hasattr(mode, 'nonce'):
if iv_nonce is None:
iv_nonce = os.urandom(algorithm.block_size // 8)
return_iv_nonce = True
mode = mode(iv_nonce)
else:
mode = mode()
# Pad the plain text if needed (separate methods for testing purposes)
if cipher_mode in [
enums.BlockCipherMode.CBC,
enums.BlockCipherMode.ECB
]:
plain_text = self._handle_symmetric_padding(
self._symmetric_key_algorithms.get(encryption_algorithm),
plain_text,
padding_method
)
# Encrypt the plain text
cipher = ciphers.Cipher(algorithm, mode, backend=default_backend())
encryptor = cipher.encryptor()
cipher_text = encryptor.update(plain_text) + encryptor.finalize()
if return_iv_nonce:
return {
'cipher_text': cipher_text,
'iv_nonce': iv_nonce
}
else:
return {'cipher_text': cipher_text}
def _handle_symmetric_padding(self,
algorithm,
plain_text,
padding_method):
# KMIP 1.3 test TC-STREAM-ENC-2-13.xml demonstrates a case
# where an encrypt call for 3DES-ECB does not use padding if
# the plaintext fits the blocksize of the algorithm. This does
# not appear to be documented explicitly in the KMIP spec. It
# also makes failures during unpadding after decryption
# impossible to differentiate from cipher text/key mismatches.
# For now, ALWAYS apply padding regardless of plain text length.
if padding_method in self._symmetric_padding_methods.keys():
padding_method = self._symmetric_padding_methods.get(
padding_method
)
padder = padding_method(algorithm.block_size).padder()
plain_text = padder.update(plain_text)
plain_text += padder.finalize()
else:
if padding_method is None:
raise exceptions.InvalidField(
"Padding method is required."
)
else:
raise exceptions.InvalidField(
"Padding method '{0}' is not supported.".format(
padding_method
)
)
return plain_text
def _create_rsa_key_pair(self, length, public_exponent=65537):
"""
Create an RSA key pair.