mirror of
https://github.com/openkmip/pykmip
synced 2025-12-10 21:33:15 +00:00
Add Encrypt operation support to the server
This change adds the Encrypt operation to the server. Support is currently limited to symmetric encryption only. The encryption key used with the operation must be in the Active state and it must have the Encrypt bit set in its cryptographic usage mask.
This commit is contained in:
@@ -45,6 +45,7 @@ from kmip.core.messages.payloads import create
|
||||
from kmip.core.messages.payloads import create_key_pair
|
||||
from kmip.core.messages.payloads import destroy
|
||||
from kmip.core.messages.payloads import discover_versions
|
||||
from kmip.core.messages.payloads import encrypt
|
||||
from kmip.core.messages.payloads import get
|
||||
from kmip.core.messages.payloads import get_attribute_list
|
||||
from kmip.core.messages.payloads import get_attributes
|
||||
@@ -893,6 +894,7 @@ class TestKmipEngine(testtools.TestCase):
|
||||
e._process_destroy = mock.MagicMock()
|
||||
e._process_query = mock.MagicMock()
|
||||
e._process_discover_versions = mock.MagicMock()
|
||||
e._process_encrypt = mock.MagicMock()
|
||||
|
||||
e._process_operation(enums.Operation.CREATE, None)
|
||||
e._process_operation(enums.Operation.CREATE_KEY_PAIR, None)
|
||||
@@ -904,6 +906,7 @@ class TestKmipEngine(testtools.TestCase):
|
||||
e._process_operation(enums.Operation.DESTROY, None)
|
||||
e._process_operation(enums.Operation.QUERY, None)
|
||||
e._process_operation(enums.Operation.DISCOVER_VERSIONS, None)
|
||||
e._process_operation(enums.Operation.ENCRYPT, None)
|
||||
|
||||
e._process_create.assert_called_with(None)
|
||||
e._process_create_key_pair.assert_called_with(None)
|
||||
@@ -915,6 +918,7 @@ class TestKmipEngine(testtools.TestCase):
|
||||
e._process_destroy.assert_called_with(None)
|
||||
e._process_query.assert_called_with(None)
|
||||
e._process_discover_versions.assert_called_with(None)
|
||||
e._process_encrypt.assert_called_with(None)
|
||||
|
||||
def test_unsupported_operation(self):
|
||||
"""
|
||||
@@ -5004,7 +5008,7 @@ class TestKmipEngine(testtools.TestCase):
|
||||
e._logger.info.assert_called_once_with("Processing operation: Query")
|
||||
self.assertIsInstance(result, query.QueryResponsePayload)
|
||||
self.assertIsNotNone(result.operations)
|
||||
self.assertEqual(12, len(result.operations))
|
||||
self.assertEqual(13, len(result.operations))
|
||||
self.assertEqual(
|
||||
enums.Operation.CREATE,
|
||||
result.operations[0].value
|
||||
@@ -5050,9 +5054,13 @@ class TestKmipEngine(testtools.TestCase):
|
||||
result.operations[10].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.MAC,
|
||||
enums.Operation.ENCRYPT,
|
||||
result.operations[11].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.MAC,
|
||||
result.operations[12].value
|
||||
)
|
||||
self.assertEqual(list(), result.object_types)
|
||||
self.assertIsNotNone(result.vendor_identification)
|
||||
self.assertEqual(
|
||||
@@ -5129,6 +5137,352 @@ class TestKmipEngine(testtools.TestCase):
|
||||
)
|
||||
self.assertEqual([], result.protocol_versions)
|
||||
|
||||
def test_encrypt(self):
|
||||
"""
|
||||
Test that an Encrypt request can be processed correctly.
|
||||
|
||||
The test vectors used here come from Eric Young's test set for
|
||||
Blowfish, via https://www.di-mgt.com.au/cryptopad.html.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
e._cryptography_engine.logger = mock.MagicMock()
|
||||
|
||||
encryption_key = pie_objects.SymmetricKey(
|
||||
enums.CryptographicAlgorithm.TRIPLE_DES,
|
||||
128,
|
||||
(
|
||||
b'\x01\x23\x45\x67\x89\xAB\xCD\xEF'
|
||||
b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87'
|
||||
),
|
||||
[enums.CryptographicUsageMask.ENCRYPT]
|
||||
)
|
||||
encryption_key.state = enums.State.ACTIVE
|
||||
|
||||
e._data_session.add(encryption_key)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
unique_identifier = str(encryption_key.unique_identifier)
|
||||
cryptographic_parameters = attributes.CryptographicParameters(
|
||||
block_cipher_mode=enums.BlockCipherMode.CBC,
|
||||
padding_method=enums.PaddingMethod.PKCS5,
|
||||
cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH
|
||||
)
|
||||
data = (
|
||||
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||
b'\x66\x6F\x72\x20\x00'
|
||||
)
|
||||
iv_counter_nonce = b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
|
||||
|
||||
payload = encrypt.EncryptRequestPayload(
|
||||
unique_identifier,
|
||||
cryptographic_parameters,
|
||||
data,
|
||||
iv_counter_nonce
|
||||
)
|
||||
|
||||
response_payload = e._process_encrypt(payload)
|
||||
|
||||
e._logger.info.assert_any_call("Processing operation: Encrypt")
|
||||
self.assertEqual(
|
||||
unique_identifier,
|
||||
response_payload.unique_identifier
|
||||
)
|
||||
self.assertEqual(
|
||||
(
|
||||
b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6'
|
||||
b'\x05\xB1\x56\xE2\x74\x03\x97\x93'
|
||||
b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9'
|
||||
b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B'
|
||||
),
|
||||
response_payload.data
|
||||
)
|
||||
self.assertIsNone(response_payload.iv_counter_nonce)
|
||||
|
||||
def test_encrypt_no_iv_counter_nonce(self):
|
||||
"""
|
||||
Test that an Encrypt request can be processed correctly when a
|
||||
specific IV/counter/nonce is not specified.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
e._cryptography_engine.logger = mock.MagicMock()
|
||||
|
||||
encryption_key = pie_objects.SymmetricKey(
|
||||
enums.CryptographicAlgorithm.TRIPLE_DES,
|
||||
128,
|
||||
(
|
||||
b'\x01\x23\x45\x67\x89\xAB\xCD\xEF'
|
||||
b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87'
|
||||
),
|
||||
[enums.CryptographicUsageMask.ENCRYPT]
|
||||
)
|
||||
encryption_key.state = enums.State.ACTIVE
|
||||
|
||||
e._data_session.add(encryption_key)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
unique_identifier = str(encryption_key.unique_identifier)
|
||||
cryptographic_parameters = attributes.CryptographicParameters(
|
||||
block_cipher_mode=enums.BlockCipherMode.CBC,
|
||||
padding_method=enums.PaddingMethod.PKCS5,
|
||||
cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH
|
||||
)
|
||||
data = (
|
||||
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||
b'\x66\x6F\x72\x20\x00'
|
||||
)
|
||||
iv_counter_nonce = None
|
||||
|
||||
payload = encrypt.EncryptRequestPayload(
|
||||
unique_identifier,
|
||||
cryptographic_parameters,
|
||||
data,
|
||||
iv_counter_nonce
|
||||
)
|
||||
|
||||
response_payload = e._process_encrypt(payload)
|
||||
|
||||
e._logger.info.assert_any_call("Processing operation: Encrypt")
|
||||
self.assertEqual(
|
||||
unique_identifier,
|
||||
response_payload.unique_identifier
|
||||
)
|
||||
self.assertIsNotNone(response_payload.data)
|
||||
self.assertIsNotNone(response_payload.iv_counter_nonce)
|
||||
|
||||
def test_encrypt_no_cryptographic_parameters(self):
|
||||
"""
|
||||
Test that the right error is thrown when cryptographic parameters
|
||||
are not provided with an Encrypt request.
|
||||
|
||||
Note: once the cryptographic parameters can be obtained from the
|
||||
encryption key's attributes, this test should be updated to
|
||||
reflect that.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
e._cryptography_engine.logger = mock.MagicMock()
|
||||
|
||||
encryption_key = pie_objects.SymmetricKey(
|
||||
enums.CryptographicAlgorithm.TRIPLE_DES,
|
||||
128,
|
||||
(
|
||||
b'\x01\x23\x45\x67\x89\xAB\xCD\xEF'
|
||||
b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87'
|
||||
),
|
||||
[enums.CryptographicUsageMask.ENCRYPT]
|
||||
)
|
||||
encryption_key.state = enums.State.ACTIVE
|
||||
|
||||
e._data_session.add(encryption_key)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
unique_identifier = str(encryption_key.unique_identifier)
|
||||
cryptographic_parameters = None
|
||||
data = (
|
||||
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||
b'\x66\x6F\x72\x20\x00'
|
||||
)
|
||||
iv_counter_nonce = b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
|
||||
|
||||
payload = encrypt.EncryptRequestPayload(
|
||||
unique_identifier,
|
||||
cryptographic_parameters,
|
||||
data,
|
||||
iv_counter_nonce
|
||||
)
|
||||
|
||||
args = (payload, )
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
"The cryptographic parameters must be specified.",
|
||||
e._process_encrypt,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_encrypt_invalid_encryption_key(self):
|
||||
"""
|
||||
Test that the right error is thrown when an invalid encryption key
|
||||
is specified with an Encrypt request.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
e._cryptography_engine.logger = mock.MagicMock()
|
||||
|
||||
encryption_key = pie_objects.OpaqueObject(
|
||||
b'\x01\x02\x03\x04',
|
||||
enums.OpaqueDataType.NONE
|
||||
)
|
||||
|
||||
e._data_session.add(encryption_key)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
unique_identifier = str(encryption_key.unique_identifier)
|
||||
cryptographic_parameters = attributes.CryptographicParameters(
|
||||
block_cipher_mode=enums.BlockCipherMode.CBC,
|
||||
padding_method=enums.PaddingMethod.PKCS5,
|
||||
cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH
|
||||
)
|
||||
data = (
|
||||
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||
b'\x66\x6F\x72\x20\x00'
|
||||
)
|
||||
iv_counter_nonce = b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
|
||||
|
||||
payload = encrypt.EncryptRequestPayload(
|
||||
unique_identifier,
|
||||
cryptographic_parameters,
|
||||
data,
|
||||
iv_counter_nonce
|
||||
)
|
||||
|
||||
args = (payload, )
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.PermissionDenied,
|
||||
"The requested encryption key is not a symmetric key. "
|
||||
"Only symmetric encryption is currently supported.",
|
||||
e._process_encrypt,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_encrypt_inactive_encryption_key(self):
|
||||
"""
|
||||
Test that the right error is thrown when an inactive encryption key
|
||||
is specified with an Encrypt request.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
e._cryptography_engine.logger = mock.MagicMock()
|
||||
|
||||
encryption_key = pie_objects.SymmetricKey(
|
||||
enums.CryptographicAlgorithm.TRIPLE_DES,
|
||||
128,
|
||||
(
|
||||
b'\x01\x23\x45\x67\x89\xAB\xCD\xEF'
|
||||
b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87'
|
||||
),
|
||||
[enums.CryptographicUsageMask.ENCRYPT]
|
||||
)
|
||||
|
||||
e._data_session.add(encryption_key)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
unique_identifier = str(encryption_key.unique_identifier)
|
||||
cryptographic_parameters = attributes.CryptographicParameters(
|
||||
block_cipher_mode=enums.BlockCipherMode.CBC,
|
||||
padding_method=enums.PaddingMethod.PKCS5,
|
||||
cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH
|
||||
)
|
||||
data = (
|
||||
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||
b'\x66\x6F\x72\x20\x00'
|
||||
)
|
||||
iv_counter_nonce = b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
|
||||
|
||||
payload = encrypt.EncryptRequestPayload(
|
||||
unique_identifier,
|
||||
cryptographic_parameters,
|
||||
data,
|
||||
iv_counter_nonce
|
||||
)
|
||||
|
||||
args = (payload,)
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.PermissionDenied,
|
||||
"The encryption key must be in the Active state to be used "
|
||||
"for encryption.",
|
||||
e._process_encrypt,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_encrypt_non_encryption_key(self):
|
||||
"""
|
||||
Test that the right error is thrown when a non-encryption key
|
||||
is specified with an Encrypt request.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
e._cryptography_engine.logger = mock.MagicMock()
|
||||
|
||||
encryption_key = pie_objects.SymmetricKey(
|
||||
enums.CryptographicAlgorithm.TRIPLE_DES,
|
||||
128,
|
||||
(
|
||||
b'\x01\x23\x45\x67\x89\xAB\xCD\xEF'
|
||||
b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87'
|
||||
),
|
||||
[enums.CryptographicUsageMask.DECRYPT]
|
||||
)
|
||||
encryption_key.state = enums.State.ACTIVE
|
||||
|
||||
e._data_session.add(encryption_key)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
unique_identifier = str(encryption_key.unique_identifier)
|
||||
cryptographic_parameters = attributes.CryptographicParameters(
|
||||
block_cipher_mode=enums.BlockCipherMode.CBC,
|
||||
padding_method=enums.PaddingMethod.PKCS5,
|
||||
cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH
|
||||
)
|
||||
data = (
|
||||
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||
b'\x66\x6F\x72\x20\x00'
|
||||
)
|
||||
iv_counter_nonce = b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
|
||||
|
||||
payload = encrypt.EncryptRequestPayload(
|
||||
unique_identifier,
|
||||
cryptographic_parameters,
|
||||
data,
|
||||
iv_counter_nonce
|
||||
)
|
||||
|
||||
args = (payload,)
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.PermissionDenied,
|
||||
"The Encrypt bit must be set in the encryption key's "
|
||||
"cryptographic usage mask.",
|
||||
e._process_encrypt,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_mac(self):
|
||||
"""
|
||||
Test that a MAC request can be processed correctly.
|
||||
|
||||
Reference in New Issue
Block a user