mirror of
https://github.com/openkmip/pykmip
synced 2025-12-10 05:13:15 +00:00
Adding KmipEngine support for Create
This change adds support for the Create operation to the KmipEngine. New exceptions and test cases are included.
This commit is contained in:
@@ -35,6 +35,7 @@ from kmip.core.factories import attributes as factory
|
||||
from kmip.core.messages import contents
|
||||
from kmip.core.messages import messages
|
||||
|
||||
from kmip.core.messages.payloads import create
|
||||
from kmip.core.messages.payloads import destroy
|
||||
from kmip.core.messages.payloads import discover_versions
|
||||
from kmip.core.messages.payloads import get
|
||||
@@ -643,18 +644,21 @@ class TestKmipEngine(testtools.TestCase):
|
||||
e = engine.KmipEngine()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
e._process_create = mock.MagicMock()
|
||||
e._process_register = mock.MagicMock()
|
||||
e._process_get = mock.MagicMock()
|
||||
e._process_destroy = mock.MagicMock()
|
||||
e._process_query = mock.MagicMock()
|
||||
e._process_discover_versions = mock.MagicMock()
|
||||
|
||||
e._process_operation(enums.Operation.CREATE, None)
|
||||
e._process_operation(enums.Operation.REGISTER, None)
|
||||
e._process_operation(enums.Operation.GET, None)
|
||||
e._process_operation(enums.Operation.DESTROY, None)
|
||||
e._process_operation(enums.Operation.QUERY, None)
|
||||
e._process_operation(enums.Operation.DISCOVER_VERSIONS, None)
|
||||
|
||||
e._process_create.assert_called_with(None)
|
||||
e._process_register.assert_called_with(None)
|
||||
e._process_get.assert_called_with(None)
|
||||
e._process_destroy.assert_called_with(None)
|
||||
@@ -1425,6 +1429,267 @@ class TestKmipEngine(testtools.TestCase):
|
||||
*args
|
||||
)
|
||||
|
||||
def test_create(self):
|
||||
"""
|
||||
Test that a Create request can be processed correctly.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
attribute_factory = factory.AttributeFactory()
|
||||
|
||||
# Build Create request
|
||||
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
|
||||
template_attribute = objects.TemplateAttribute(
|
||||
attributes=[
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Symmetric Key',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||
enums.CryptographicAlgorithm.AES
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
|
||||
256
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||
[
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
]
|
||||
)
|
||||
]
|
||||
)
|
||||
payload = create.CreateRequestPayload(
|
||||
object_type,
|
||||
template_attribute
|
||||
)
|
||||
|
||||
response_payload = e._process_create(payload)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
e._logger.info.assert_called_once_with(
|
||||
"Processing operation: Create"
|
||||
)
|
||||
|
||||
uid = response_payload.unique_identifier.value
|
||||
self.assertEqual('1', uid)
|
||||
|
||||
# Retrieve the stored object and verify all attributes were set
|
||||
# appropriately.
|
||||
symmetric_key = e._data_session.query(
|
||||
pie_objects.SymmetricKey
|
||||
).filter(
|
||||
pie_objects.ManagedObject.unique_identifier == uid
|
||||
).one()
|
||||
self.assertEqual(
|
||||
enums.KeyFormatType.RAW,
|
||||
symmetric_key.key_format_type
|
||||
)
|
||||
self.assertEqual(1, len(symmetric_key.names))
|
||||
self.assertIn('Test Symmetric Key', symmetric_key.names)
|
||||
self.assertEqual(256, len(symmetric_key.value) * 8)
|
||||
self.assertEqual(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
symmetric_key.cryptographic_algorithm
|
||||
)
|
||||
self.assertEqual(256, symmetric_key.cryptographic_length)
|
||||
self.assertEqual(2, len(symmetric_key.cryptographic_usage_masks))
|
||||
self.assertIn(
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
symmetric_key.cryptographic_usage_masks
|
||||
)
|
||||
self.assertIn(
|
||||
enums.CryptographicUsageMask.DECRYPT,
|
||||
symmetric_key.cryptographic_usage_masks
|
||||
)
|
||||
|
||||
self.assertEqual(uid, e._id_placeholder)
|
||||
|
||||
def test_create_unsupported_object_type(self):
|
||||
"""
|
||||
Test that an InvalidField error is generated when attempting to
|
||||
create an unsupported object type.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
object_type = attributes.ObjectType(enums.ObjectType.PUBLIC_KEY)
|
||||
payload = create.CreateRequestPayload(
|
||||
object_type
|
||||
)
|
||||
|
||||
args = (payload, )
|
||||
regex = "Cannot create a PublicKey object with the Create operation."
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
regex,
|
||||
e._process_create,
|
||||
*args
|
||||
)
|
||||
|
||||
e._logger.info.assert_called_once_with(
|
||||
"Processing operation: Create"
|
||||
)
|
||||
|
||||
def test_create_omitting_attributes(self):
|
||||
"""
|
||||
Test that InvalidField errors are generated when trying to create
|
||||
a symmetric key without required attributes.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
attribute_factory = factory.AttributeFactory()
|
||||
|
||||
# Test the error for omitting the Cryptographic Algorithm
|
||||
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
|
||||
template_attribute = objects.TemplateAttribute(
|
||||
attributes=[
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Symmetric Key',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
|
||||
256
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||
[
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
]
|
||||
)
|
||||
]
|
||||
)
|
||||
payload = create.CreateRequestPayload(
|
||||
object_type,
|
||||
template_attribute
|
||||
)
|
||||
|
||||
args = (payload, )
|
||||
regex = (
|
||||
"The cryptographic algorithm must be specified as an attribute."
|
||||
)
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
regex,
|
||||
e._process_create,
|
||||
*args
|
||||
)
|
||||
|
||||
e._logger.info.assert_called_once_with(
|
||||
"Processing operation: Create"
|
||||
)
|
||||
e._logger.reset_mock()
|
||||
|
||||
# Test the error for omitting the Cryptographic Length
|
||||
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
|
||||
template_attribute = objects.TemplateAttribute(
|
||||
attributes=[
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Symmetric Key',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||
enums.CryptographicAlgorithm.AES
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||
[
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
]
|
||||
)
|
||||
]
|
||||
)
|
||||
payload = create.CreateRequestPayload(
|
||||
object_type,
|
||||
template_attribute
|
||||
)
|
||||
|
||||
args = (payload, )
|
||||
regex = (
|
||||
"The cryptographic length must be specified as an attribute."
|
||||
)
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
regex,
|
||||
e._process_create,
|
||||
*args
|
||||
)
|
||||
|
||||
e._logger.info.assert_called_once_with(
|
||||
"Processing operation: Create"
|
||||
)
|
||||
e._logger.reset_mock()
|
||||
|
||||
# Test the error for omitting the Cryptographic Usage Mask
|
||||
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
|
||||
template_attribute = objects.TemplateAttribute(
|
||||
attributes=[
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Symmetric Key',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||
enums.CryptographicAlgorithm.AES
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
|
||||
256
|
||||
)
|
||||
]
|
||||
)
|
||||
payload = create.CreateRequestPayload(
|
||||
object_type,
|
||||
template_attribute
|
||||
)
|
||||
|
||||
args = (payload, )
|
||||
regex = (
|
||||
"The cryptographic usage mask must be specified as an attribute."
|
||||
)
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
regex,
|
||||
e._process_create,
|
||||
*args
|
||||
)
|
||||
|
||||
e._logger.info.assert_called_once_with(
|
||||
"Processing operation: Create"
|
||||
)
|
||||
e._logger.reset_mock()
|
||||
|
||||
def test_register(self):
|
||||
"""
|
||||
Test that a Register request can be processed correctly.
|
||||
@@ -1912,23 +2177,27 @@ class TestKmipEngine(testtools.TestCase):
|
||||
e._logger.info.assert_called_once_with("Processing operation: Query")
|
||||
self.assertIsInstance(result, query.QueryResponsePayload)
|
||||
self.assertIsNotNone(result.operations)
|
||||
self.assertEqual(4, len(result.operations))
|
||||
self.assertEqual(5, len(result.operations))
|
||||
self.assertEqual(
|
||||
enums.Operation.REGISTER,
|
||||
enums.Operation.CREATE,
|
||||
result.operations[0].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.GET,
|
||||
enums.Operation.REGISTER,
|
||||
result.operations[1].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.DESTROY,
|
||||
enums.Operation.GET,
|
||||
result.operations[2].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.QUERY,
|
||||
enums.Operation.DESTROY,
|
||||
result.operations[3].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.QUERY,
|
||||
result.operations[4].value
|
||||
)
|
||||
self.assertEqual(list(), result.object_types)
|
||||
self.assertIsNotNone(result.vendor_identification)
|
||||
self.assertEqual(
|
||||
@@ -1947,7 +2216,7 @@ class TestKmipEngine(testtools.TestCase):
|
||||
|
||||
e._logger.info.assert_called_once_with("Processing operation: Query")
|
||||
self.assertIsNotNone(result.operations)
|
||||
self.assertEqual(5, len(result.operations))
|
||||
self.assertEqual(6, len(result.operations))
|
||||
self.assertEqual(
|
||||
enums.Operation.DISCOVER_VERSIONS,
|
||||
result.operations[-1].value
|
||||
@@ -2019,6 +2288,129 @@ class TestKmipEngine(testtools.TestCase):
|
||||
)
|
||||
self.assertEqual([], result.protocol_versions)
|
||||
|
||||
def test_create_get_destroy(self):
|
||||
"""
|
||||
Test that a managed object can be created, retrieved, and destroyed
|
||||
without error.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
attribute_factory = factory.AttributeFactory()
|
||||
|
||||
# Build a SymmetricKey for registration.
|
||||
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
|
||||
template_attribute = objects.TemplateAttribute(
|
||||
attributes=[
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Symmetric Key',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||
enums.CryptographicAlgorithm.AES
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
|
||||
256
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||
[
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
]
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
# Create the symmetric key with the corresponding attributes
|
||||
payload = create.CreateRequestPayload(
|
||||
object_type=object_type,
|
||||
template_attribute=template_attribute
|
||||
)
|
||||
|
||||
response_payload = e._process_create(payload)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
e._logger.info.assert_called_once_with(
|
||||
"Processing operation: Create"
|
||||
)
|
||||
|
||||
uid = response_payload.unique_identifier.value
|
||||
self.assertEqual('1', uid)
|
||||
|
||||
e._logger.reset_mock()
|
||||
|
||||
# Retrieve the created key using Get and verify all fields set
|
||||
payload = get.GetRequestPayload(
|
||||
unique_identifier=attributes.UniqueIdentifier(uid)
|
||||
)
|
||||
|
||||
response_payload = e._process_get(payload)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
e._logger.info.assert_called_once_with(
|
||||
"Processing operation: Get"
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
response_payload.object_type.value
|
||||
)
|
||||
self.assertEqual(str(uid), response_payload.unique_identifier.value)
|
||||
self.assertIsInstance(response_payload.secret, secrets.SymmetricKey)
|
||||
|
||||
key_block = response_payload.secret.key_block
|
||||
self.assertEqual(
|
||||
256,
|
||||
len(key_block.key_value.key_material.value) * 8
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.KeyFormatType.RAW,
|
||||
key_block.key_format_type.value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
key_block.cryptographic_algorithm.value
|
||||
)
|
||||
self.assertEqual(
|
||||
256,
|
||||
key_block.cryptographic_length.value
|
||||
)
|
||||
|
||||
e._logger.reset_mock()
|
||||
|
||||
# Destroy the symmetric key and verify it cannot be accessed again
|
||||
payload = destroy.DestroyRequestPayload(
|
||||
unique_identifier=attributes.UniqueIdentifier(uid)
|
||||
)
|
||||
|
||||
response_payload = e._process_destroy(payload)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
e._logger.info.assert_called_once_with(
|
||||
"Processing operation: Destroy"
|
||||
)
|
||||
self.assertEqual(str(uid), response_payload.unique_identifier.value)
|
||||
self.assertRaises(
|
||||
exc.NoResultFound,
|
||||
e._data_session.query(pie_objects.OpaqueObject).filter(
|
||||
pie_objects.ManagedObject.unique_identifier == uid
|
||||
).one
|
||||
)
|
||||
|
||||
e._data_session.commit()
|
||||
e._data_store_session_factory()
|
||||
|
||||
def test_register_get_destroy(self):
|
||||
"""
|
||||
Test that a managed object can be registered, retrieved, and destroyed
|
||||
|
||||
Reference in New Issue
Block a user