From 44d55f25507a20230235fa0800faf13e21b40223 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Fri, 16 Aug 2019 16:53:15 -0400 Subject: [PATCH] Add CertificateType filtering support for Locate This change updates Locate operation support in the PyKMIP server, allowing users to filter objects based on the object's Certificate Type. Unit tests and integration tests have been added to test and verify the correctness of this feature. Additionally, the Locate demo scripts have also been updated to support Certificate Type filtering. Simply use the "--certificate-type" flag to specify a Certificate Type enumeration values for the Locate script to filter on. --- kmip/core/factories/attribute_values.py | 6 +- kmip/demos/pie/locate.py | 21 ++++ kmip/demos/units/locate.py | 22 ++++ kmip/demos/utils.py | 8 ++ kmip/services/server/engine.py | 14 +++ .../integration/services/test_integration.py | 117 ++++++++++++++++++ .../services/test_proxykmipclient.py | 95 ++++++++++++++ .../core/factories/test_attribute_values.py | 11 +- .../tests/unit/services/server/test_engine.py | 87 +++++++++++++ 9 files changed, 376 insertions(+), 5 deletions(-) diff --git a/kmip/core/factories/attribute_values.py b/kmip/core/factories/attribute_values.py index 59bb278..e394e48 100644 --- a/kmip/core/factories/attribute_values.py +++ b/kmip/core/factories/attribute_values.py @@ -38,7 +38,11 @@ class AttributeValueFactory(object): elif name is enums.AttributeType.CRYPTOGRAPHIC_DOMAIN_PARAMETERS: raise NotImplementedError() elif name is enums.AttributeType.CERTIFICATE_TYPE: - raise NotImplementedError() + return primitives.Enumeration( + enums.CertificateType, + value=value, + tag=enums.Tags.CERTIFICATE_TYPE + ) elif name is enums.AttributeType.CERTIFICATE_LENGTH: return primitives.Integer(value, enums.Tags.CERTIFICATE_LENGTH) elif name is enums.AttributeType.X_509_CERTIFICATE_IDENTIFIER: diff --git a/kmip/demos/pie/locate.py b/kmip/demos/pie/locate.py index ed6bedb..549056e 100644 --- a/kmip/demos/pie/locate.py +++ b/kmip/demos/pie/locate.py @@ -38,6 +38,7 @@ if __name__ == '__main__': initial_dates = opts.initial_dates state = opts.state object_type = opts.object_type + certificate_type = opts.certificate_type cryptographic_algorithm = opts.cryptographic_algorithm cryptographic_length = opts.cryptographic_length cryptographic_usage_masks = opts.cryptographic_usage_masks @@ -171,6 +172,26 @@ if __name__ == '__main__': masks ) ) + if certificate_type: + certificate_type = getattr( + enums.CertificateType, + certificate_type, + None + ) + if certificate_type: + attributes.append( + attribute_factory.create_attribute( + enums.AttributeType.CERTIFICATE_TYPE, + certificate_type + ) + ) + else: + logger.error( + "Invalid certificate type provided: {}".format( + opts.certificate_type + ) + ) + sys.exit(-8) if unique_identifier: attributes.append( attribute_factory.create_attribute( diff --git a/kmip/demos/units/locate.py b/kmip/demos/units/locate.py index 87645dd..92460e3 100644 --- a/kmip/demos/units/locate.py +++ b/kmip/demos/units/locate.py @@ -41,6 +41,7 @@ if __name__ == '__main__': initial_dates = opts.initial_dates state = opts.state object_type = opts.object_type + certificate_type = opts.certificate_type cryptographic_algorithm = opts.cryptographic_algorithm cryptographic_length = opts.cryptographic_length cryptographic_usage_masks = opts.cryptographic_usage_masks @@ -198,6 +199,27 @@ if __name__ == '__main__': masks ) ) + if certificate_type: + certificate_type = getattr( + enums.CertificateType, + certificate_type, + None + ) + if certificate_type: + attributes.append( + attribute_factory.create_attribute( + enums.AttributeType.CERTIFICATE_TYPE, + certificate_type + ) + ) + else: + logger.error( + "Invalid certificate type provided: {}".format( + opts.certificate_type + ) + ) + client.close() + sys.exit(-8) if unique_identifier: attributes.append( attribute_factory.create_attribute( diff --git a/kmip/demos/utils.py b/kmip/demos/utils.py index 5fc50f8..757be31 100644 --- a/kmip/demos/utils.py +++ b/kmip/demos/utils.py @@ -287,6 +287,14 @@ def build_cli_parser(operation=None): "(e.g., CERTIFICATE, SYMMETRIC_KEY)" ) ) + parser.add_option( + "--certificate-type", + action="store", + type="str", + default=None, + dest="certificate_type", + help="The certificate type of the secret (e.g., X_509)" + ) parser.add_option( "--cryptographic-algorithm", action="store", diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index fe064c9..4fd3f5c 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -1762,6 +1762,20 @@ class KmipEngine(object): break if not add_object: break + elif name == "Certificate Type": + value = value.value + if value != attribute: + self._logger.debug( + "Failed match: " + "the specified certificate type ({}) " + "does not match the object's certificate " + "type ({}).".format( + value.name, + attribute.name + ) + ) + add_object = False + break elif name == enums.AttributeType.INITIAL_DATE.value: initial_date["value"] = attribute self._track_date_attributes( diff --git a/kmip/tests/integration/services/test_integration.py b/kmip/tests/integration/services/test_integration.py index 3856a49..a51912c 100644 --- a/kmip/tests/integration/services/test_integration.py +++ b/kmip/tests/integration/services/test_integration.py @@ -1199,6 +1199,123 @@ class TestIntegration(testtools.TestCase): self.assertEqual( ResultStatus.OPERATION_FAILED, result.result_status.value) + def test_certificate_register_locate_destroy(self): + """ + Test that newly registered certificates can be located based on their + attributes. + """ + label = "Integration Test - Register-Locate-Destroy Certificate" + value = ( + b'\x30\x82\x03\x12\x30\x82\x01\xFA\xA0\x03\x02\x01\x02\x02\x01\x01' + b'\x30\x0D\x06\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x05\x05\x00\x30' + b'\x3B\x31\x0B\x30\x09\x06\x03\x55\x04\x06\x13\x02\x55\x53\x31\x0D' + b'\x30\x0B\x06\x03\x55\x04\x0A\x13\x04\x54\x45\x53\x54\x31\x0E\x30' + b'\x0C\x06\x03\x55\x04\x0B\x13\x05\x4F\x41\x53\x49\x53\x31\x0D\x30' + b'\x0B\x06\x03\x55\x04\x03\x13\x04\x4B\x4D\x49\x50\x30\x1E\x17\x0D' + b'\x31\x30\x31\x31\x30\x31\x32\x33\x35\x39\x35\x39\x5A\x17\x0D\x32' + b'\x30\x31\x31\x30\x31\x32\x33\x35\x39\x35\x39\x5A\x30\x3B\x31\x0B' + b'\x30\x09\x06\x03\x55\x04\x06\x13\x02\x55\x53\x31\x0D\x30\x0B\x06' + b'\x03\x55\x04\x0A\x13\x04\x54\x45\x53\x54\x31\x0E\x30\x0C\x06\x03' + b'\x55\x04\x0B\x13\x05\x4F\x41\x53\x49\x53\x31\x0D\x30\x0B\x06\x03' + b'\x55\x04\x03\x13\x04\x4B\x4D\x49\x50\x30\x82\x01\x22\x30\x0D\x06' + b'\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x01\x05\x00\x03\x82\x01\x0F' + b'\x00\x30\x82\x01\x0A\x02\x82\x01\x01\x00\xAB\x7F\x16\x1C\x00\x42' + b'\x49\x6C\xCD\x6C\x6D\x4D\xAD\xB9\x19\x97\x34\x35\x35\x77\x76\x00' + b'\x3A\xCF\x54\xB7\xAF\x1E\x44\x0A\xFB\x80\xB6\x4A\x87\x55\xF8\x00' + b'\x2C\xFE\xBA\x6B\x18\x45\x40\xA2\xD6\x60\x86\xD7\x46\x48\x34\x6D' + b'\x75\xB8\xD7\x18\x12\xB2\x05\x38\x7C\x0F\x65\x83\xBC\x4D\x7D\xC7' + b'\xEC\x11\x4F\x3B\x17\x6B\x79\x57\xC4\x22\xE7\xD0\x3F\xC6\x26\x7F' + b'\xA2\xA6\xF8\x9B\x9B\xEE\x9E\x60\xA1\xD7\xC2\xD8\x33\xE5\xA5\xF4' + b'\xBB\x0B\x14\x34\xF4\xE7\x95\xA4\x11\x00\xF8\xAA\x21\x49\x00\xDF' + b'\x8B\x65\x08\x9F\x98\x13\x5B\x1C\x67\xB7\x01\x67\x5A\xBD\xBC\x7D' + b'\x57\x21\xAA\xC9\xD1\x4A\x7F\x08\x1F\xCE\xC8\x0B\x64\xE8\xA0\xEC' + b'\xC8\x29\x53\x53\xC7\x95\x32\x8A\xBF\x70\xE1\xB4\x2E\x7B\xB8\xB7' + b'\xF4\xE8\xAC\x8C\x81\x0C\xDB\x66\xE3\xD2\x11\x26\xEB\xA8\xDA\x7D' + b'\x0C\xA3\x41\x42\xCB\x76\xF9\x1F\x01\x3D\xA8\x09\xE9\xC1\xB7\xAE' + b'\x64\xC5\x41\x30\xFB\xC2\x1D\x80\xE9\xC2\xCB\x06\xC5\xC8\xD7\xCC' + b'\xE8\x94\x6A\x9A\xC9\x9B\x1C\x28\x15\xC3\x61\x2A\x29\xA8\x2D\x73' + b'\xA1\xF9\x93\x74\xFE\x30\xE5\x49\x51\x66\x2A\x6E\xDA\x29\xC6\xFC' + b'\x41\x13\x35\xD5\xDC\x74\x26\xB0\xF6\x05\x02\x03\x01\x00\x01\xA3' + b'\x21\x30\x1F\x30\x1D\x06\x03\x55\x1D\x0E\x04\x16\x04\x14\x04\xE5' + b'\x7B\xD2\xC4\x31\xB2\xE8\x16\xE1\x80\xA1\x98\x23\xFA\xC8\x58\x27' + b'\x3F\x6B\x30\x0D\x06\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x05\x05' + b'\x00\x03\x82\x01\x01\x00\xA8\x76\xAD\xBC\x6C\x8E\x0F\xF0\x17\x21' + b'\x6E\x19\x5F\xEA\x76\xBF\xF6\x1A\x56\x7C\x9A\x13\xDC\x50\xD1\x3F' + b'\xEC\x12\xA4\x27\x3C\x44\x15\x47\xCF\xAB\xCB\x5D\x61\xD9\x91\xE9' + b'\x66\x31\x9D\xF7\x2C\x0D\x41\xBA\x82\x6A\x45\x11\x2F\xF2\x60\x89' + b'\xA2\x34\x4F\x4D\x71\xCF\x7C\x92\x1B\x4B\xDF\xAE\xF1\x60\x0D\x1B' + b'\xAA\xA1\x53\x36\x05\x7E\x01\x4B\x8B\x49\x6D\x4F\xAE\x9E\x8A\x6C' + b'\x1D\xA9\xAE\xB6\xCB\xC9\x60\xCB\xF2\xFA\xE7\x7F\x58\x7E\xC4\xBB' + b'\x28\x20\x45\x33\x88\x45\xB8\x8D\xD9\xAE\xEA\x53\xE4\x82\xA3\x6E' + b'\x73\x4E\x4F\x5F\x03\xB9\xD0\xDF\xC4\xCA\xFC\x6B\xB3\x4E\xA9\x05' + b'\x3E\x52\xBD\x60\x9E\xE0\x1E\x86\xD9\xB0\x9F\xB5\x11\x20\xC1\x98' + b'\x34\xA9\x97\xB0\x9C\xE0\x8D\x79\xE8\x13\x11\x76\x2F\x97\x4B\xB1' + b'\xC8\xC0\x91\x86\xC4\xD7\x89\x33\xE0\xDB\x38\xE9\x05\x08\x48\x77' + b'\xE1\x47\xC7\x8A\xF5\x2F\xAE\x07\x19\x2F\xF1\x66\xD1\x9F\xA9\x4A' + b'\x11\xCC\x11\xB2\x7E\xD0\x50\xF7\xA2\x7F\xAE\x13\xB2\x05\xA5\x74' + b'\xC4\xEE\x00\xAA\x8B\xD6\x5D\x0D\x70\x57\xC9\x85\xC8\x39\xEF\x33' + b'\x6A\x44\x1E\xD5\x3A\x53\xC6\xB6\xB6\x96\xF1\xBD\xEB\x5F\x7E\xA8' + b'\x11\xEB\xB2\x5A\x7F\x86') + name = self.attr_factory.create_attribute( + enums.AttributeType.NAME, + label + ) + usage_mask = self.attr_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.VERIFY + ] + ) + template_attribute = TemplateAttribute(attributes=[usage_mask, name]) + + certificate = Certificate(enums.CertificateType.X_509, value) + result = self.client.register( + enums.ObjectType.CERTIFICATE, + template_attribute, + certificate + ) + uid_a = result.uuid + + self.assertEqual( + enums.ResultStatus.SUCCESS, + result.result_status.value + ) + self.assertIsInstance(uid_a, str) + + # Test locating the certificate by its "Certificate Type" value. + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.CERTIFICATE_TYPE, + enums.CertificateType.X_509 + ) + ] + ) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + self.assertEqual(1, len(result.uuids)) + self.assertEqual(uid_a, result.uuids[0]) + + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.CERTIFICATE_TYPE, + enums.CertificateType.PGP + ) + ] + ) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + self.assertEqual(0, len(result.uuids)) + + # Clean up certificate + result = self.client.destroy(uid_a) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + result = self.client.get(uuid=result.uuid.value, credential=None) + self.assertEqual( + ResultStatus.OPERATION_FAILED, + result.result_status.value + ) + def test_symmetric_key_create_getattributes_locate_destroy(self): """ Test that newly created keys can be located based on their attributes. diff --git a/kmip/tests/integration/services/test_proxykmipclient.py b/kmip/tests/integration/services/test_proxykmipclient.py index a4ee6bf..1df64a8 100644 --- a/kmip/tests/integration/services/test_proxykmipclient.py +++ b/kmip/tests/integration/services/test_proxykmipclient.py @@ -868,6 +868,101 @@ class TestProxyKmipClientIntegration(testtools.TestCase): self.client.destroy(public_key_id) self.client.destroy(private_key_id) + def test_certificate_register_locate_destroy(self): + """ + Test that newly registered certificates can be located based on their + attributes. + """ + label = "Integration Test - Register-Locate-Destroy Certificate" + value = ( + b'\x30\x82\x03\x12\x30\x82\x01\xFA\xA0\x03\x02\x01\x02\x02\x01\x01' + b'\x30\x0D\x06\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x05\x05\x00\x30' + b'\x3B\x31\x0B\x30\x09\x06\x03\x55\x04\x06\x13\x02\x55\x53\x31\x0D' + b'\x30\x0B\x06\x03\x55\x04\x0A\x13\x04\x54\x45\x53\x54\x31\x0E\x30' + b'\x0C\x06\x03\x55\x04\x0B\x13\x05\x4F\x41\x53\x49\x53\x31\x0D\x30' + b'\x0B\x06\x03\x55\x04\x03\x13\x04\x4B\x4D\x49\x50\x30\x1E\x17\x0D' + b'\x31\x30\x31\x31\x30\x31\x32\x33\x35\x39\x35\x39\x5A\x17\x0D\x32' + b'\x30\x31\x31\x30\x31\x32\x33\x35\x39\x35\x39\x5A\x30\x3B\x31\x0B' + b'\x30\x09\x06\x03\x55\x04\x06\x13\x02\x55\x53\x31\x0D\x30\x0B\x06' + b'\x03\x55\x04\x0A\x13\x04\x54\x45\x53\x54\x31\x0E\x30\x0C\x06\x03' + b'\x55\x04\x0B\x13\x05\x4F\x41\x53\x49\x53\x31\x0D\x30\x0B\x06\x03' + b'\x55\x04\x03\x13\x04\x4B\x4D\x49\x50\x30\x82\x01\x22\x30\x0D\x06' + b'\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x01\x05\x00\x03\x82\x01\x0F' + b'\x00\x30\x82\x01\x0A\x02\x82\x01\x01\x00\xAB\x7F\x16\x1C\x00\x42' + b'\x49\x6C\xCD\x6C\x6D\x4D\xAD\xB9\x19\x97\x34\x35\x35\x77\x76\x00' + b'\x3A\xCF\x54\xB7\xAF\x1E\x44\x0A\xFB\x80\xB6\x4A\x87\x55\xF8\x00' + b'\x2C\xFE\xBA\x6B\x18\x45\x40\xA2\xD6\x60\x86\xD7\x46\x48\x34\x6D' + b'\x75\xB8\xD7\x18\x12\xB2\x05\x38\x7C\x0F\x65\x83\xBC\x4D\x7D\xC7' + b'\xEC\x11\x4F\x3B\x17\x6B\x79\x57\xC4\x22\xE7\xD0\x3F\xC6\x26\x7F' + b'\xA2\xA6\xF8\x9B\x9B\xEE\x9E\x60\xA1\xD7\xC2\xD8\x33\xE5\xA5\xF4' + b'\xBB\x0B\x14\x34\xF4\xE7\x95\xA4\x11\x00\xF8\xAA\x21\x49\x00\xDF' + b'\x8B\x65\x08\x9F\x98\x13\x5B\x1C\x67\xB7\x01\x67\x5A\xBD\xBC\x7D' + b'\x57\x21\xAA\xC9\xD1\x4A\x7F\x08\x1F\xCE\xC8\x0B\x64\xE8\xA0\xEC' + b'\xC8\x29\x53\x53\xC7\x95\x32\x8A\xBF\x70\xE1\xB4\x2E\x7B\xB8\xB7' + b'\xF4\xE8\xAC\x8C\x81\x0C\xDB\x66\xE3\xD2\x11\x26\xEB\xA8\xDA\x7D' + b'\x0C\xA3\x41\x42\xCB\x76\xF9\x1F\x01\x3D\xA8\x09\xE9\xC1\xB7\xAE' + b'\x64\xC5\x41\x30\xFB\xC2\x1D\x80\xE9\xC2\xCB\x06\xC5\xC8\xD7\xCC' + b'\xE8\x94\x6A\x9A\xC9\x9B\x1C\x28\x15\xC3\x61\x2A\x29\xA8\x2D\x73' + b'\xA1\xF9\x93\x74\xFE\x30\xE5\x49\x51\x66\x2A\x6E\xDA\x29\xC6\xFC' + b'\x41\x13\x35\xD5\xDC\x74\x26\xB0\xF6\x05\x02\x03\x01\x00\x01\xA3' + b'\x21\x30\x1F\x30\x1D\x06\x03\x55\x1D\x0E\x04\x16\x04\x14\x04\xE5' + b'\x7B\xD2\xC4\x31\xB2\xE8\x16\xE1\x80\xA1\x98\x23\xFA\xC8\x58\x27' + b'\x3F\x6B\x30\x0D\x06\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x05\x05' + b'\x00\x03\x82\x01\x01\x00\xA8\x76\xAD\xBC\x6C\x8E\x0F\xF0\x17\x21' + b'\x6E\x19\x5F\xEA\x76\xBF\xF6\x1A\x56\x7C\x9A\x13\xDC\x50\xD1\x3F' + b'\xEC\x12\xA4\x27\x3C\x44\x15\x47\xCF\xAB\xCB\x5D\x61\xD9\x91\xE9' + b'\x66\x31\x9D\xF7\x2C\x0D\x41\xBA\x82\x6A\x45\x11\x2F\xF2\x60\x89' + b'\xA2\x34\x4F\x4D\x71\xCF\x7C\x92\x1B\x4B\xDF\xAE\xF1\x60\x0D\x1B' + b'\xAA\xA1\x53\x36\x05\x7E\x01\x4B\x8B\x49\x6D\x4F\xAE\x9E\x8A\x6C' + b'\x1D\xA9\xAE\xB6\xCB\xC9\x60\xCB\xF2\xFA\xE7\x7F\x58\x7E\xC4\xBB' + b'\x28\x20\x45\x33\x88\x45\xB8\x8D\xD9\xAE\xEA\x53\xE4\x82\xA3\x6E' + b'\x73\x4E\x4F\x5F\x03\xB9\xD0\xDF\xC4\xCA\xFC\x6B\xB3\x4E\xA9\x05' + b'\x3E\x52\xBD\x60\x9E\xE0\x1E\x86\xD9\xB0\x9F\xB5\x11\x20\xC1\x98' + b'\x34\xA9\x97\xB0\x9C\xE0\x8D\x79\xE8\x13\x11\x76\x2F\x97\x4B\xB1' + b'\xC8\xC0\x91\x86\xC4\xD7\x89\x33\xE0\xDB\x38\xE9\x05\x08\x48\x77' + b'\xE1\x47\xC7\x8A\xF5\x2F\xAE\x07\x19\x2F\xF1\x66\xD1\x9F\xA9\x4A' + b'\x11\xCC\x11\xB2\x7E\xD0\x50\xF7\xA2\x7F\xAE\x13\xB2\x05\xA5\x74' + b'\xC4\xEE\x00\xAA\x8B\xD6\x5D\x0D\x70\x57\xC9\x85\xC8\x39\xEF\x33' + b'\x6A\x44\x1E\xD5\x3A\x53\xC6\xB6\xB6\x96\xF1\xBD\xEB\x5F\x7E\xA8' + b'\x11\xEB\xB2\x5A\x7F\x86') + usage_mask = [ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.VERIFY + ] + + certificate = objects.Certificate( + enums.CertificateType.X_509, + value, + masks=usage_mask, + name=label + ) + a_id = self.client.register(certificate) + + # Test locating the certificate by its "Certificate Type" value. + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.CERTIFICATE_TYPE, + enums.CertificateType.X_509 + ) + ] + ) + self.assertEqual(1, len(result)) + self.assertEqual(a_id, result[0]) + + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.CERTIFICATE_TYPE, + enums.CertificateType.PGP + ) + ] + ) + self.assertEqual(0, len(result)) + + # Clean up the certificate + self.client.destroy(a_id) + def test_create_getattributes_locate_destroy(self): """ Test that the ProxyKmipClient can create symmetric keys and then diff --git a/kmip/tests/unit/core/factories/test_attribute_values.py b/kmip/tests/unit/core/factories/test_attribute_values.py index be1ce7e..4ef6618 100644 --- a/kmip/tests/unit/core/factories/test_attribute_values.py +++ b/kmip/tests/unit/core/factories/test_attribute_values.py @@ -156,10 +156,13 @@ class TestAttributeValueFactory(testtools.TestCase): """ Test that a CertificateType attribute can be created. """ - kwargs = {'name': enums.AttributeType.CERTIFICATE_TYPE, - 'value': None} - self.assertRaises( - NotImplementedError, self.factory.create_attribute_value, **kwargs) + certificate_type = self.factory.create_attribute_value( + name=enums.AttributeType.CERTIFICATE_TYPE, + value=enums.CertificateType.X_509 + ) + self.assertIsInstance(certificate_type, primitives.Enumeration) + self.assertEqual(enums.CertificateType.X_509, certificate_type.value) + self.assertEqual(enums.Tags.CERTIFICATE_TYPE, certificate_type.tag) def test_create_certificate_length(self): """ diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 1702ee0..148b2af 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -5286,6 +5286,93 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual(0, len(response_payload.unique_identifiers)) + def test_locate_with_certificate_type(self): + """ + Test the Locate operation when the 'Certificate Type' attribute is + given. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + key = ( + b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + ) + + obj_a = pie_objects.Certificate( + enums.CertificateType.X_509, + b'', + name='certificate1' + ) + obj_b = pie_objects.SecretData( + key, + enums.SecretDataType.PASSWORD + ) + + e._data_session.add(obj_a) + e._data_session.add(obj_b) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + + attribute_factory = factory.AttributeFactory() + + # Locate the certificate object based on its certificate type. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.CERTIFICATE_TYPE, + enums.CertificateType.X_509 + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_a) + ) + e._logger.debug.assert_any_call( + "Failed match: " + "the specified attribute (Certificate Type) is not " + "applicable for the object's object type (SECRET_DATA)." + ) + self.assertEqual(1, len(response_payload.unique_identifiers)) + self.assertIn(id_a, response_payload.unique_identifiers) + + # Try to locate a non-existent object based on its certificate + # type. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.CERTIFICATE_TYPE, + enums.CertificateType.PGP + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Failed match: " + "the specified certificate type (PGP) does not match " + "the object's certificate type (X_509)." + ) + e._logger.debug.assert_any_call( + "Failed match: " + "the specified attribute (Certificate Type) is not " + "applicable for the object's object type (SECRET_DATA)." + ) + self.assertEqual(0, len(response_payload.unique_identifiers)) + def test_locate_with_unique_identifier(self): """ Test the Locate operation when the 'Unique Identifier' attribute