2
0
mirror of https://github.com/openkmip/pykmip synced 2025-12-26 05:03:37 +00:00

Add support for asymmetric encryption and decryption

This change updates the encrypt/decrypt support in the cryptography
engine to support asymmetric key algorithms, specifically RSA. Unit
tests have been added to validate the new functionality.
This commit is contained in:
Peter Hamilton
2017-08-14 21:10:12 -04:00
parent 5758c6dd1e
commit 89c997c337
2 changed files with 728 additions and 53 deletions

View File

@@ -17,6 +17,9 @@ import mock
import pytest
import testtools
from cryptography.hazmat import backends
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.ciphers import algorithms
from kmip.core import enums
@@ -261,10 +264,10 @@ class TestCryptographyEngine(testtools.TestCase):
*args
)
def test_encrypt_invalid_algorithm(self):
def test_encrypt_symmetric_invalid_algorithm(self):
"""
Test that the right errors are raised when invalid encryption
algorithms are used.
Test that the right errors are raised when invalid symmetric
encryption algorithms are used.
"""
engine = crypto.CryptographyEngine()
@@ -285,10 +288,10 @@ class TestCryptographyEngine(testtools.TestCase):
*args
)
def test_encrypt_invalid_algorithm_key(self):
def test_encrypt_symmetric_invalid_algorithm_key(self):
"""
Test that the right error is raised when an invalid key is used with
an encryption algorithm.
a symmetric encryption algorithm.
"""
engine = crypto.CryptographyEngine()
@@ -300,10 +303,10 @@ class TestCryptographyEngine(testtools.TestCase):
*args
)
def test_encrypt_no_mode_needed(self):
def test_encrypt_symmetric_no_mode_needed(self):
"""
Test that data can be encrypted for certain inputs without a cipher
mode.
Test that data can be symmetrically encrypted for certain inputs
without a cipher mode.
"""
engine = crypto.CryptographyEngine()
@@ -313,10 +316,10 @@ class TestCryptographyEngine(testtools.TestCase):
b'\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08'
)
def test_encrypt_invalid_cipher_mode(self):
def test_encrypt_symmetric_invalid_cipher_mode(self):
"""
Test that the right errors are raised when invalid cipher modes are
used.
used with symmetric encryption.
"""
engine = crypto.CryptographyEngine()
@@ -343,10 +346,10 @@ class TestCryptographyEngine(testtools.TestCase):
**kwargs
)
def test_encrypt_generate_iv(self):
def test_encrypt_symmetric_generate_iv(self):
"""
Test that the initialization vector is correctly generated and
returned for an appropriate set of encryption inputs.
returned for an appropriate set of symmetric encryption inputs.
"""
engine = crypto.CryptographyEngine()
@@ -379,9 +382,189 @@ class TestCryptographyEngine(testtools.TestCase):
self.assertNotIn('iv_nonce', result.keys())
def test_decrypt_invalid_algorithm(self):
def test_encrypt_asymmetric_invalid_encryption_algorithm(self):
"""
Test that the right errors are raised when invalid decryption
Test that the right error is raised when an invalid asymmetric
encryption algorithm is specified.
"""
engine = crypto.CryptographyEngine()
args = ('invalid', b'', b'', None, None)
self.assertRaisesRegexp(
exceptions.InvalidField,
"The cryptographic algorithm 'invalid' is not supported for "
"asymmetric encryption.",
engine._encrypt_asymmetric,
*args
)
def test_encrypt_asymmetric_invalid_hashing_algorithm(self):
"""
Test that the right error is raised when an invalid hashing algorithm
is specified.
"""
engine = crypto.CryptographyEngine()
args = (
enums.CryptographicAlgorithm.RSA,
b'',
b''
)
kwargs = {
'padding_method': enums.PaddingMethod.OAEP,
'hashing_algorithm': 'invalid'
}
self.assertRaisesRegexp(
exceptions.InvalidField,
"The hashing algorithm 'invalid' is not supported for asymmetric "
"encryption.",
engine.encrypt,
*args,
**kwargs
)
def test_encrypt_asymmetric_invalid_padding_method(self):
"""
Test that the right error is raised when an invalid padding method
is specified.
"""
engine = crypto.CryptographyEngine()
args = (
enums.CryptographicAlgorithm.RSA,
b'',
b''
)
kwargs = {
'padding_method': 'invalid',
'hashing_algorithm': enums.HashingAlgorithm.SHA_1
}
self.assertRaisesRegexp(
exceptions.InvalidField,
"The padding method 'invalid' is not supported for asymmetric "
"encryption.",
engine.encrypt,
*args,
**kwargs
)
def test_encrypt_asymmetric_invalid_public_key(self):
"""
Test that the right error is raised when an invalid public key is
specified.
"""
engine = crypto.CryptographyEngine()
args = (
enums.CryptographicAlgorithm.RSA,
'invalid',
b''
)
kwargs = {
'padding_method': enums.PaddingMethod.OAEP,
'hashing_algorithm': enums.HashingAlgorithm.SHA_1
}
self.assertRaisesRegexp(
exceptions.CryptographicFailure,
"The public key bytes could not be loaded.",
engine.encrypt,
*args,
**kwargs
)
def test_decrypt_asymmetric_invalid_encryption_algorithm(self):
"""
Test that the right error is raised when an invalid asymmetric
decryption algorithm is specified.
"""
engine = crypto.CryptographyEngine()
args = ('invalid', b'', b'', None, None)
self.assertRaisesRegexp(
exceptions.InvalidField,
"The cryptographic algorithm 'invalid' is not supported for "
"asymmetric decryption.",
engine._decrypt_asymmetric,
*args
)
def test_decrypt_asymmetric_invalid_hashing_algorithm(self):
"""
Test that the right error is raised when an invalid hashing algorithm
is specified.
"""
engine = crypto.CryptographyEngine()
args = (
enums.CryptographicAlgorithm.RSA,
b'',
b''
)
kwargs = {
'padding_method': enums.PaddingMethod.OAEP,
'hashing_algorithm': 'invalid'
}
self.assertRaisesRegexp(
exceptions.InvalidField,
"The hashing algorithm 'invalid' is not supported for asymmetric "
"decryption.",
engine.decrypt,
*args,
**kwargs
)
def test_decrypt_asymmetric_invalid_padding_method(self):
"""
Test that the right error is raised when an invalid padding method
is specified.
"""
engine = crypto.CryptographyEngine()
args = (
enums.CryptographicAlgorithm.RSA,
b'',
b''
)
kwargs = {
'padding_method': 'invalid',
'hashing_algorithm': enums.HashingAlgorithm.SHA_1
}
self.assertRaisesRegexp(
exceptions.InvalidField,
"The padding method 'invalid' is not supported for asymmetric "
"decryption.",
engine.decrypt,
*args,
**kwargs
)
def test_decrypt_asymmetric_invalid_private_key(self):
"""
Test that the right error is raised when an invalid private key is
specified.
"""
engine = crypto.CryptographyEngine()
args = (
enums.CryptographicAlgorithm.RSA,
'invalid',
b''
)
kwargs = {
'padding_method': enums.PaddingMethod.OAEP,
'hashing_algorithm': enums.HashingAlgorithm.SHA_1
}
self.assertRaisesRegexp(
exceptions.CryptographicFailure,
"The private key bytes could not be loaded.",
engine.decrypt,
*args,
**kwargs
)
def test_decrypt_symmetric_invalid_algorithm(self):
"""
Test that the right errors are raised when invalid symmetric decryption
algorithms are used.
"""
engine = crypto.CryptographyEngine()
@@ -403,10 +586,10 @@ class TestCryptographyEngine(testtools.TestCase):
*args
)
def test_decrypt_invalid_algorithm_key(self):
def test_decrypt_symmetric_invalid_algorithm_key(self):
"""
Test that the right error is raised when an invalid key is used with
a decryption algorithm.
a symmetric decryption algorithm.
"""
engine = crypto.CryptographyEngine()
@@ -418,10 +601,10 @@ class TestCryptographyEngine(testtools.TestCase):
*args
)
def test_decrypt_invalid_cipher_mode(self):
def test_decrypt_symmetric_invalid_cipher_mode(self):
"""
Test that the right errors are raised when invalid cipher modes are
used.
used with symmetric decryption.
"""
engine = crypto.CryptographyEngine()
@@ -448,10 +631,10 @@ class TestCryptographyEngine(testtools.TestCase):
**kwargs
)
def test_decrypt_missing_iv_nonce(self):
def test_decrypt_symmetric_missing_iv_nonce(self):
"""
Test that the right error is raised when an IV/nonce is not provided
for the decryption algorithm.
for the symmetric decryption algorithm.
"""
engine = crypto.CryptographyEngine()
@@ -884,72 +1067,262 @@ class TestCryptographyEngine(testtools.TestCase):
'iv_nonce': None}
]
)
def encrypt_parameters(request):
def symmetric_parameters(request):
return request.param
def test_encrypt(encrypt_parameters):
def test_encrypt_symmetric(symmetric_parameters):
"""
Test that various encryption algorithms and block cipher modes can be
used to correctly encrypt data.
used to correctly symmetrically encrypt data.
"""
engine = crypto.CryptographyEngine()
engine._handle_symmetric_padding = mock.MagicMock(
return_value=encrypt_parameters.get('plain_text')
return_value=symmetric_parameters.get('plain_text')
)
result = engine.encrypt(
encrypt_parameters.get('algorithm'),
encrypt_parameters.get('key'),
encrypt_parameters.get('plain_text'),
cipher_mode=encrypt_parameters.get('cipher_mode'),
iv_nonce=encrypt_parameters.get('iv_nonce')
symmetric_parameters.get('algorithm'),
symmetric_parameters.get('key'),
symmetric_parameters.get('plain_text'),
cipher_mode=symmetric_parameters.get('cipher_mode'),
iv_nonce=symmetric_parameters.get('iv_nonce')
)
if engine._handle_symmetric_padding.called:
engine._handle_symmetric_padding.assert_called_once_with(
engine._symmetric_key_algorithms.get(
encrypt_parameters.get('algorithm')
symmetric_parameters.get('algorithm')
),
encrypt_parameters.get('plain_text'),
symmetric_parameters.get('plain_text'),
None
)
assert encrypt_parameters.get('cipher_text') == result.get('cipher_text')
assert symmetric_parameters.get('cipher_text') == result.get('cipher_text')
def test_decrypt(encrypt_parameters):
def test_decrypt_symmetric(symmetric_parameters):
"""
Test that various decryption algorithms and block cipher modes can be
used to correctly decrypt data.
used to correctly symmetrically decrypt data.
"""
engine = crypto.CryptographyEngine()
engine._handle_symmetric_padding = mock.MagicMock(
return_value=encrypt_parameters.get('plain_text')
return_value=symmetric_parameters.get('plain_text')
)
result = engine.decrypt(
encrypt_parameters.get('algorithm'),
encrypt_parameters.get('key'),
encrypt_parameters.get('cipher_text'),
cipher_mode=encrypt_parameters.get('cipher_mode'),
iv_nonce=encrypt_parameters.get('iv_nonce')
symmetric_parameters.get('algorithm'),
symmetric_parameters.get('key'),
symmetric_parameters.get('cipher_text'),
cipher_mode=symmetric_parameters.get('cipher_mode'),
iv_nonce=symmetric_parameters.get('iv_nonce')
)
if engine._handle_symmetric_padding.called:
engine._handle_symmetric_padding.assert_called_once_with(
engine._symmetric_key_algorithms.get(
encrypt_parameters.get('algorithm')
symmetric_parameters.get('algorithm')
),
encrypt_parameters.get('plain_text'),
symmetric_parameters.get('plain_text'),
None,
undo_padding=True
)
assert encrypt_parameters.get('plain_text') == result
assert symmetric_parameters.get('plain_text') == result
# Most of these test vectors were obtained from the pyca/cryptography test
# suite:
#
# cryptography_vectors/asymmetric/RSA/pkcs-1v2-1d2-vec/oaep-vect.txt
# cryptography_vectors/asymmetric/RSA/pkcs1v15crypt-vectors.txt
@pytest.fixture(
scope='function',
params=[
{'algorithm': enums.CryptographicAlgorithm.RSA,
'padding_method': enums.PaddingMethod.OAEP,
'hashing_algorithm': enums.HashingAlgorithm.SHA_1,
'encoding': serialization.Encoding.DER,
'public_key': {
'n': int(
'a8b3b284af8eb50b387034a860f146c4919f318763cd6c5598c8ae4811a'
'1e0abc4c7e0b082d693a5e7fced675cf4668512772c0cbc64a742c6c630'
'f533c8cc72f62ae833c40bf25842e984bb78bdbf97c0107d55bdb662f5c'
'4e0fab9845cb5148ef7392dd3aaff93ae1e6b667bb3d4247616d4f5ba10'
'd4cfd226de88d39f16fb',
16
),
'e': int('010001', 16)
},
'private_key': {
'd': int(
'53339cfdb79fc8466a655c7316aca85c55fd8f6dd898fdaf119517ef4f5'
'2e8fd8e258df93fee180fa0e4ab29693cd83b152a553d4ac4d1812b8b9f'
'a5af0e7f55fe7304df41570926f3311f15c4d65a732c483116ee3d3d2d0'
'af3549ad9bf7cbfb78ad884f84d5beb04724dc7369b31def37d0cf539e9'
'cfcdd3de653729ead5d1',
16
),
'p': int(
'd32737e7267ffe1341b2d5c0d150a81b586fb3132bed2f8d5262864a9cb'
'9f30af38be448598d413a172efb802c21acf1c11c520c2f26a471dcad21'
'2eac7ca39d',
16
),
'q': int(
'cc8853d1d54da630fac004f471f281c7b8982d8224a490edbeb33d3e3d5'
'cc93c4765703d1dd791642f1f116a0dd852be2419b2af72bfe9a030e860'
'b0288b5d77',
16
),
'dmp1': int(
'0e12bf1718e9cef5599ba1c3882fe8046a90874eefce8f2ccc20e4f2741'
'fb0a33a3848aec9c9305fbecbd2d76819967d4671acc6431e4037968db3'
'7878e695c1',
16
),
'dmq1': int(
'95297b0f95a2fa67d00707d609dfd4fc05c89dafc2ef6d6ea55bec771ea'
'333734d9251e79082ecda866efef13c459e1a631386b7e354c899f5f112'
'ca85d71583',
16
),
'iqmp': int(
'4f456c502493bdc0ed2ab756a3a6ed4d67352a697d4216e93212b127a63'
'd5411ce6fa98d5dbefd73263e3728142743818166ed7dd63687dd2a8ca1'
'd2f4fbd8e1',
16
)
},
'plain_text': (
b'\x66\x28\x19\x4e\x12\x07\x3d\xb0'
b'\x3b\xa9\x4c\xda\x9e\xf9\x53\x23'
b'\x97\xd5\x0d\xba\x79\xb9\x87\x00'
b'\x4a\xfe\xfe\x34'
)},
{'algorithm': enums.CryptographicAlgorithm.RSA,
'padding_method': enums.PaddingMethod.PKCS1v15,
'encoding': serialization.Encoding.PEM,
'public_key': {
'n': int(
'98b70582ca808fd1d3509562a0ef305af6d9875443b35bdf24d536353e3'
'f1228dcd12a78568356c6ff323abf72ac1cdbfe712fb49fe594a5a2175d'
'48b6732538d8df37cb970be4a5b562c3f298db9ddf75607877918cced1d'
'0d1f377338c0d3d3207797e862c65d11439e588177527a7ded91971adcf'
'91e2e834e37f05a73655',
16
),
'e': int('010001', 16)
},
'private_key': {
'd': int(
'0614a786052d284cd906a8e413f7622c050f3549c026589ea27750e0bed'
'9410e5a7883a1e603f5c517ad36d49faac5bd66bcb8030fa8d309e351dd'
'd782d843df975680ae73eea9aab289b757205dadb8fdfb989ec8db8e709'
'5f51f24529f5637aa669331e2569f8b854abecec99aa264c3da7cc6866f'
'0c0e1fb8469848581c73',
16
),
'p': int(
'cb61a88c8c305ad9a8fbec2ba4c86cccc2028024aa1690c29bc8264d2fe'
'be87e4f86e912ef0f5c1853d71cbc9b14baed3c37cef6c7a3598b6fbe06'
'4810905b57',
16
),
'q': int(
'c0399f0b9380faba38ff80d2fff6ede79cfdabf658972077a5e2b295693'
'ea51072268b91746eea9be04ad66100ebed733db4cd0147a18d6de8c0cd'
'8fbf249c33',
16
),
'dmp1': int(
'944c3a6579574cf7873362ab14359cb7d50393c2a84f59f0bd3cbd48ed1'
'77c6895be8eb6e29ff58c3b9e0ff32ab57bf3be440762848184aa9aa919'
'd574567e73',
16
),
'dmq1': int(
'45ebefd58727308cd2b4e6085a8158d29a418feec114e00385bceb96fbb'
'c84d071a561b95c30087900e2580edb05f6cea7907fcdca5f92917b4bbe'
'ba5e1e140f',
16
),
'iqmp': int(
'c52468c8fd15e5da2f6c8eba4e97baebe995b67a1a7ad719dd9fff366b1'
'84d5ab455075909292044ecb345cf2cdd26228e21f85183255f4a9e69f4'
'c7152ebb0f',
16
)
},
'plain_text': (
b'\xe9\xa7\x71\xe0\xa6\x5f\x28\x70'
b'\x8e\x83\xd5\xe6\xcc\x89\x8a\x41'
b'\xd7'
)}
]
)
def asymmetric_parameters(request):
return request.param
def test_encrypt_decrypt_asymmetric(asymmetric_parameters):
"""
Test that various encryption/decryption algorithms can be used to
correctly asymmetrically encrypt data.
"""
# NOTE (peter-hamilton) Randomness included in RSA padding schemes
# makes it impossible to unit test just encryption; it's not possible
# to predict the cipher text. Instead, we test the encrypt/decrypt
# cycle to ensure that they correctly mirror each other.
backend = backends.default_backend()
public_key_numbers = rsa.RSAPublicNumbers(
asymmetric_parameters.get('public_key').get('e'),
asymmetric_parameters.get('public_key').get('n')
)
public_key = public_key_numbers.public_key(backend)
public_bytes = public_key.public_bytes(
asymmetric_parameters.get('encoding'),
serialization.PublicFormat.PKCS1
)
private_key_numbers = rsa.RSAPrivateNumbers(
p=asymmetric_parameters.get('private_key').get('p'),
q=asymmetric_parameters.get('private_key').get('q'),
d=asymmetric_parameters.get('private_key').get('d'),
dmp1=asymmetric_parameters.get('private_key').get('dmp1'),
dmq1=asymmetric_parameters.get('private_key').get('dmq1'),
iqmp=asymmetric_parameters.get('private_key').get('iqmp'),
public_numbers=public_key_numbers
)
private_key = private_key_numbers.private_key(backend)
private_bytes = private_key.private_bytes(
asymmetric_parameters.get('encoding'),
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption()
)
engine = crypto.CryptographyEngine()
result = engine.encrypt(
asymmetric_parameters.get('algorithm'),
public_bytes,
asymmetric_parameters.get('plain_text'),
padding_method=asymmetric_parameters.get('padding_method'),
hashing_algorithm=asymmetric_parameters.get('hashing_algorithm')
)
result = engine.decrypt(
asymmetric_parameters.get('algorithm'),
private_bytes,
result.get('cipher_text'),
padding_method=asymmetric_parameters.get('padding_method'),
hashing_algorithm=asymmetric_parameters.get('hashing_algorithm')
)
assert asymmetric_parameters.get('plain_text') == result
@pytest.fixture(