From b5a87391574dc5a14080e6282764da2e3f71429e Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Mon, 12 Aug 2019 15:22:04 -0400 Subject: [PATCH] Add CryptographicUsageMask filtering support for Locate This change updates Locate operation support in the PyKMIP server, allowing users to filter objects based on the object's Cryptographic Usage Masks. Unit tests and integration tests have been added to test and verify the correctness of this feature. Additionally, the Locate demo scripts have also been updated to support Cryptographic Usage Mask filtering. Simply use the "--cryptographic-usage-mask" flag to specify one or more Cryptographic Usage Mask enumeration values for the Locate script to filter on. --- kmip/demos/pie/locate.py | 24 ++++ kmip/demos/units/locate.py | 24 ++++ kmip/demos/utils.py | 14 ++ kmip/services/server/engine.py | 19 +++ .../integration/services/test_integration.py | 49 +++++++ .../services/test_proxykmipclient.py | 49 +++++++ .../tests/unit/services/server/test_engine.py | 123 ++++++++++++++++++ 7 files changed, 302 insertions(+) diff --git a/kmip/demos/pie/locate.py b/kmip/demos/pie/locate.py index f7908c6..ed6bedb 100644 --- a/kmip/demos/pie/locate.py +++ b/kmip/demos/pie/locate.py @@ -40,6 +40,7 @@ if __name__ == '__main__': object_type = opts.object_type cryptographic_algorithm = opts.cryptographic_algorithm cryptographic_length = opts.cryptographic_length + cryptographic_usage_masks = opts.cryptographic_usage_masks unique_identifier = opts.unique_identifier operation_policy_name = opts.operation_policy_name @@ -147,6 +148,29 @@ if __name__ == '__main__': ) ) sys.exit(-6) + if cryptographic_usage_masks: + masks = [] + for cryptographic_usage_mask in cryptographic_usage_masks: + mask = getattr( + enums.CryptographicUsageMask, + cryptographic_usage_mask, + None + ) + if mask: + masks.append(mask) + else: + logger.error( + "Invalid cryptographic usage mask provided: {}".format( + cryptographic_usage_mask + ) + ) + sys.exit(-7) + attributes.append( + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + masks + ) + ) if unique_identifier: attributes.append( attribute_factory.create_attribute( diff --git a/kmip/demos/units/locate.py b/kmip/demos/units/locate.py index f46ee00..87645dd 100644 --- a/kmip/demos/units/locate.py +++ b/kmip/demos/units/locate.py @@ -43,6 +43,7 @@ if __name__ == '__main__': object_type = opts.object_type cryptographic_algorithm = opts.cryptographic_algorithm cryptographic_length = opts.cryptographic_length + cryptographic_usage_masks = opts.cryptographic_usage_masks unique_identifier = opts.unique_identifier operation_policy_name = opts.operation_policy_name @@ -174,6 +175,29 @@ if __name__ == '__main__': ) client.close() sys.exit(-6) + if cryptographic_usage_masks: + masks = [] + for cryptographic_usage_mask in cryptographic_usage_masks: + mask = getattr( + enums.CryptographicUsageMask, + cryptographic_usage_mask, + None + ) + if mask: + masks.append(mask) + else: + logger.error( + "Invalid cryptographic usage mask provided: {}".format( + cryptographic_usage_mask + ) + ) + sys.exit(-7) + attributes.append( + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + masks + ) + ) if unique_identifier: attributes.append( attribute_factory.create_attribute( diff --git a/kmip/demos/utils.py b/kmip/demos/utils.py index f4308d8..5fc50f8 100644 --- a/kmip/demos/utils.py +++ b/kmip/demos/utils.py @@ -303,6 +303,20 @@ def build_cli_parser(operation=None): dest="cryptographic_length", help="The cryptographic length of the secret (e.g., 128, 2048)" ) + parser.add_option( + "--cryptographic-usage-mask", + action="append", + type="str", + default=[], + dest="cryptographic_usage_masks", + help=( + "The cryptographic usage mask(s) the secret should have set " + "(e.g., ENCRYPT, DECRYPT). Use multiple times to specify " + "multiple cryptographic usage mask enumeration values. All " + "values will get combined into a single mask when sent to the " + "server." + ) + ) parser.add_option( "-i", "--unique-identifier", diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 73cc9e9..fe064c9 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -1743,6 +1743,25 @@ class KmipEngine(object): ) add_object = False break + elif name == "Cryptographic Usage Mask": + value = value.value + mask_values = enums.get_enumerations_from_bit_mask( + enums.CryptographicUsageMask, + value + ) + for mask_value in mask_values: + if mask_value not in attribute: + self._logger.debug( + "Failed match: " + "the specified cryptographic usage mask " + "({}) is not set on the object.".format( + mask_value.name + ) + ) + add_object = False + break + if not add_object: + break elif name == enums.AttributeType.INITIAL_DATE.value: initial_date["value"] = attribute self._track_date_attributes( diff --git a/kmip/tests/integration/services/test_integration.py b/kmip/tests/integration/services/test_integration.py index eaceb69..3856a49 100644 --- a/kmip/tests/integration/services/test_integration.py +++ b/kmip/tests/integration/services/test_integration.py @@ -1493,6 +1493,55 @@ class TestIntegration(testtools.TestCase): self.assertEqual(1, len(result.uuids)) self.assertIn(uid_a, result.uuids) + # Test locating keys using their cryptographic usage masks + mask = [enums.CryptographicUsageMask.ENCRYPT] + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + mask + ) + ] + ) + self.assertEqual(2, len(result.uuids)) + self.assertIn(uid_a, result.uuids) + self.assertIn(uid_b, result.uuids) + + mask.append(enums.CryptographicUsageMask.DECRYPT) + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + mask + ) + ] + ) + self.assertEqual(2, len(result.uuids)) + self.assertIn(uid_a, result.uuids) + self.assertIn(uid_b, result.uuids) + + mask.append(enums.CryptographicUsageMask.SIGN) + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + mask + ) + ] + ) + self.assertEqual(0, len(result.uuids)) + + mask = [enums.CryptographicUsageMask.EXPORT] + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + mask + ) + ] + ) + self.assertEqual(0, len(result.uuids)) + # Clean up keys result = self.client.destroy(uid_a) self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) diff --git a/kmip/tests/integration/services/test_proxykmipclient.py b/kmip/tests/integration/services/test_proxykmipclient.py index d38185a..a4ee6bf 100644 --- a/kmip/tests/integration/services/test_proxykmipclient.py +++ b/kmip/tests/integration/services/test_proxykmipclient.py @@ -1148,6 +1148,55 @@ class TestProxyKmipClientIntegration(testtools.TestCase): self.assertEqual(1, len(result)) self.assertIn(a_id, result) + # Test locating keys using their cryptographic usage masks + mask = [enums.CryptographicUsageMask.ENCRYPT] + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + mask + ) + ] + ) + self.assertEqual(2, len(result)) + self.assertIn(a_id, result) + self.assertIn(b_id, result) + + mask.append(enums.CryptographicUsageMask.DECRYPT) + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + mask + ) + ] + ) + self.assertEqual(2, len(result)) + self.assertIn(a_id, result) + self.assertIn(b_id, result) + + mask.append(enums.CryptographicUsageMask.SIGN) + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + mask + ) + ] + ) + self.assertEqual(0, len(result)) + + mask = [enums.CryptographicUsageMask.EXPORT] + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + mask + ) + ] + ) + self.assertEqual(0, len(result)) + # Clean up the keys self.client.destroy(a_id) self.client.destroy(b_id) diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index d6dbb64..1702ee0 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -5163,6 +5163,129 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual(0, len(response_payload.unique_identifiers)) + def test_locate_with_cryptographic_usage_masks(self): + """ + Test the Locate operation when 'Cryptographic Usage Mask' values are + given. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + key = ( + b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + ) + + obj_a = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 128, + key, + name='name1' + ) + obj_a.cryptographic_usage_masks = [ + enums.CryptographicUsageMask.EXPORT, + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT + ] + obj_b = pie_objects.SecretData( + key, + enums.SecretDataType.PASSWORD + ) + obj_b.cryptographic_usage_masks = [ + enums.CryptographicUsageMask.EXPORT + ] + + e._data_session.add(obj_a) + e._data_session.add(obj_b) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + id_b = str(obj_b.unique_identifier) + + attribute_factory = factory.AttributeFactory() + + # Locate the objects based on their shared cryptographic usage masks. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [enums.CryptographicUsageMask.EXPORT] + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_a) + ) + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_b) + ) + self.assertEqual(2, len(response_payload.unique_identifiers)) + self.assertIn(id_a, response_payload.unique_identifiers) + self.assertIn(id_b, response_payload.unique_identifiers) + + # Locate the symmetric key based on its unique cryptographic usage + # masks. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT + ] + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_a) + ) + e._logger.debug.assert_any_call( + "Failed match: the specified cryptographic usage mask (ENCRYPT) " + "is not set on the object." + ) + self.assertEqual(1, len(response_payload.unique_identifiers)) + self.assertIn(id_a, response_payload.unique_identifiers) + + # Try to locate a non-existent object based on its unique cryptographic + # usage masks. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.SIGN + ] + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Failed match: the specified cryptographic usage mask (SIGN) " + "is not set on the object." + ) + e._logger.debug.assert_any_call( + "Failed match: the specified cryptographic usage mask (SIGN) " + "is not set on the object." + ) + self.assertEqual(0, len(response_payload.unique_identifiers)) + def test_locate_with_unique_identifier(self): """ Test the Locate operation when the 'Unique Identifier' attribute