From 01eb144243be77e60049ac761315a4421af18b1e Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Thu, 3 Oct 2019 15:34:38 -0400 Subject: [PATCH] Add ApplicationSpecificInformation support to the server This change adds ApplicationSpecificInformation attribute support to the server, allowing for the storage and retrieval of the new attribute in addition to object filtering based on its value. New unit tests have been added to cover the new changes. --- kmip/core/attributes.py | 4 +- kmip/core/factories/attribute_values.py | 27 +--- kmip/services/server/engine.py | 40 ++++- .../test_application_specific_information.py | 2 +- .../core/factories/test_attribute_values.py | 50 ++++++- .../test_application_specific_information.py | 6 +- .../tests/unit/services/server/test_engine.py | 138 +++++++++++++++++- 7 files changed, 238 insertions(+), 29 deletions(-) diff --git a/kmip/core/attributes.py b/kmip/core/attributes.py index 507c139..9bfe313 100644 --- a/kmip/core/attributes.py +++ b/kmip/core/attributes.py @@ -1248,10 +1248,10 @@ class ApplicationSpecificInformation(primitives.Struct): def __str__(self): value = ", ".join( [ - '"application_namespace": {}'.format( + '"application_namespace": "{}"'.format( self.application_namespace ), - '"application_data": {}'.format(self.application_data) + '"application_data": "{}"'.format(self.application_data) ] ) return "{" + value + "}" diff --git a/kmip/core/factories/attribute_values.py b/kmip/core/factories/attribute_values.py index e394e48..cddab89 100644 --- a/kmip/core/factories/attribute_values.py +++ b/kmip/core/factories/attribute_values.py @@ -277,28 +277,13 @@ class AttributeValueFactory(object): return attributes.ObjectGroup(group) def _create_application_specific_information(self, info): - if info is None: - return attributes.ApplicationSpecificInformation() + if info: + return attributes.ApplicationSpecificInformation( + application_namespace=info.get("application_namespace"), + application_data=info.get("application_data") + ) else: - application_namespace = info.get('application_namespace') - application_data = info.get('application_data') - - if not isinstance(application_namespace, str): - msg = utils.build_er_error( - attributes.ApplicationSpecificInformation, - 'constructor argument type', - str, type(application_namespace)) - raise TypeError(msg) - - if not isinstance(application_data, str): - msg = utils.build_er_error( - attributes.ApplicationSpecificInformation, - 'constructor argument type', - str, type(application_data)) - raise TypeError(msg) - - return attributes.ApplicationSpecificInformation.create( - application_namespace, application_data) + return attributes.ApplicationSpecificInformation() def _create_contact_information(self, info): if info is None: diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index d410fc5..cc9e501 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -730,8 +730,16 @@ class KmipEngine(object): return None elif attr_name == 'Link': return None - elif attr_name == 'Application Specific Information': - return None + elif attr_name == "Application Specific Information": + values = [] + for info in managed_object.app_specific_info: + values.append( + { + "application_namespace": info.application_namespace, + "application_data": info.application_data + } + ) + return values elif attr_name == 'Contact Information': return None elif attr_name == 'Last Change Date': @@ -781,6 +789,14 @@ class KmipEngine(object): raise exceptions.InvalidField( "Cannot set duplicate name values." ) + elif attribute_name == "Application Specific Information": + for value in attribute_value: + managed_object.app_specific_info.append( + objects.ApplicationSpecificInformation( + application_namespace=value.application_namespace, + application_data=value.application_data + ) + ) else: # TODO (peterhamilton) Remove when all attributes are supported raise exceptions.InvalidField( @@ -1662,6 +1678,26 @@ class KmipEngine(object): ) if attribute is None: continue + elif name == "Application Specific Information": + application_namespace = value.application_namespace + application_data = value.application_data + v = { + "application_namespace": application_namespace, + "application_data": application_data + } + if v not in attribute: + self._logger.debug( + "Failed match: " + "the specified application specific " + "information ('{}', '{}') does not match any " + "of the object's associated application " + "specific information attributes.".format( + v.get("application_namespace"), + v.get("application_data") + ) + ) + add_object = False + break elif name == "Name": if value not in attribute: self._logger.debug( diff --git a/kmip/tests/unit/core/attributes/test_application_specific_information.py b/kmip/tests/unit/core/attributes/test_application_specific_information.py index 7ff88d2..f6e6794 100644 --- a/kmip/tests/unit/core/attributes/test_application_specific_information.py +++ b/kmip/tests/unit/core/attributes/test_application_specific_information.py @@ -278,7 +278,7 @@ class TestApplicationSpecificInformation(testtools.TestCase): ("application_data", "www.example.com") ] value = "{}".format( - ", ".join(['"{}": {}'.format(arg[0], arg[1]) for arg in args]) + ", ".join(['"{}": "{}"'.format(arg[0], arg[1]) for arg in args]) ) self.assertEqual( "{" + value + "}", diff --git a/kmip/tests/unit/core/factories/test_attribute_values.py b/kmip/tests/unit/core/factories/test_attribute_values.py index 4ef6618..24cc228 100644 --- a/kmip/tests/unit/core/factories/test_attribute_values.py +++ b/kmip/tests/unit/core/factories/test_attribute_values.py @@ -418,7 +418,55 @@ class TestAttributeValueFactory(testtools.TestCase): """ Test that an ApplicationSpecificInformation attribute can be created. """ - self.skipTest('') + attribute = self.factory.create_attribute_value( + enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION, + { + "application_namespace": "ssl", + "application_data": "www.example.com" + } + ) + self.assertIsInstance( + attribute, + attributes.ApplicationSpecificInformation + ) + self.assertEqual("ssl", attribute.application_namespace) + self.assertEqual("www.example.com", attribute.application_data) + + attribute = self.factory.create_attribute_value( + enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION, + None + ) + self.assertIsInstance( + attribute, + attributes.ApplicationSpecificInformation + ) + self.assertIsNone(attribute.application_namespace) + self.assertIsNone(attribute.application_data) + + attribute = self.factory.create_attribute_value_by_enum( + enums.Tags.APPLICATION_SPECIFIC_INFORMATION, + { + "application_namespace": "ssl", + "application_data": "www.example.com" + } + ) + self.assertIsInstance( + attribute, + attributes.ApplicationSpecificInformation + ) + self.assertEqual("ssl", attribute.application_namespace) + self.assertEqual("www.example.com", attribute.application_data) + + attribute = self.factory.create_attribute_value_by_enum( + enums.Tags.APPLICATION_SPECIFIC_INFORMATION, + None + ) + self.assertIsInstance( + attribute, + attributes.ApplicationSpecificInformation + ) + self.assertIsNone(attribute.application_namespace) + self.assertIsNone(attribute.application_data) def test_create_contact_information(self): """ diff --git a/kmip/tests/unit/pie/objects/test_application_specific_information.py b/kmip/tests/unit/pie/objects/test_application_specific_information.py index ea7fb97..749b7a3 100644 --- a/kmip/tests/unit/pie/objects/test_application_specific_information.py +++ b/kmip/tests/unit/pie/objects/test_application_specific_information.py @@ -247,11 +247,15 @@ class TestApplicationSpecificInformation(testtools.TestCase): session.add(app_specific_info) session.commit() + # Grab the ID now before making a new session to avoid a Detached error + # See http://sqlalche.me/e/bhk3 for more info. + app_specific_info_id = app_specific_info.id + session = sqlalchemy.orm.sessionmaker(bind=engine)() retrieved_info = session.query( objects.ApplicationSpecificInformation ).filter( - objects.ApplicationSpecificInformation.id == app_specific_info.id + objects.ApplicationSpecificInformation.id == app_specific_info_id ).one() session.commit() diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 50f1e06..371272a 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -1602,7 +1602,7 @@ class TestKmipEngine(testtools.TestCase): symmetric_key, 'Application Specific Information' ) - self.assertEqual(None, result) + self.assertEqual([], result) result = e._get_attribute_from_managed_object( symmetric_key, @@ -2423,6 +2423,13 @@ class TestKmipEngine(testtools.TestCase): attribute_factory.create_attribute( enums.AttributeType.OPERATION_POLICY_NAME, 'test' + ), + attribute_factory.create_attribute( + enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION, + { + "application_namespace": "ssl", + "application_data": "www.example.com" + } ) ] ) @@ -2473,6 +2480,15 @@ class TestKmipEngine(testtools.TestCase): self.assertEqual('test', symmetric_key.operation_policy_name) self.assertIsNotNone(symmetric_key.initial_date) self.assertNotEqual(0, symmetric_key.initial_date) + self.assertEqual(1, len(symmetric_key.app_specific_info)) + self.assertEqual( + "ssl", + symmetric_key.app_specific_info[0].application_namespace + ) + self.assertEqual( + "www.example.com", + symmetric_key.app_specific_info[0].application_data + ) self.assertEqual(uid, e._id_placeholder) @@ -5599,6 +5615,126 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual(0, len(response_payload.unique_identifiers)) + def test_locate_with_application_specific_information(self): + """ + Test the Locate operation when the 'Application Specific Information' + attribute is given. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + key = ( + b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + ) + + app_specific_info_a = pie_objects.ApplicationSpecificInformation( + application_namespace="ssl", + application_data="www.example.com" + ) + app_specific_info_b = pie_objects.ApplicationSpecificInformation( + application_namespace="ssl", + application_data="www.test.com" + ) + + obj_a = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 128, + key, + name='name1' + ) + obj_a.app_specific_info.append(app_specific_info_a) + obj_a.app_specific_info.append(app_specific_info_b) + obj_b = pie_objects.SecretData( + key, + enums.SecretDataType.PASSWORD + ) + obj_b.app_specific_info.append(app_specific_info_a) + obj_c = pie_objects.SecretData( + key, + enums.SecretDataType.PASSWORD + ) + + e._data_session.add(obj_a) + e._data_session.add(obj_b) + e._data_session.add(obj_c) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + id_b = str(obj_b.unique_identifier) + + attribute_factory = factory.AttributeFactory() + + # Locate the symmetric key objects based on their shared application + # specific information attribute. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION, + { + "application_namespace": "ssl", + "application_data": "www.example.com" + } + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_a) + ) + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_b) + ) + e._logger.debug.assert_any_call( + "Failed match: " + "the specified application specific " + "information ('ssl', 'www.example.com') does not match any " + "of the object's associated application " + "specific information attributes." + ) + self.assertEqual(2, len(response_payload.unique_identifiers)) + self.assertIn(id_a, response_payload.unique_identifiers) + self.assertIn(id_b, response_payload.unique_identifiers) + + # Locate a single symmetric key object based on its unique application + # specific information attribute. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION, + { + "application_namespace": "ssl", + "application_data": "www.test.com" + } + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_a) + ) + e._logger.debug.assert_any_call( + "Failed match: " + "the specified application specific " + "information ('ssl', 'www.test.com') does not match any " + "of the object's associated application " + "specific information attributes." + ) + self.assertEqual(1, len(response_payload.unique_identifiers)) + self.assertIn(id_a, response_payload.unique_identifiers) + def test_get(self): """ Test that a Get request can be processed correctly.