diff --git a/kmip/services/server/crypto/engine.py b/kmip/services/server/crypto/engine.py index d83defe..8d2877a 100644 --- a/kmip/services/server/crypto/engine.py +++ b/kmip/services/server/crypto/engine.py @@ -276,15 +276,15 @@ class CryptographyEngine(api.CryptographicEngine): plain_text, cipher_mode=None, padding_method=None, - iv_nonce=None): + iv_nonce=None, + hashing_algorithm=None): """ - Encrypt data using symmetric encryption. + Encrypt data using symmetric or asymmetric encryption. Args: encryption_algorithm (CryptographicAlgorithm): An enumeration - specifying the symmetric encryption algorithm to use for - encryption. - encryption_key (bytes): The bytes of the symmetric key to use for + specifying the encryption algorithm to use for encryption. + encryption_key (bytes): The bytes of the encryption key to use for encryption. plain_text (bytes): The bytes to be encrypted. cipher_mode (BlockCipherMode): An enumeration specifying the @@ -299,6 +299,10 @@ class CryptographyEngine(api.CryptographicEngine): of the encryption algorithm. Optional, defaults to None. If required and not provided, it will be autogenerated and returned with the cipher text. + hashing_algorithm (HashingAlgorithm): An enumeration specifying + the hashing algorithm to use with the encryption algorithm, + if needed. Required for OAEP-based asymmetric encryption. + Optional, defaults to None. Returns: dict: A dictionary containing the encrypted data, with at least @@ -334,10 +338,74 @@ class CryptographyEngine(api.CryptographicEngine): >>> result.iv_counter_nonce b'8qA\x05\xc4\x86\x03\xd9=\xef\xdf\xb8ke\x9a\xa2' """ - - # Set up the algorithm if encryption_algorithm is None: raise exceptions.InvalidField("Encryption algorithm is required.") + + if encryption_algorithm == enums.CryptographicAlgorithm.RSA: + return self._encrypt_asymmetric( + encryption_algorithm, + encryption_key, + plain_text, + padding_method, + hashing_algorithm=hashing_algorithm + ) + else: + return self._encrypt_symmetric( + encryption_algorithm, + encryption_key, + plain_text, + cipher_mode=cipher_mode, + padding_method=padding_method, + iv_nonce=iv_nonce + ) + + def _encrypt_symmetric( + self, + encryption_algorithm, + encryption_key, + plain_text, + cipher_mode=None, + padding_method=None, + iv_nonce=None): + """ + Encrypt data using symmetric encryption. + + Args: + encryption_algorithm (CryptographicAlgorithm): An enumeration + specifying the symmetric encryption algorithm to use for + encryption. + encryption_key (bytes): The bytes of the symmetric key to use for + encryption. + plain_text (bytes): The bytes to be encrypted. + cipher_mode (BlockCipherMode): An enumeration specifying the + block cipher mode to use with the encryption algorithm. + Required in the general case. Optional if the encryption + algorithm is RC4 (aka ARC4). If optional, defaults to None. + padding_method (PaddingMethod): An enumeration specifying the + padding method to use on the data before encryption. Required + if the cipher mode is for block ciphers (e.g., CBC, ECB). + Optional otherwise, defaults to None. + iv_nonce (bytes): The IV/nonce value to use to initialize the mode + of the encryption algorithm. Optional, defaults to None. If + required and not provided, it will be autogenerated and + returned with the cipher text. + + Returns: + dict: A dictionary containing the encrypted data, with at least + the following key/value fields: + * cipher_text - the bytes of the encrypted data + * iv_nonce - the bytes of the IV/counter/nonce used if it + was needed by the encryption scheme and if it was + automatically generated for the encryption + + Raises: + InvalidField: Raised when the algorithm is unsupported or the + encryption key is incompatible with the algorithm. + CryptographicFailure: Raised when the key generation process + fails. + """ + + # Set up the algorithm algorithm = self._symmetric_key_algorithms.get( encryption_algorithm, None @@ -402,6 +470,89 @@ class CryptographyEngine(api.CryptographicEngine): else: return {'cipher_text': cipher_text} + def _encrypt_asymmetric(self, + encryption_algorithm, + encryption_key, + plain_text, + padding_method, + hashing_algorithm=None): + """ + Encrypt data using asymmetric encryption. + + Args: + encryption_algorithm (CryptographicAlgorithm): An enumeration + specifying the asymmetric encryption algorithm to use for + encryption. Required. + encryption_key (bytes): The bytes of the public key to use for + encryption. Required. + plain_text (bytes): The bytes to be encrypted. Required. + padding_method (PaddingMethod): An enumeration specifying the + padding method to use with the asymmetric encryption + algorithm. Required. + hashing_algorithm (HashingAlgorithm): An enumeration specifying + the hashing algorithm to use with the encryption padding + method. Required, if the padding method is OAEP. Optional + otherwise, defaults to None. + + Returns: + dict: A dictionary containing the encrypted data, with at least + the following key/value field: + * cipher_text - the bytes of the encrypted data + + Raises: + InvalidField: Raised when the algorithm is unsupported or the + length is incompatible with the algorithm. + CryptographicFailure: Raised when the key generation process + fails. + """ + if encryption_algorithm == enums.CryptographicAlgorithm.RSA: + if padding_method == enums.PaddingMethod.OAEP: + hash_algorithm = self._encryption_hash_algorithms.get( + hashing_algorithm + ) + if hash_algorithm is None: + raise exceptions.InvalidField( + "The hashing algorithm '{0}' is not supported for " + "asymmetric encryption.".format(hashing_algorithm) + ) + + padding_method = asymmetric_padding.OAEP( + mgf=asymmetric_padding.MGF1( + algorithm=hash_algorithm() + ), + algorithm=hash_algorithm(), + label=None + ) + elif padding_method == enums.PaddingMethod.PKCS1v15: + padding_method = asymmetric_padding.PKCS1v15() + else: + raise exceptions.InvalidField( + "The padding method '{0}' is not supported for asymmetric " + "encryption.".format(padding_method) + ) + + backend = default_backend() + + try: + public_key = backend.load_der_public_key(encryption_key) + except Exception: + try: + public_key = backend.load_pem_public_key(encryption_key) + except Exception: + raise exceptions.CryptographicFailure( + "The public key bytes could not be loaded." + ) + cipher_text = public_key.encrypt( + plain_text, + padding_method + ) + return {'cipher_text': cipher_text} + else: + raise exceptions.InvalidField( + "The cryptographic algorithm '{0}' is not supported for " + "asymmetric encryption.".format(encryption_algorithm) + ) + def _handle_symmetric_padding(self, algorithm, plain_text, @@ -443,7 +594,8 @@ class CryptographyEngine(api.CryptographicEngine): cipher_text, cipher_mode=None, padding_method=None, - iv_nonce=None): + iv_nonce=None, + hashing_algorithm=None): """ Decrypt data using symmetric decryption. @@ -464,6 +616,10 @@ class CryptographyEngine(api.CryptographicEngine): Optional otherwise, defaults to None. iv_nonce (bytes): The IV/nonce value to use to initialize the mode of the decryption algorithm. Optional, defaults to None. + hashing_algorithm (HashingAlgorithm): An enumeration specifying + the hashing algorithm to use with the decryption algorithm, + if needed. Required for OAEP-based asymmetric decryption. + Optional, defaults to None. Returns: bytes: the bytes of the decrypted data @@ -496,10 +652,66 @@ class CryptographyEngine(api.CryptographicEngine): >>> result b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f' """ - - # Set up the algorithm if decryption_algorithm is None: raise exceptions.InvalidField("Decryption algorithm is required.") + + if decryption_algorithm == enums.CryptographicAlgorithm.RSA: + return self._decrypt_asymmetric( + decryption_algorithm, + decryption_key, + cipher_text, + padding_method, + hashing_algorithm=hashing_algorithm + ) + else: + return self._decrypt_symmetric( + decryption_algorithm, + decryption_key, + cipher_text, + cipher_mode=cipher_mode, + padding_method=padding_method, + iv_nonce=iv_nonce + ) + + def _decrypt_symmetric( + self, + decryption_algorithm, + decryption_key, + cipher_text, + cipher_mode=None, + padding_method=None, + iv_nonce=None): + """ + Decrypt data using symmetric decryption. + + Args: + decryption_algorithm (CryptographicAlgorithm): An enumeration + specifying the symmetric decryption algorithm to use for + decryption. + decryption_key (bytes): The bytes of the symmetric key to use for + decryption. + cipher_text (bytes): The bytes to be decrypted. + cipher_mode (BlockCipherMode): An enumeration specifying the + block cipher mode to use with the decryption algorithm. + Required in the general case. Optional if the decryption + algorithm is RC4 (aka ARC4). If optional, defaults to None. + padding_method (PaddingMethod): An enumeration specifying the + padding method to use on the data after decryption. Required + if the cipher mode is for block ciphers (e.g., CBC, ECB). + Optional otherwise, defaults to None. + iv_nonce (bytes): The IV/nonce value to use to initialize the mode + of the decryption algorithm. Optional, defaults to None. + + Returns: + bytes: the bytes of the decrypted data + + Raises: + InvalidField: Raised when the algorithm is unsupported or the + length is incompatible with the algorithm. + CryptographicFailure: Raised when the key generation process + fails. + """ + # Set up the algorithm algorithm = self._symmetric_key_algorithms.get( decryption_algorithm, None @@ -560,6 +772,96 @@ class CryptographyEngine(api.CryptographicEngine): return plain_text + def _decrypt_asymmetric( + self, + decryption_algorithm, + decryption_key, + cipher_text, + padding_method, + hashing_algorithm=None): + """ + Encrypt data using asymmetric decryption. + + Args: + decryption_algorithm (CryptographicAlgorithm): An enumeration + specifying the asymmetric decryption algorithm to use for + decryption. Required. + decryption_key (bytes): The bytes of the private key to use for + decryption. Required. + cipher_text (bytes): The bytes to be decrypted. Required. + padding_method (PaddingMethod): An enumeration specifying the + padding method to use with the asymmetric decryption + algorithm. Required. + hashing_algorithm (HashingAlgorithm): An enumeration specifying + the hashing algorithm to use with the decryption padding + method. Required, if the padding method is OAEP. Optional + otherwise, defaults to None. + + Returns: + dict: A dictionary containing the decrypted data, with at least + the following key/value field: + * plain_text - the bytes of the decrypted data + + Raises: + InvalidField: Raised when the algorithm is unsupported or the + length is incompatible with the algorithm. + CryptographicFailure: Raised when the key generation process + fails. + """ + if decryption_algorithm == enums.CryptographicAlgorithm.RSA: + if padding_method == enums.PaddingMethod.OAEP: + hash_algorithm = self._encryption_hash_algorithms.get( + hashing_algorithm + ) + if hash_algorithm is None: + raise exceptions.InvalidField( + "The hashing algorithm '{0}' is not supported for " + "asymmetric decryption.".format(hashing_algorithm) + ) + + padding_method = asymmetric_padding.OAEP( + mgf=asymmetric_padding.MGF1( + algorithm=hash_algorithm() + ), + algorithm=hash_algorithm(), + label=None + ) + elif padding_method == enums.PaddingMethod.PKCS1v15: + padding_method = asymmetric_padding.PKCS1v15() + else: + raise exceptions.InvalidField( + "The padding method '{0}' is not supported for asymmetric " + "decryption.".format(padding_method) + ) + + backend = default_backend() + + try: + private_key = backend.load_der_private_key( + decryption_key, + None + ) + except Exception: + try: + private_key = backend.load_pem_private_key( + decryption_key, + None + ) + except Exception: + raise exceptions.CryptographicFailure( + "The private key bytes could not be loaded." + ) + plain_text = private_key.decrypt( + cipher_text, + padding_method + ) + return plain_text + else: + raise exceptions.InvalidField( + "The cryptographic algorithm '{0}' is not supported for " + "asymmetric decryption.".format(decryption_algorithm) + ) + def _create_rsa_key_pair(self, length, public_exponent=65537): """ Create an RSA key pair. diff --git a/kmip/tests/unit/services/server/crypto/test_engine.py b/kmip/tests/unit/services/server/crypto/test_engine.py index c6dd905..2ad9cb7 100644 --- a/kmip/tests/unit/services/server/crypto/test_engine.py +++ b/kmip/tests/unit/services/server/crypto/test_engine.py @@ -17,6 +17,9 @@ import mock import pytest import testtools +from cryptography.hazmat import backends +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.ciphers import algorithms from kmip.core import enums @@ -261,10 +264,10 @@ class TestCryptographyEngine(testtools.TestCase): *args ) - def test_encrypt_invalid_algorithm(self): + def test_encrypt_symmetric_invalid_algorithm(self): """ - Test that the right errors are raised when invalid encryption - algorithms are used. + Test that the right errors are raised when invalid symmetric + encryption algorithms are used. """ engine = crypto.CryptographyEngine() @@ -285,10 +288,10 @@ class TestCryptographyEngine(testtools.TestCase): *args ) - def test_encrypt_invalid_algorithm_key(self): + def test_encrypt_symmetric_invalid_algorithm_key(self): """ Test that the right error is raised when an invalid key is used with - an encryption algorithm. + a symmetric encryption algorithm. """ engine = crypto.CryptographyEngine() @@ -300,10 +303,10 @@ class TestCryptographyEngine(testtools.TestCase): *args ) - def test_encrypt_no_mode_needed(self): + def test_encrypt_symmetric_no_mode_needed(self): """ - Test that data can be encrypted for certain inputs without a cipher - mode. + Test that data can be symmetrically encrypted for certain inputs + without a cipher mode. """ engine = crypto.CryptographyEngine() @@ -313,10 +316,10 @@ class TestCryptographyEngine(testtools.TestCase): b'\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08' ) - def test_encrypt_invalid_cipher_mode(self): + def test_encrypt_symmetric_invalid_cipher_mode(self): """ Test that the right errors are raised when invalid cipher modes are - used. + used with symmetric encryption. """ engine = crypto.CryptographyEngine() @@ -343,10 +346,10 @@ class TestCryptographyEngine(testtools.TestCase): **kwargs ) - def test_encrypt_generate_iv(self): + def test_encrypt_symmetric_generate_iv(self): """ Test that the initialization vector is correctly generated and - returned for an appropriate set of encryption inputs. + returned for an appropriate set of symmetric encryption inputs. """ engine = crypto.CryptographyEngine() @@ -379,9 +382,189 @@ class TestCryptographyEngine(testtools.TestCase): self.assertNotIn('iv_nonce', result.keys()) - def test_decrypt_invalid_algorithm(self): + def test_encrypt_asymmetric_invalid_encryption_algorithm(self): """ - Test that the right errors are raised when invalid decryption + Test that the right error is raised when an invalid asymmetric + encryption algorithm is specified. + """ + engine = crypto.CryptographyEngine() + + args = ('invalid', b'', b'', None, None) + self.assertRaisesRegexp( + exceptions.InvalidField, + "The cryptographic algorithm 'invalid' is not supported for " + "asymmetric encryption.", + engine._encrypt_asymmetric, + *args + ) + + def test_encrypt_asymmetric_invalid_hashing_algorithm(self): + """ + Test that the right error is raised when an invalid hashing algorithm + is specified. + """ + engine = crypto.CryptographyEngine() + + args = ( + enums.CryptographicAlgorithm.RSA, + b'', + b'' + ) + kwargs = { + 'padding_method': enums.PaddingMethod.OAEP, + 'hashing_algorithm': 'invalid' + } + self.assertRaisesRegexp( + exceptions.InvalidField, + "The hashing algorithm 'invalid' is not supported for asymmetric " + "encryption.", + engine.encrypt, + *args, + **kwargs + ) + + def test_encrypt_asymmetric_invalid_padding_method(self): + """ + Test that the right error is raised when an invalid padding method + is specified. + """ + engine = crypto.CryptographyEngine() + + args = ( + enums.CryptographicAlgorithm.RSA, + b'', + b'' + ) + kwargs = { + 'padding_method': 'invalid', + 'hashing_algorithm': enums.HashingAlgorithm.SHA_1 + } + self.assertRaisesRegexp( + exceptions.InvalidField, + "The padding method 'invalid' is not supported for asymmetric " + "encryption.", + engine.encrypt, + *args, + **kwargs + ) + + def test_encrypt_asymmetric_invalid_public_key(self): + """ + Test that the right error is raised when an invalid public key is + specified. + """ + engine = crypto.CryptographyEngine() + + args = ( + enums.CryptographicAlgorithm.RSA, + 'invalid', + b'' + ) + kwargs = { + 'padding_method': enums.PaddingMethod.OAEP, + 'hashing_algorithm': enums.HashingAlgorithm.SHA_1 + } + self.assertRaisesRegexp( + exceptions.CryptographicFailure, + "The public key bytes could not be loaded.", + engine.encrypt, + *args, + **kwargs + ) + + def test_decrypt_asymmetric_invalid_encryption_algorithm(self): + """ + Test that the right error is raised when an invalid asymmetric + decryption algorithm is specified. + """ + engine = crypto.CryptographyEngine() + + args = ('invalid', b'', b'', None, None) + self.assertRaisesRegexp( + exceptions.InvalidField, + "The cryptographic algorithm 'invalid' is not supported for " + "asymmetric decryption.", + engine._decrypt_asymmetric, + *args + ) + + def test_decrypt_asymmetric_invalid_hashing_algorithm(self): + """ + Test that the right error is raised when an invalid hashing algorithm + is specified. + """ + engine = crypto.CryptographyEngine() + + args = ( + enums.CryptographicAlgorithm.RSA, + b'', + b'' + ) + kwargs = { + 'padding_method': enums.PaddingMethod.OAEP, + 'hashing_algorithm': 'invalid' + } + self.assertRaisesRegexp( + exceptions.InvalidField, + "The hashing algorithm 'invalid' is not supported for asymmetric " + "decryption.", + engine.decrypt, + *args, + **kwargs + ) + + def test_decrypt_asymmetric_invalid_padding_method(self): + """ + Test that the right error is raised when an invalid padding method + is specified. + """ + engine = crypto.CryptographyEngine() + + args = ( + enums.CryptographicAlgorithm.RSA, + b'', + b'' + ) + kwargs = { + 'padding_method': 'invalid', + 'hashing_algorithm': enums.HashingAlgorithm.SHA_1 + } + self.assertRaisesRegexp( + exceptions.InvalidField, + "The padding method 'invalid' is not supported for asymmetric " + "decryption.", + engine.decrypt, + *args, + **kwargs + ) + + def test_decrypt_asymmetric_invalid_private_key(self): + """ + Test that the right error is raised when an invalid private key is + specified. + """ + engine = crypto.CryptographyEngine() + + args = ( + enums.CryptographicAlgorithm.RSA, + 'invalid', + b'' + ) + kwargs = { + 'padding_method': enums.PaddingMethod.OAEP, + 'hashing_algorithm': enums.HashingAlgorithm.SHA_1 + } + self.assertRaisesRegexp( + exceptions.CryptographicFailure, + "The private key bytes could not be loaded.", + engine.decrypt, + *args, + **kwargs + ) + + def test_decrypt_symmetric_invalid_algorithm(self): + """ + Test that the right errors are raised when invalid symmetric decryption algorithms are used. """ engine = crypto.CryptographyEngine() @@ -403,10 +586,10 @@ class TestCryptographyEngine(testtools.TestCase): *args ) - def test_decrypt_invalid_algorithm_key(self): + def test_decrypt_symmetric_invalid_algorithm_key(self): """ Test that the right error is raised when an invalid key is used with - a decryption algorithm. + a symmetric decryption algorithm. """ engine = crypto.CryptographyEngine() @@ -418,10 +601,10 @@ class TestCryptographyEngine(testtools.TestCase): *args ) - def test_decrypt_invalid_cipher_mode(self): + def test_decrypt_symmetric_invalid_cipher_mode(self): """ Test that the right errors are raised when invalid cipher modes are - used. + used with symmetric decryption. """ engine = crypto.CryptographyEngine() @@ -448,10 +631,10 @@ class TestCryptographyEngine(testtools.TestCase): **kwargs ) - def test_decrypt_missing_iv_nonce(self): + def test_decrypt_symmetric_missing_iv_nonce(self): """ Test that the right error is raised when an IV/nonce is not provided - for the decryption algorithm. + for the symmetric decryption algorithm. """ engine = crypto.CryptographyEngine() @@ -884,72 +1067,262 @@ class TestCryptographyEngine(testtools.TestCase): 'iv_nonce': None} ] ) -def encrypt_parameters(request): +def symmetric_parameters(request): return request.param -def test_encrypt(encrypt_parameters): +def test_encrypt_symmetric(symmetric_parameters): """ Test that various encryption algorithms and block cipher modes can be - used to correctly encrypt data. + used to correctly symmetrically encrypt data. """ engine = crypto.CryptographyEngine() engine._handle_symmetric_padding = mock.MagicMock( - return_value=encrypt_parameters.get('plain_text') + return_value=symmetric_parameters.get('plain_text') ) result = engine.encrypt( - encrypt_parameters.get('algorithm'), - encrypt_parameters.get('key'), - encrypt_parameters.get('plain_text'), - cipher_mode=encrypt_parameters.get('cipher_mode'), - iv_nonce=encrypt_parameters.get('iv_nonce') + symmetric_parameters.get('algorithm'), + symmetric_parameters.get('key'), + symmetric_parameters.get('plain_text'), + cipher_mode=symmetric_parameters.get('cipher_mode'), + iv_nonce=symmetric_parameters.get('iv_nonce') ) if engine._handle_symmetric_padding.called: engine._handle_symmetric_padding.assert_called_once_with( engine._symmetric_key_algorithms.get( - encrypt_parameters.get('algorithm') + symmetric_parameters.get('algorithm') ), - encrypt_parameters.get('plain_text'), + symmetric_parameters.get('plain_text'), None ) - assert encrypt_parameters.get('cipher_text') == result.get('cipher_text') + assert symmetric_parameters.get('cipher_text') == result.get('cipher_text') -def test_decrypt(encrypt_parameters): +def test_decrypt_symmetric(symmetric_parameters): """ Test that various decryption algorithms and block cipher modes can be - used to correctly decrypt data. + used to correctly symmetrically decrypt data. """ engine = crypto.CryptographyEngine() engine._handle_symmetric_padding = mock.MagicMock( - return_value=encrypt_parameters.get('plain_text') + return_value=symmetric_parameters.get('plain_text') ) result = engine.decrypt( - encrypt_parameters.get('algorithm'), - encrypt_parameters.get('key'), - encrypt_parameters.get('cipher_text'), - cipher_mode=encrypt_parameters.get('cipher_mode'), - iv_nonce=encrypt_parameters.get('iv_nonce') + symmetric_parameters.get('algorithm'), + symmetric_parameters.get('key'), + symmetric_parameters.get('cipher_text'), + cipher_mode=symmetric_parameters.get('cipher_mode'), + iv_nonce=symmetric_parameters.get('iv_nonce') ) if engine._handle_symmetric_padding.called: engine._handle_symmetric_padding.assert_called_once_with( engine._symmetric_key_algorithms.get( - encrypt_parameters.get('algorithm') + symmetric_parameters.get('algorithm') ), - encrypt_parameters.get('plain_text'), + symmetric_parameters.get('plain_text'), None, undo_padding=True ) - assert encrypt_parameters.get('plain_text') == result + assert symmetric_parameters.get('plain_text') == result + + +# Most of these test vectors were obtained from the pyca/cryptography test +# suite: +# +# cryptography_vectors/asymmetric/RSA/pkcs-1v2-1d2-vec/oaep-vect.txt +# cryptography_vectors/asymmetric/RSA/pkcs1v15crypt-vectors.txt +@pytest.fixture( + scope='function', + params=[ + {'algorithm': enums.CryptographicAlgorithm.RSA, + 'padding_method': enums.PaddingMethod.OAEP, + 'hashing_algorithm': enums.HashingAlgorithm.SHA_1, + 'encoding': serialization.Encoding.DER, + 'public_key': { + 'n': int( + 'a8b3b284af8eb50b387034a860f146c4919f318763cd6c5598c8ae4811a' + '1e0abc4c7e0b082d693a5e7fced675cf4668512772c0cbc64a742c6c630' + 'f533c8cc72f62ae833c40bf25842e984bb78bdbf97c0107d55bdb662f5c' + '4e0fab9845cb5148ef7392dd3aaff93ae1e6b667bb3d4247616d4f5ba10' + 'd4cfd226de88d39f16fb', + 16 + ), + 'e': int('010001', 16) + }, + 'private_key': { + 'd': int( + '53339cfdb79fc8466a655c7316aca85c55fd8f6dd898fdaf119517ef4f5' + '2e8fd8e258df93fee180fa0e4ab29693cd83b152a553d4ac4d1812b8b9f' + 'a5af0e7f55fe7304df41570926f3311f15c4d65a732c483116ee3d3d2d0' + 'af3549ad9bf7cbfb78ad884f84d5beb04724dc7369b31def37d0cf539e9' + 'cfcdd3de653729ead5d1', + 16 + ), + 'p': int( + 'd32737e7267ffe1341b2d5c0d150a81b586fb3132bed2f8d5262864a9cb' + '9f30af38be448598d413a172efb802c21acf1c11c520c2f26a471dcad21' + '2eac7ca39d', + 16 + ), + 'q': int( + 'cc8853d1d54da630fac004f471f281c7b8982d8224a490edbeb33d3e3d5' + 'cc93c4765703d1dd791642f1f116a0dd852be2419b2af72bfe9a030e860' + 'b0288b5d77', + 16 + ), + 'dmp1': int( + '0e12bf1718e9cef5599ba1c3882fe8046a90874eefce8f2ccc20e4f2741' + 'fb0a33a3848aec9c9305fbecbd2d76819967d4671acc6431e4037968db3' + '7878e695c1', + 16 + ), + 'dmq1': int( + '95297b0f95a2fa67d00707d609dfd4fc05c89dafc2ef6d6ea55bec771ea' + '333734d9251e79082ecda866efef13c459e1a631386b7e354c899f5f112' + 'ca85d71583', + 16 + ), + 'iqmp': int( + '4f456c502493bdc0ed2ab756a3a6ed4d67352a697d4216e93212b127a63' + 'd5411ce6fa98d5dbefd73263e3728142743818166ed7dd63687dd2a8ca1' + 'd2f4fbd8e1', + 16 + ) + }, + 'plain_text': ( + b'\x66\x28\x19\x4e\x12\x07\x3d\xb0' + b'\x3b\xa9\x4c\xda\x9e\xf9\x53\x23' + b'\x97\xd5\x0d\xba\x79\xb9\x87\x00' + b'\x4a\xfe\xfe\x34' + )}, + {'algorithm': enums.CryptographicAlgorithm.RSA, + 'padding_method': enums.PaddingMethod.PKCS1v15, + 'encoding': serialization.Encoding.PEM, + 'public_key': { + 'n': int( + '98b70582ca808fd1d3509562a0ef305af6d9875443b35bdf24d536353e3' + 'f1228dcd12a78568356c6ff323abf72ac1cdbfe712fb49fe594a5a2175d' + '48b6732538d8df37cb970be4a5b562c3f298db9ddf75607877918cced1d' + '0d1f377338c0d3d3207797e862c65d11439e588177527a7ded91971adcf' + '91e2e834e37f05a73655', + 16 + ), + 'e': int('010001', 16) + }, + 'private_key': { + 'd': int( + '0614a786052d284cd906a8e413f7622c050f3549c026589ea27750e0bed' + '9410e5a7883a1e603f5c517ad36d49faac5bd66bcb8030fa8d309e351dd' + 'd782d843df975680ae73eea9aab289b757205dadb8fdfb989ec8db8e709' + '5f51f24529f5637aa669331e2569f8b854abecec99aa264c3da7cc6866f' + '0c0e1fb8469848581c73', + 16 + ), + 'p': int( + 'cb61a88c8c305ad9a8fbec2ba4c86cccc2028024aa1690c29bc8264d2fe' + 'be87e4f86e912ef0f5c1853d71cbc9b14baed3c37cef6c7a3598b6fbe06' + '4810905b57', + 16 + ), + 'q': int( + 'c0399f0b9380faba38ff80d2fff6ede79cfdabf658972077a5e2b295693' + 'ea51072268b91746eea9be04ad66100ebed733db4cd0147a18d6de8c0cd' + '8fbf249c33', + 16 + ), + 'dmp1': int( + '944c3a6579574cf7873362ab14359cb7d50393c2a84f59f0bd3cbd48ed1' + '77c6895be8eb6e29ff58c3b9e0ff32ab57bf3be440762848184aa9aa919' + 'd574567e73', + 16 + ), + 'dmq1': int( + '45ebefd58727308cd2b4e6085a8158d29a418feec114e00385bceb96fbb' + 'c84d071a561b95c30087900e2580edb05f6cea7907fcdca5f92917b4bbe' + 'ba5e1e140f', + 16 + ), + 'iqmp': int( + 'c52468c8fd15e5da2f6c8eba4e97baebe995b67a1a7ad719dd9fff366b1' + '84d5ab455075909292044ecb345cf2cdd26228e21f85183255f4a9e69f4' + 'c7152ebb0f', + 16 + ) + }, + 'plain_text': ( + b'\xe9\xa7\x71\xe0\xa6\x5f\x28\x70' + b'\x8e\x83\xd5\xe6\xcc\x89\x8a\x41' + b'\xd7' + )} + ] +) +def asymmetric_parameters(request): + return request.param + + +def test_encrypt_decrypt_asymmetric(asymmetric_parameters): + """ + Test that various encryption/decryption algorithms can be used to + correctly asymmetrically encrypt data. + """ + # NOTE (peter-hamilton) Randomness included in RSA padding schemes + # makes it impossible to unit test just encryption; it's not possible + # to predict the cipher text. Instead, we test the encrypt/decrypt + # cycle to ensure that they correctly mirror each other. + backend = backends.default_backend() + public_key_numbers = rsa.RSAPublicNumbers( + asymmetric_parameters.get('public_key').get('e'), + asymmetric_parameters.get('public_key').get('n') + ) + public_key = public_key_numbers.public_key(backend) + public_bytes = public_key.public_bytes( + asymmetric_parameters.get('encoding'), + serialization.PublicFormat.PKCS1 + ) + + private_key_numbers = rsa.RSAPrivateNumbers( + p=asymmetric_parameters.get('private_key').get('p'), + q=asymmetric_parameters.get('private_key').get('q'), + d=asymmetric_parameters.get('private_key').get('d'), + dmp1=asymmetric_parameters.get('private_key').get('dmp1'), + dmq1=asymmetric_parameters.get('private_key').get('dmq1'), + iqmp=asymmetric_parameters.get('private_key').get('iqmp'), + public_numbers=public_key_numbers + ) + private_key = private_key_numbers.private_key(backend) + private_bytes = private_key.private_bytes( + asymmetric_parameters.get('encoding'), + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption() + ) + + engine = crypto.CryptographyEngine() + + result = engine.encrypt( + asymmetric_parameters.get('algorithm'), + public_bytes, + asymmetric_parameters.get('plain_text'), + padding_method=asymmetric_parameters.get('padding_method'), + hashing_algorithm=asymmetric_parameters.get('hashing_algorithm') + ) + result = engine.decrypt( + asymmetric_parameters.get('algorithm'), + private_bytes, + result.get('cipher_text'), + padding_method=asymmetric_parameters.get('padding_method'), + hashing_algorithm=asymmetric_parameters.get('hashing_algorithm') + ) + + assert asymmetric_parameters.get('plain_text') == result @pytest.fixture(