diff --git a/kmip/core/factories/attribute_values.py b/kmip/core/factories/attribute_values.py index 59bb278..e394e48 100644 --- a/kmip/core/factories/attribute_values.py +++ b/kmip/core/factories/attribute_values.py @@ -38,7 +38,11 @@ class AttributeValueFactory(object): elif name is enums.AttributeType.CRYPTOGRAPHIC_DOMAIN_PARAMETERS: raise NotImplementedError() elif name is enums.AttributeType.CERTIFICATE_TYPE: - raise NotImplementedError() + return primitives.Enumeration( + enums.CertificateType, + value=value, + tag=enums.Tags.CERTIFICATE_TYPE + ) elif name is enums.AttributeType.CERTIFICATE_LENGTH: return primitives.Integer(value, enums.Tags.CERTIFICATE_LENGTH) elif name is enums.AttributeType.X_509_CERTIFICATE_IDENTIFIER: diff --git a/kmip/demos/pie/locate.py b/kmip/demos/pie/locate.py index ed6bedb..549056e 100644 --- a/kmip/demos/pie/locate.py +++ b/kmip/demos/pie/locate.py @@ -38,6 +38,7 @@ if __name__ == '__main__': initial_dates = opts.initial_dates state = opts.state object_type = opts.object_type + certificate_type = opts.certificate_type cryptographic_algorithm = opts.cryptographic_algorithm cryptographic_length = opts.cryptographic_length cryptographic_usage_masks = opts.cryptographic_usage_masks @@ -171,6 +172,26 @@ if __name__ == '__main__': masks ) ) + if certificate_type: + certificate_type = getattr( + enums.CertificateType, + certificate_type, + None + ) + if certificate_type: + attributes.append( + attribute_factory.create_attribute( + enums.AttributeType.CERTIFICATE_TYPE, + certificate_type + ) + ) + else: + logger.error( + "Invalid certificate type provided: {}".format( + opts.certificate_type + ) + ) + sys.exit(-8) if unique_identifier: attributes.append( attribute_factory.create_attribute( diff --git a/kmip/demos/units/locate.py b/kmip/demos/units/locate.py index 87645dd..92460e3 100644 --- a/kmip/demos/units/locate.py +++ b/kmip/demos/units/locate.py @@ -41,6 +41,7 @@ if __name__ == '__main__': initial_dates = opts.initial_dates state = opts.state object_type = opts.object_type + certificate_type = opts.certificate_type cryptographic_algorithm = opts.cryptographic_algorithm cryptographic_length = opts.cryptographic_length cryptographic_usage_masks = opts.cryptographic_usage_masks @@ -198,6 +199,27 @@ if __name__ == '__main__': masks ) ) + if certificate_type: + certificate_type = getattr( + enums.CertificateType, + certificate_type, + None + ) + if certificate_type: + attributes.append( + attribute_factory.create_attribute( + enums.AttributeType.CERTIFICATE_TYPE, + certificate_type + ) + ) + else: + logger.error( + "Invalid certificate type provided: {}".format( + opts.certificate_type + ) + ) + client.close() + sys.exit(-8) if unique_identifier: attributes.append( attribute_factory.create_attribute( diff --git a/kmip/demos/utils.py b/kmip/demos/utils.py index 5fc50f8..757be31 100644 --- a/kmip/demos/utils.py +++ b/kmip/demos/utils.py @@ -287,6 +287,14 @@ def build_cli_parser(operation=None): "(e.g., CERTIFICATE, SYMMETRIC_KEY)" ) ) + parser.add_option( + "--certificate-type", + action="store", + type="str", + default=None, + dest="certificate_type", + help="The certificate type of the secret (e.g., X_509)" + ) parser.add_option( "--cryptographic-algorithm", action="store", diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index fe064c9..4fd3f5c 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -1762,6 +1762,20 @@ class KmipEngine(object): break if not add_object: break + elif name == "Certificate Type": + value = value.value + if value != attribute: + self._logger.debug( + "Failed match: " + "the specified certificate type ({}) " + "does not match the object's certificate " + "type ({}).".format( + value.name, + attribute.name + ) + ) + add_object = False + break elif name == enums.AttributeType.INITIAL_DATE.value: initial_date["value"] = attribute self._track_date_attributes( diff --git a/kmip/tests/integration/services/test_integration.py b/kmip/tests/integration/services/test_integration.py index 3856a49..a51912c 100644 --- a/kmip/tests/integration/services/test_integration.py +++ b/kmip/tests/integration/services/test_integration.py @@ -1199,6 +1199,123 @@ class TestIntegration(testtools.TestCase): self.assertEqual( ResultStatus.OPERATION_FAILED, result.result_status.value) + def test_certificate_register_locate_destroy(self): + """ + Test that newly registered certificates can be located based on their + attributes. + """ + label = "Integration Test - Register-Locate-Destroy Certificate" + value = ( + b'\x30\x82\x03\x12\x30\x82\x01\xFA\xA0\x03\x02\x01\x02\x02\x01\x01' + b'\x30\x0D\x06\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x05\x05\x00\x30' + b'\x3B\x31\x0B\x30\x09\x06\x03\x55\x04\x06\x13\x02\x55\x53\x31\x0D' + b'\x30\x0B\x06\x03\x55\x04\x0A\x13\x04\x54\x45\x53\x54\x31\x0E\x30' + b'\x0C\x06\x03\x55\x04\x0B\x13\x05\x4F\x41\x53\x49\x53\x31\x0D\x30' + b'\x0B\x06\x03\x55\x04\x03\x13\x04\x4B\x4D\x49\x50\x30\x1E\x17\x0D' + b'\x31\x30\x31\x31\x30\x31\x32\x33\x35\x39\x35\x39\x5A\x17\x0D\x32' + b'\x30\x31\x31\x30\x31\x32\x33\x35\x39\x35\x39\x5A\x30\x3B\x31\x0B' + b'\x30\x09\x06\x03\x55\x04\x06\x13\x02\x55\x53\x31\x0D\x30\x0B\x06' + b'\x03\x55\x04\x0A\x13\x04\x54\x45\x53\x54\x31\x0E\x30\x0C\x06\x03' + b'\x55\x04\x0B\x13\x05\x4F\x41\x53\x49\x53\x31\x0D\x30\x0B\x06\x03' + b'\x55\x04\x03\x13\x04\x4B\x4D\x49\x50\x30\x82\x01\x22\x30\x0D\x06' + b'\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x01\x05\x00\x03\x82\x01\x0F' + b'\x00\x30\x82\x01\x0A\x02\x82\x01\x01\x00\xAB\x7F\x16\x1C\x00\x42' + b'\x49\x6C\xCD\x6C\x6D\x4D\xAD\xB9\x19\x97\x34\x35\x35\x77\x76\x00' + b'\x3A\xCF\x54\xB7\xAF\x1E\x44\x0A\xFB\x80\xB6\x4A\x87\x55\xF8\x00' + b'\x2C\xFE\xBA\x6B\x18\x45\x40\xA2\xD6\x60\x86\xD7\x46\x48\x34\x6D' + b'\x75\xB8\xD7\x18\x12\xB2\x05\x38\x7C\x0F\x65\x83\xBC\x4D\x7D\xC7' + b'\xEC\x11\x4F\x3B\x17\x6B\x79\x57\xC4\x22\xE7\xD0\x3F\xC6\x26\x7F' + b'\xA2\xA6\xF8\x9B\x9B\xEE\x9E\x60\xA1\xD7\xC2\xD8\x33\xE5\xA5\xF4' + b'\xBB\x0B\x14\x34\xF4\xE7\x95\xA4\x11\x00\xF8\xAA\x21\x49\x00\xDF' + b'\x8B\x65\x08\x9F\x98\x13\x5B\x1C\x67\xB7\x01\x67\x5A\xBD\xBC\x7D' + b'\x57\x21\xAA\xC9\xD1\x4A\x7F\x08\x1F\xCE\xC8\x0B\x64\xE8\xA0\xEC' + b'\xC8\x29\x53\x53\xC7\x95\x32\x8A\xBF\x70\xE1\xB4\x2E\x7B\xB8\xB7' + b'\xF4\xE8\xAC\x8C\x81\x0C\xDB\x66\xE3\xD2\x11\x26\xEB\xA8\xDA\x7D' + b'\x0C\xA3\x41\x42\xCB\x76\xF9\x1F\x01\x3D\xA8\x09\xE9\xC1\xB7\xAE' + b'\x64\xC5\x41\x30\xFB\xC2\x1D\x80\xE9\xC2\xCB\x06\xC5\xC8\xD7\xCC' + b'\xE8\x94\x6A\x9A\xC9\x9B\x1C\x28\x15\xC3\x61\x2A\x29\xA8\x2D\x73' + b'\xA1\xF9\x93\x74\xFE\x30\xE5\x49\x51\x66\x2A\x6E\xDA\x29\xC6\xFC' + b'\x41\x13\x35\xD5\xDC\x74\x26\xB0\xF6\x05\x02\x03\x01\x00\x01\xA3' + b'\x21\x30\x1F\x30\x1D\x06\x03\x55\x1D\x0E\x04\x16\x04\x14\x04\xE5' + b'\x7B\xD2\xC4\x31\xB2\xE8\x16\xE1\x80\xA1\x98\x23\xFA\xC8\x58\x27' + b'\x3F\x6B\x30\x0D\x06\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x05\x05' + b'\x00\x03\x82\x01\x01\x00\xA8\x76\xAD\xBC\x6C\x8E\x0F\xF0\x17\x21' + b'\x6E\x19\x5F\xEA\x76\xBF\xF6\x1A\x56\x7C\x9A\x13\xDC\x50\xD1\x3F' + b'\xEC\x12\xA4\x27\x3C\x44\x15\x47\xCF\xAB\xCB\x5D\x61\xD9\x91\xE9' + b'\x66\x31\x9D\xF7\x2C\x0D\x41\xBA\x82\x6A\x45\x11\x2F\xF2\x60\x89' + b'\xA2\x34\x4F\x4D\x71\xCF\x7C\x92\x1B\x4B\xDF\xAE\xF1\x60\x0D\x1B' + b'\xAA\xA1\x53\x36\x05\x7E\x01\x4B\x8B\x49\x6D\x4F\xAE\x9E\x8A\x6C' + b'\x1D\xA9\xAE\xB6\xCB\xC9\x60\xCB\xF2\xFA\xE7\x7F\x58\x7E\xC4\xBB' + b'\x28\x20\x45\x33\x88\x45\xB8\x8D\xD9\xAE\xEA\x53\xE4\x82\xA3\x6E' + b'\x73\x4E\x4F\x5F\x03\xB9\xD0\xDF\xC4\xCA\xFC\x6B\xB3\x4E\xA9\x05' + b'\x3E\x52\xBD\x60\x9E\xE0\x1E\x86\xD9\xB0\x9F\xB5\x11\x20\xC1\x98' + b'\x34\xA9\x97\xB0\x9C\xE0\x8D\x79\xE8\x13\x11\x76\x2F\x97\x4B\xB1' + b'\xC8\xC0\x91\x86\xC4\xD7\x89\x33\xE0\xDB\x38\xE9\x05\x08\x48\x77' + b'\xE1\x47\xC7\x8A\xF5\x2F\xAE\x07\x19\x2F\xF1\x66\xD1\x9F\xA9\x4A' + b'\x11\xCC\x11\xB2\x7E\xD0\x50\xF7\xA2\x7F\xAE\x13\xB2\x05\xA5\x74' + b'\xC4\xEE\x00\xAA\x8B\xD6\x5D\x0D\x70\x57\xC9\x85\xC8\x39\xEF\x33' + b'\x6A\x44\x1E\xD5\x3A\x53\xC6\xB6\xB6\x96\xF1\xBD\xEB\x5F\x7E\xA8' + b'\x11\xEB\xB2\x5A\x7F\x86') + name = self.attr_factory.create_attribute( + enums.AttributeType.NAME, + label + ) + usage_mask = self.attr_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.VERIFY + ] + ) + template_attribute = TemplateAttribute(attributes=[usage_mask, name]) + + certificate = Certificate(enums.CertificateType.X_509, value) + result = self.client.register( + enums.ObjectType.CERTIFICATE, + template_attribute, + certificate + ) + uid_a = result.uuid + + self.assertEqual( + enums.ResultStatus.SUCCESS, + result.result_status.value + ) + self.assertIsInstance(uid_a, str) + + # Test locating the certificate by its "Certificate Type" value. + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.CERTIFICATE_TYPE, + enums.CertificateType.X_509 + ) + ] + ) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + self.assertEqual(1, len(result.uuids)) + self.assertEqual(uid_a, result.uuids[0]) + + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.CERTIFICATE_TYPE, + enums.CertificateType.PGP + ) + ] + ) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + self.assertEqual(0, len(result.uuids)) + + # Clean up certificate + result = self.client.destroy(uid_a) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + result = self.client.get(uuid=result.uuid.value, credential=None) + self.assertEqual( + ResultStatus.OPERATION_FAILED, + result.result_status.value + ) + def test_symmetric_key_create_getattributes_locate_destroy(self): """ Test that newly created keys can be located based on their attributes. diff --git a/kmip/tests/integration/services/test_proxykmipclient.py b/kmip/tests/integration/services/test_proxykmipclient.py index a4ee6bf..1df64a8 100644 --- a/kmip/tests/integration/services/test_proxykmipclient.py +++ b/kmip/tests/integration/services/test_proxykmipclient.py @@ -868,6 +868,101 @@ class TestProxyKmipClientIntegration(testtools.TestCase): self.client.destroy(public_key_id) self.client.destroy(private_key_id) + def test_certificate_register_locate_destroy(self): + """ + Test that newly registered certificates can be located based on their + attributes. + """ + label = "Integration Test - Register-Locate-Destroy Certificate" + value = ( + b'\x30\x82\x03\x12\x30\x82\x01\xFA\xA0\x03\x02\x01\x02\x02\x01\x01' + b'\x30\x0D\x06\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x05\x05\x00\x30' + b'\x3B\x31\x0B\x30\x09\x06\x03\x55\x04\x06\x13\x02\x55\x53\x31\x0D' + b'\x30\x0B\x06\x03\x55\x04\x0A\x13\x04\x54\x45\x53\x54\x31\x0E\x30' + b'\x0C\x06\x03\x55\x04\x0B\x13\x05\x4F\x41\x53\x49\x53\x31\x0D\x30' + b'\x0B\x06\x03\x55\x04\x03\x13\x04\x4B\x4D\x49\x50\x30\x1E\x17\x0D' + b'\x31\x30\x31\x31\x30\x31\x32\x33\x35\x39\x35\x39\x5A\x17\x0D\x32' + b'\x30\x31\x31\x30\x31\x32\x33\x35\x39\x35\x39\x5A\x30\x3B\x31\x0B' + b'\x30\x09\x06\x03\x55\x04\x06\x13\x02\x55\x53\x31\x0D\x30\x0B\x06' + b'\x03\x55\x04\x0A\x13\x04\x54\x45\x53\x54\x31\x0E\x30\x0C\x06\x03' + b'\x55\x04\x0B\x13\x05\x4F\x41\x53\x49\x53\x31\x0D\x30\x0B\x06\x03' + b'\x55\x04\x03\x13\x04\x4B\x4D\x49\x50\x30\x82\x01\x22\x30\x0D\x06' + b'\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x01\x05\x00\x03\x82\x01\x0F' + b'\x00\x30\x82\x01\x0A\x02\x82\x01\x01\x00\xAB\x7F\x16\x1C\x00\x42' + b'\x49\x6C\xCD\x6C\x6D\x4D\xAD\xB9\x19\x97\x34\x35\x35\x77\x76\x00' + b'\x3A\xCF\x54\xB7\xAF\x1E\x44\x0A\xFB\x80\xB6\x4A\x87\x55\xF8\x00' + b'\x2C\xFE\xBA\x6B\x18\x45\x40\xA2\xD6\x60\x86\xD7\x46\x48\x34\x6D' + b'\x75\xB8\xD7\x18\x12\xB2\x05\x38\x7C\x0F\x65\x83\xBC\x4D\x7D\xC7' + b'\xEC\x11\x4F\x3B\x17\x6B\x79\x57\xC4\x22\xE7\xD0\x3F\xC6\x26\x7F' + b'\xA2\xA6\xF8\x9B\x9B\xEE\x9E\x60\xA1\xD7\xC2\xD8\x33\xE5\xA5\xF4' + b'\xBB\x0B\x14\x34\xF4\xE7\x95\xA4\x11\x00\xF8\xAA\x21\x49\x00\xDF' + b'\x8B\x65\x08\x9F\x98\x13\x5B\x1C\x67\xB7\x01\x67\x5A\xBD\xBC\x7D' + b'\x57\x21\xAA\xC9\xD1\x4A\x7F\x08\x1F\xCE\xC8\x0B\x64\xE8\xA0\xEC' + b'\xC8\x29\x53\x53\xC7\x95\x32\x8A\xBF\x70\xE1\xB4\x2E\x7B\xB8\xB7' + b'\xF4\xE8\xAC\x8C\x81\x0C\xDB\x66\xE3\xD2\x11\x26\xEB\xA8\xDA\x7D' + b'\x0C\xA3\x41\x42\xCB\x76\xF9\x1F\x01\x3D\xA8\x09\xE9\xC1\xB7\xAE' + b'\x64\xC5\x41\x30\xFB\xC2\x1D\x80\xE9\xC2\xCB\x06\xC5\xC8\xD7\xCC' + b'\xE8\x94\x6A\x9A\xC9\x9B\x1C\x28\x15\xC3\x61\x2A\x29\xA8\x2D\x73' + b'\xA1\xF9\x93\x74\xFE\x30\xE5\x49\x51\x66\x2A\x6E\xDA\x29\xC6\xFC' + b'\x41\x13\x35\xD5\xDC\x74\x26\xB0\xF6\x05\x02\x03\x01\x00\x01\xA3' + b'\x21\x30\x1F\x30\x1D\x06\x03\x55\x1D\x0E\x04\x16\x04\x14\x04\xE5' + b'\x7B\xD2\xC4\x31\xB2\xE8\x16\xE1\x80\xA1\x98\x23\xFA\xC8\x58\x27' + b'\x3F\x6B\x30\x0D\x06\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x05\x05' + b'\x00\x03\x82\x01\x01\x00\xA8\x76\xAD\xBC\x6C\x8E\x0F\xF0\x17\x21' + b'\x6E\x19\x5F\xEA\x76\xBF\xF6\x1A\x56\x7C\x9A\x13\xDC\x50\xD1\x3F' + b'\xEC\x12\xA4\x27\x3C\x44\x15\x47\xCF\xAB\xCB\x5D\x61\xD9\x91\xE9' + b'\x66\x31\x9D\xF7\x2C\x0D\x41\xBA\x82\x6A\x45\x11\x2F\xF2\x60\x89' + b'\xA2\x34\x4F\x4D\x71\xCF\x7C\x92\x1B\x4B\xDF\xAE\xF1\x60\x0D\x1B' + b'\xAA\xA1\x53\x36\x05\x7E\x01\x4B\x8B\x49\x6D\x4F\xAE\x9E\x8A\x6C' + b'\x1D\xA9\xAE\xB6\xCB\xC9\x60\xCB\xF2\xFA\xE7\x7F\x58\x7E\xC4\xBB' + b'\x28\x20\x45\x33\x88\x45\xB8\x8D\xD9\xAE\xEA\x53\xE4\x82\xA3\x6E' + b'\x73\x4E\x4F\x5F\x03\xB9\xD0\xDF\xC4\xCA\xFC\x6B\xB3\x4E\xA9\x05' + b'\x3E\x52\xBD\x60\x9E\xE0\x1E\x86\xD9\xB0\x9F\xB5\x11\x20\xC1\x98' + b'\x34\xA9\x97\xB0\x9C\xE0\x8D\x79\xE8\x13\x11\x76\x2F\x97\x4B\xB1' + b'\xC8\xC0\x91\x86\xC4\xD7\x89\x33\xE0\xDB\x38\xE9\x05\x08\x48\x77' + b'\xE1\x47\xC7\x8A\xF5\x2F\xAE\x07\x19\x2F\xF1\x66\xD1\x9F\xA9\x4A' + b'\x11\xCC\x11\xB2\x7E\xD0\x50\xF7\xA2\x7F\xAE\x13\xB2\x05\xA5\x74' + b'\xC4\xEE\x00\xAA\x8B\xD6\x5D\x0D\x70\x57\xC9\x85\xC8\x39\xEF\x33' + b'\x6A\x44\x1E\xD5\x3A\x53\xC6\xB6\xB6\x96\xF1\xBD\xEB\x5F\x7E\xA8' + b'\x11\xEB\xB2\x5A\x7F\x86') + usage_mask = [ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.VERIFY + ] + + certificate = objects.Certificate( + enums.CertificateType.X_509, + value, + masks=usage_mask, + name=label + ) + a_id = self.client.register(certificate) + + # Test locating the certificate by its "Certificate Type" value. + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.CERTIFICATE_TYPE, + enums.CertificateType.X_509 + ) + ] + ) + self.assertEqual(1, len(result)) + self.assertEqual(a_id, result[0]) + + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.CERTIFICATE_TYPE, + enums.CertificateType.PGP + ) + ] + ) + self.assertEqual(0, len(result)) + + # Clean up the certificate + self.client.destroy(a_id) + def test_create_getattributes_locate_destroy(self): """ Test that the ProxyKmipClient can create symmetric keys and then diff --git a/kmip/tests/unit/core/factories/test_attribute_values.py b/kmip/tests/unit/core/factories/test_attribute_values.py index be1ce7e..4ef6618 100644 --- a/kmip/tests/unit/core/factories/test_attribute_values.py +++ b/kmip/tests/unit/core/factories/test_attribute_values.py @@ -156,10 +156,13 @@ class TestAttributeValueFactory(testtools.TestCase): """ Test that a CertificateType attribute can be created. """ - kwargs = {'name': enums.AttributeType.CERTIFICATE_TYPE, - 'value': None} - self.assertRaises( - NotImplementedError, self.factory.create_attribute_value, **kwargs) + certificate_type = self.factory.create_attribute_value( + name=enums.AttributeType.CERTIFICATE_TYPE, + value=enums.CertificateType.X_509 + ) + self.assertIsInstance(certificate_type, primitives.Enumeration) + self.assertEqual(enums.CertificateType.X_509, certificate_type.value) + self.assertEqual(enums.Tags.CERTIFICATE_TYPE, certificate_type.tag) def test_create_certificate_length(self): """ diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 1702ee0..148b2af 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -5286,6 +5286,93 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual(0, len(response_payload.unique_identifiers)) + def test_locate_with_certificate_type(self): + """ + Test the Locate operation when the 'Certificate Type' attribute is + given. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + key = ( + b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + ) + + obj_a = pie_objects.Certificate( + enums.CertificateType.X_509, + b'', + name='certificate1' + ) + obj_b = pie_objects.SecretData( + key, + enums.SecretDataType.PASSWORD + ) + + e._data_session.add(obj_a) + e._data_session.add(obj_b) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + + attribute_factory = factory.AttributeFactory() + + # Locate the certificate object based on its certificate type. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.CERTIFICATE_TYPE, + enums.CertificateType.X_509 + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_a) + ) + e._logger.debug.assert_any_call( + "Failed match: " + "the specified attribute (Certificate Type) is not " + "applicable for the object's object type (SECRET_DATA)." + ) + self.assertEqual(1, len(response_payload.unique_identifiers)) + self.assertIn(id_a, response_payload.unique_identifiers) + + # Try to locate a non-existent object based on its certificate + # type. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.CERTIFICATE_TYPE, + enums.CertificateType.PGP + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Failed match: " + "the specified certificate type (PGP) does not match " + "the object's certificate type (X_509)." + ) + e._logger.debug.assert_any_call( + "Failed match: " + "the specified attribute (Certificate Type) is not " + "applicable for the object's object type (SECRET_DATA)." + ) + self.assertEqual(0, len(response_payload.unique_identifiers)) + def test_locate_with_unique_identifier(self): """ Test the Locate operation when the 'Unique Identifier' attribute