2
0
mirror of https://github.com/openkmip/pykmip synced 2025-12-27 21:53:39 +00:00

Add ObjectGroup support to the server

This change ObjectGroup attribute support to the server, allowing
for the storage and retrieval of the new attribute in addition to
object filtering based on its value. New unit tests have been
added to cover the new changes.
This commit is contained in:
Peter Hamilton
2019-10-10 17:34:28 -04:00
committed by Peter Hamilton
parent 1b81fff431
commit 009e8cecc9
3 changed files with 133 additions and 8 deletions

View File

@@ -1584,7 +1584,7 @@ class TestKmipEngine(testtools.TestCase):
symmetric_key,
'Object Group'
)
self.assertEqual(None, result)
self.assertEqual([], result)
result = e._get_attribute_from_managed_object(
symmetric_key,
@@ -1900,16 +1900,16 @@ class TestKmipEngine(testtools.TestCase):
)
# Test that an unsupported attribute cannot be set.
object_group = attribute_factory.create_attribute(
enums.AttributeType.OBJECT_GROUP,
'Test Group'
custom_attribute = attribute_factory.create_attribute(
enums.AttributeType.CUSTOM_ATTRIBUTE,
"Test Group"
)
args = (
managed_object,
('Object Group', object_group.attribute_value)
("Custom Attribute", custom_attribute.attribute_value)
)
regex = "The Object Group attribute is unsupported."
regex = "The Custom Attribute attribute is unsupported."
six.assertRaisesRegex(
self,
exceptions.InvalidField,
@@ -2430,6 +2430,10 @@ class TestKmipEngine(testtools.TestCase):
"application_namespace": "ssl",
"application_data": "www.example.com"
}
),
attribute_factory.create_attribute(
enums.AttributeType.OBJECT_GROUP,
"Group1"
)
]
)
@@ -2489,6 +2493,8 @@ class TestKmipEngine(testtools.TestCase):
"www.example.com",
symmetric_key.app_specific_info[0].application_data
)
self.assertEqual(1, len(symmetric_key.object_groups))
self.assertEqual("Group1", symmetric_key.object_groups[0].object_group)
self.assertEqual(uid, e._id_placeholder)
@@ -5735,6 +5741,110 @@ class TestKmipEngine(testtools.TestCase):
self.assertEqual(1, len(response_payload.unique_identifiers))
self.assertIn(id_a, response_payload.unique_identifiers)
def test_locate_with_object_group(self):
"""
Test the Locate operation when the 'Object Group'
attribute is given.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()
key = (
b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
)
object_group_a = pie_objects.ObjectGroup(object_group="Group1")
object_group_b = pie_objects.ObjectGroup(object_group="Group2")
obj_a = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
128,
key,
name='name1'
)
obj_a.object_groups.append(object_group_a)
obj_b = pie_objects.SecretData(
key,
enums.SecretDataType.PASSWORD
)
obj_b.object_groups.append(object_group_a)
obj_b.object_groups.append(object_group_b)
obj_c = pie_objects.SecretData(
key,
enums.SecretDataType.PASSWORD
)
e._data_session.add(obj_a)
e._data_session.add(obj_b)
e._data_session.add(obj_c)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
id_a = str(obj_a.unique_identifier)
id_b = str(obj_b.unique_identifier)
attribute_factory = factory.AttributeFactory()
# Locate the symmetric key objects based on their shared object group
# attribute.
attrs = [
attribute_factory.create_attribute(
enums.AttributeType.OBJECT_GROUP,
"Group1"
)
]
payload = payloads.LocateRequestPayload(attributes=attrs)
e._logger.reset_mock()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call("Processing operation: Locate")
e._logger.debug.assert_any_call(
"Locate filter matched object: {}".format(id_a)
)
e._logger.debug.assert_any_call(
"Locate filter matched object: {}".format(id_b)
)
e._logger.debug.assert_any_call(
"Failed match: "
"the specified object group ('Group1') does not match any "
"of the object's associated object group attributes."
)
self.assertEqual(2, len(response_payload.unique_identifiers))
self.assertIn(id_a, response_payload.unique_identifiers)
self.assertIn(id_b, response_payload.unique_identifiers)
# Locate a single symmetric key object based on its unique object group
# attribute.
attrs = [
attribute_factory.create_attribute(
enums.AttributeType.OBJECT_GROUP,
"Group2"
)
]
payload = payloads.LocateRequestPayload(attributes=attrs)
e._logger.reset_mock()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call("Processing operation: Locate")
e._logger.debug.assert_any_call(
"Locate filter matched object: {}".format(id_b)
)
e._logger.debug.assert_any_call(
"Failed match: "
"the specified object group ('Group2') does not match any "
"of the object's associated object group attributes."
)
self.assertEqual(1, len(response_payload.unique_identifiers))
self.assertIn(id_b, response_payload.unique_identifiers)
def test_get(self):
"""
Test that a Get request can be processed correctly.