diff --git a/kmip/core/attributes.py b/kmip/core/attributes.py index 507c139..9bfe313 100644 --- a/kmip/core/attributes.py +++ b/kmip/core/attributes.py @@ -1248,10 +1248,10 @@ class ApplicationSpecificInformation(primitives.Struct): def __str__(self): value = ", ".join( [ - '"application_namespace": {}'.format( + '"application_namespace": "{}"'.format( self.application_namespace ), - '"application_data": {}'.format(self.application_data) + '"application_data": "{}"'.format(self.application_data) ] ) return "{" + value + "}" diff --git a/kmip/core/factories/attribute_values.py b/kmip/core/factories/attribute_values.py index e394e48..cddab89 100644 --- a/kmip/core/factories/attribute_values.py +++ b/kmip/core/factories/attribute_values.py @@ -277,28 +277,13 @@ class AttributeValueFactory(object): return attributes.ObjectGroup(group) def _create_application_specific_information(self, info): - if info is None: - return attributes.ApplicationSpecificInformation() + if info: + return attributes.ApplicationSpecificInformation( + application_namespace=info.get("application_namespace"), + application_data=info.get("application_data") + ) else: - application_namespace = info.get('application_namespace') - application_data = info.get('application_data') - - if not isinstance(application_namespace, str): - msg = utils.build_er_error( - attributes.ApplicationSpecificInformation, - 'constructor argument type', - str, type(application_namespace)) - raise TypeError(msg) - - if not isinstance(application_data, str): - msg = utils.build_er_error( - attributes.ApplicationSpecificInformation, - 'constructor argument type', - str, type(application_data)) - raise TypeError(msg) - - return attributes.ApplicationSpecificInformation.create( - application_namespace, application_data) + return attributes.ApplicationSpecificInformation() def _create_contact_information(self, info): if info is None: diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index d410fc5..cc9e501 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -730,8 +730,16 @@ class KmipEngine(object): return None elif attr_name == 'Link': return None - elif attr_name == 'Application Specific Information': - return None + elif attr_name == "Application Specific Information": + values = [] + for info in managed_object.app_specific_info: + values.append( + { + "application_namespace": info.application_namespace, + "application_data": info.application_data + } + ) + return values elif attr_name == 'Contact Information': return None elif attr_name == 'Last Change Date': @@ -781,6 +789,14 @@ class KmipEngine(object): raise exceptions.InvalidField( "Cannot set duplicate name values." ) + elif attribute_name == "Application Specific Information": + for value in attribute_value: + managed_object.app_specific_info.append( + objects.ApplicationSpecificInformation( + application_namespace=value.application_namespace, + application_data=value.application_data + ) + ) else: # TODO (peterhamilton) Remove when all attributes are supported raise exceptions.InvalidField( @@ -1662,6 +1678,26 @@ class KmipEngine(object): ) if attribute is None: continue + elif name == "Application Specific Information": + application_namespace = value.application_namespace + application_data = value.application_data + v = { + "application_namespace": application_namespace, + "application_data": application_data + } + if v not in attribute: + self._logger.debug( + "Failed match: " + "the specified application specific " + "information ('{}', '{}') does not match any " + "of the object's associated application " + "specific information attributes.".format( + v.get("application_namespace"), + v.get("application_data") + ) + ) + add_object = False + break elif name == "Name": if value not in attribute: self._logger.debug( diff --git a/kmip/tests/unit/core/attributes/test_application_specific_information.py b/kmip/tests/unit/core/attributes/test_application_specific_information.py index 7ff88d2..f6e6794 100644 --- a/kmip/tests/unit/core/attributes/test_application_specific_information.py +++ b/kmip/tests/unit/core/attributes/test_application_specific_information.py @@ -278,7 +278,7 @@ class TestApplicationSpecificInformation(testtools.TestCase): ("application_data", "www.example.com") ] value = "{}".format( - ", ".join(['"{}": {}'.format(arg[0], arg[1]) for arg in args]) + ", ".join(['"{}": "{}"'.format(arg[0], arg[1]) for arg in args]) ) self.assertEqual( "{" + value + "}", diff --git a/kmip/tests/unit/core/factories/test_attribute_values.py b/kmip/tests/unit/core/factories/test_attribute_values.py index 4ef6618..24cc228 100644 --- a/kmip/tests/unit/core/factories/test_attribute_values.py +++ b/kmip/tests/unit/core/factories/test_attribute_values.py @@ -418,7 +418,55 @@ class TestAttributeValueFactory(testtools.TestCase): """ Test that an ApplicationSpecificInformation attribute can be created. """ - self.skipTest('') + attribute = self.factory.create_attribute_value( + enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION, + { + "application_namespace": "ssl", + "application_data": "www.example.com" + } + ) + self.assertIsInstance( + attribute, + attributes.ApplicationSpecificInformation + ) + self.assertEqual("ssl", attribute.application_namespace) + self.assertEqual("www.example.com", attribute.application_data) + + attribute = self.factory.create_attribute_value( + enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION, + None + ) + self.assertIsInstance( + attribute, + attributes.ApplicationSpecificInformation + ) + self.assertIsNone(attribute.application_namespace) + self.assertIsNone(attribute.application_data) + + attribute = self.factory.create_attribute_value_by_enum( + enums.Tags.APPLICATION_SPECIFIC_INFORMATION, + { + "application_namespace": "ssl", + "application_data": "www.example.com" + } + ) + self.assertIsInstance( + attribute, + attributes.ApplicationSpecificInformation + ) + self.assertEqual("ssl", attribute.application_namespace) + self.assertEqual("www.example.com", attribute.application_data) + + attribute = self.factory.create_attribute_value_by_enum( + enums.Tags.APPLICATION_SPECIFIC_INFORMATION, + None + ) + self.assertIsInstance( + attribute, + attributes.ApplicationSpecificInformation + ) + self.assertIsNone(attribute.application_namespace) + self.assertIsNone(attribute.application_data) def test_create_contact_information(self): """ diff --git a/kmip/tests/unit/pie/objects/test_application_specific_information.py b/kmip/tests/unit/pie/objects/test_application_specific_information.py index ea7fb97..749b7a3 100644 --- a/kmip/tests/unit/pie/objects/test_application_specific_information.py +++ b/kmip/tests/unit/pie/objects/test_application_specific_information.py @@ -247,11 +247,15 @@ class TestApplicationSpecificInformation(testtools.TestCase): session.add(app_specific_info) session.commit() + # Grab the ID now before making a new session to avoid a Detached error + # See http://sqlalche.me/e/bhk3 for more info. + app_specific_info_id = app_specific_info.id + session = sqlalchemy.orm.sessionmaker(bind=engine)() retrieved_info = session.query( objects.ApplicationSpecificInformation ).filter( - objects.ApplicationSpecificInformation.id == app_specific_info.id + objects.ApplicationSpecificInformation.id == app_specific_info_id ).one() session.commit() diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 50f1e06..371272a 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -1602,7 +1602,7 @@ class TestKmipEngine(testtools.TestCase): symmetric_key, 'Application Specific Information' ) - self.assertEqual(None, result) + self.assertEqual([], result) result = e._get_attribute_from_managed_object( symmetric_key, @@ -2423,6 +2423,13 @@ class TestKmipEngine(testtools.TestCase): attribute_factory.create_attribute( enums.AttributeType.OPERATION_POLICY_NAME, 'test' + ), + attribute_factory.create_attribute( + enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION, + { + "application_namespace": "ssl", + "application_data": "www.example.com" + } ) ] ) @@ -2473,6 +2480,15 @@ class TestKmipEngine(testtools.TestCase): self.assertEqual('test', symmetric_key.operation_policy_name) self.assertIsNotNone(symmetric_key.initial_date) self.assertNotEqual(0, symmetric_key.initial_date) + self.assertEqual(1, len(symmetric_key.app_specific_info)) + self.assertEqual( + "ssl", + symmetric_key.app_specific_info[0].application_namespace + ) + self.assertEqual( + "www.example.com", + symmetric_key.app_specific_info[0].application_data + ) self.assertEqual(uid, e._id_placeholder) @@ -5599,6 +5615,126 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual(0, len(response_payload.unique_identifiers)) + def test_locate_with_application_specific_information(self): + """ + Test the Locate operation when the 'Application Specific Information' + attribute is given. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + key = ( + b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + ) + + app_specific_info_a = pie_objects.ApplicationSpecificInformation( + application_namespace="ssl", + application_data="www.example.com" + ) + app_specific_info_b = pie_objects.ApplicationSpecificInformation( + application_namespace="ssl", + application_data="www.test.com" + ) + + obj_a = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 128, + key, + name='name1' + ) + obj_a.app_specific_info.append(app_specific_info_a) + obj_a.app_specific_info.append(app_specific_info_b) + obj_b = pie_objects.SecretData( + key, + enums.SecretDataType.PASSWORD + ) + obj_b.app_specific_info.append(app_specific_info_a) + obj_c = pie_objects.SecretData( + key, + enums.SecretDataType.PASSWORD + ) + + e._data_session.add(obj_a) + e._data_session.add(obj_b) + e._data_session.add(obj_c) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + id_b = str(obj_b.unique_identifier) + + attribute_factory = factory.AttributeFactory() + + # Locate the symmetric key objects based on their shared application + # specific information attribute. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION, + { + "application_namespace": "ssl", + "application_data": "www.example.com" + } + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_a) + ) + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_b) + ) + e._logger.debug.assert_any_call( + "Failed match: " + "the specified application specific " + "information ('ssl', 'www.example.com') does not match any " + "of the object's associated application " + "specific information attributes." + ) + self.assertEqual(2, len(response_payload.unique_identifiers)) + self.assertIn(id_a, response_payload.unique_identifiers) + self.assertIn(id_b, response_payload.unique_identifiers) + + # Locate a single symmetric key object based on its unique application + # specific information attribute. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION, + { + "application_namespace": "ssl", + "application_data": "www.test.com" + } + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_a) + ) + e._logger.debug.assert_any_call( + "Failed match: " + "the specified application specific " + "information ('ssl', 'www.test.com') does not match any " + "of the object's associated application " + "specific information attributes." + ) + self.assertEqual(1, len(response_payload.unique_identifiers)) + self.assertIn(id_a, response_payload.unique_identifiers) + def test_get(self): """ Test that a Get request can be processed correctly.