diff --git a/kmip/demos/pie/locate.py b/kmip/demos/pie/locate.py index 964a894..691804c 100644 --- a/kmip/demos/pie/locate.py +++ b/kmip/demos/pie/locate.py @@ -36,6 +36,7 @@ if __name__ == '__main__': initial_dates = opts.initial_dates state = opts.state object_type = opts.object_type + cryptographic_algorithm = opts.cryptographic_algorithm attribute_factory = AttributeFactory() @@ -99,6 +100,26 @@ if __name__ == '__main__': "Invalid object type provided: {}".format(opts.object_type) ) sys.exit(-4) + if cryptographic_algorithm: + cryptographic_algorithm = getattr( + enums.CryptographicAlgorithm, + cryptographic_algorithm, + None + ) + if cryptographic_algorithm: + attributes.append( + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + cryptographic_algorithm + ) + ) + else: + logger.error( + "Invalid cryptographic algorithm provided: {}".format( + opts.cryptographic_algorithm + ) + ) + sys.exit(-5) # Build the client and connect to the server with client.ProxyKmipClient( diff --git a/kmip/demos/units/locate.py b/kmip/demos/units/locate.py index 9c854b8..5ee2f22 100644 --- a/kmip/demos/units/locate.py +++ b/kmip/demos/units/locate.py @@ -39,6 +39,7 @@ if __name__ == '__main__': initial_dates = opts.initial_dates state = opts.state object_type = opts.object_type + cryptographic_algorithm = opts.cryptographic_algorithm attribute_factory = AttributeFactory() credential_factory = CredentialFactory() @@ -124,6 +125,27 @@ if __name__ == '__main__': ) client.close() sys.exit(-4) + if cryptographic_algorithm: + cryptographic_algorithm = getattr( + enums.CryptographicAlgorithm, + cryptographic_algorithm, + None + ) + if cryptographic_algorithm: + attributes.append( + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + cryptographic_algorithm + ) + ) + else: + logger.error( + "Invalid cryptographic algorithm provided: {}".format( + opts.cryptographic_algorithm + ) + ) + client.close() + sys.exit(-5) result = client.locate(attributes=attributes, credential=credential) client.close() diff --git a/kmip/demos/utils.py b/kmip/demos/utils.py index 75884b0..2b6c48e 100644 --- a/kmip/demos/utils.py +++ b/kmip/demos/utils.py @@ -271,6 +271,14 @@ def build_cli_parser(operation=None): "(e.g., CERTIFICATE, SYMMETRIC_KEY)" ) ) + parser.add_option( + "--cryptographic-algorithm", + action="store", + type="str", + default=None, + dest="cryptographic_algorithm", + help="The cryptographic algorithm of the secret (e.g., AES, RSA)" + ) elif operation is Operation.REGISTER: parser.add_option( "-f", diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index fb9a43b..0b1a32c 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -1623,6 +1623,26 @@ class KmipEngine(object): for payload_attribute in payload.attributes: name = payload_attribute.attribute_name.value value = payload_attribute.attribute_value + + # Verify that the attribute is applicable to the current + # object. If not, the object doesn't match, so skip it. + policy = self._attribute_policy + if not policy.is_attribute_applicable_to_object_type( + name, + managed_object.object_type + ): + self._logger.debug( + "Failed match: " + "the specified attribute ({}) is not applicable " + "for the object's object type ({}).".format( + name, + managed_object.object_type.name) + ) + add_object = False + break + + # Fetch the attribute from the object and check if it + # matches. If not, the object doesn't match, so skip it. attribute = self._get_attribute_from_managed_object( managed_object, name @@ -1667,6 +1687,20 @@ class KmipEngine(object): ) add_object = False break + elif name == "Cryptographic Algorithm": + value = value.value + if value != attribute: + self._logger.debug( + "Failed match: " + "the specified cryptographic algorithm ({}) " + "does not match the object's cryptographic " + "algorithm ({}).".format( + value.name, + attribute.name + ) + ) + add_object = False + break elif name == enums.AttributeType.INITIAL_DATE.value: initial_date["value"] = attribute self._track_date_attributes( diff --git a/kmip/tests/integration/services/test_integration.py b/kmip/tests/integration/services/test_integration.py index c972fbc..c650339 100644 --- a/kmip/tests/integration/services/test_integration.py +++ b/kmip/tests/integration/services/test_integration.py @@ -1373,6 +1373,31 @@ class TestIntegration(testtools.TestCase): self.assertIn(uid_a, result.uuids) self.assertIn(uid_b, result.uuids) + # Test locating each key by its cryptographic algorithm. + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + ] + ) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + self.assertEqual(2, len(result.uuids)) + self.assertIn(uid_a, result.uuids) + self.assertIn(uid_b, result.uuids) + + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.IDEA + ) + ] + ) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + self.assertEqual(0, len(result.uuids)) + # Clean up keys result = self.client.destroy(uid_a) self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) diff --git a/kmip/tests/integration/services/test_proxykmipclient.py b/kmip/tests/integration/services/test_proxykmipclient.py index 0407bb0..9449f0d 100644 --- a/kmip/tests/integration/services/test_proxykmipclient.py +++ b/kmip/tests/integration/services/test_proxykmipclient.py @@ -883,7 +883,7 @@ class TestProxyKmipClientIntegration(testtools.TestCase): mid_time = int(time.time()) time.sleep(2) - b_id = self.client.create(enums.CryptographicAlgorithm.AES, 128) + b_id = self.client.create(enums.CryptographicAlgorithm.IDEA, 128) time.sleep(2) end_time = int(time.time()) @@ -1010,6 +1010,39 @@ class TestProxyKmipClientIntegration(testtools.TestCase): self.assertIn(a_id, result) self.assertIn(b_id, result) + # Test locating each key by its cryptographic algorithm. + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + ] + ) + self.assertEqual(1, len(result)) + self.assertIn(a_id, result) + + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.IDEA + ) + ] + ) + self.assertEqual(1, len(result)) + self.assertIn(b_id, result) + + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ) + ] + ) + self.assertEqual(0, len(result)) + # Clean up the keys self.client.destroy(a_id) self.client.destroy(b_id) diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 2f47dba..0667e23 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -4889,6 +4889,94 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual(0, len(response_payload.unique_identifiers)) + def test_locate_with_cryptographic_algorithm(self): + """ + Test the Locate operation when the 'Cryptographic Algorithm' attribute + is given. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + key = ( + b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + ) + + obj_a = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 128, + key, + name='name1' + ) + obj_b = pie_objects.SecretData( + key, + enums.SecretDataType.PASSWORD + ) + + e._data_session.add(obj_a) + e._data_session.add(obj_b) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + + attribute_factory = factory.AttributeFactory() + + # Locate the symmetric key object based on its cryptographic algorithm. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_a) + ) + e._logger.debug.assert_any_call( + "Failed match: " + "the specified attribute (Cryptographic Algorithm) is not " + "applicable for the object's object type (SECRET_DATA)." + ) + self.assertEqual(1, len(response_payload.unique_identifiers)) + self.assertIn(id_a, response_payload.unique_identifiers) + + # Try to locate a non-existent object based on its cryptographic + # algorithm. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Failed match: " + "the specified cryptographic algorithm (RSA) does not match " + "the object's cryptographic algorithm (AES)." + ) + e._logger.debug.assert_any_call( + "Failed match: " + "the specified attribute (Cryptographic Algorithm) is not " + "applicable for the object's object type (SECRET_DATA)." + ) + self.assertEqual(0, len(response_payload.unique_identifiers)) + def test_get(self): """ Test that a Get request can be processed correctly.