diff --git a/kmip/demos/pie/locate.py b/kmip/demos/pie/locate.py index f7908c6..ed6bedb 100644 --- a/kmip/demos/pie/locate.py +++ b/kmip/demos/pie/locate.py @@ -40,6 +40,7 @@ if __name__ == '__main__': object_type = opts.object_type cryptographic_algorithm = opts.cryptographic_algorithm cryptographic_length = opts.cryptographic_length + cryptographic_usage_masks = opts.cryptographic_usage_masks unique_identifier = opts.unique_identifier operation_policy_name = opts.operation_policy_name @@ -147,6 +148,29 @@ if __name__ == '__main__': ) ) sys.exit(-6) + if cryptographic_usage_masks: + masks = [] + for cryptographic_usage_mask in cryptographic_usage_masks: + mask = getattr( + enums.CryptographicUsageMask, + cryptographic_usage_mask, + None + ) + if mask: + masks.append(mask) + else: + logger.error( + "Invalid cryptographic usage mask provided: {}".format( + cryptographic_usage_mask + ) + ) + sys.exit(-7) + attributes.append( + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + masks + ) + ) if unique_identifier: attributes.append( attribute_factory.create_attribute( diff --git a/kmip/demos/units/locate.py b/kmip/demos/units/locate.py index f46ee00..87645dd 100644 --- a/kmip/demos/units/locate.py +++ b/kmip/demos/units/locate.py @@ -43,6 +43,7 @@ if __name__ == '__main__': object_type = opts.object_type cryptographic_algorithm = opts.cryptographic_algorithm cryptographic_length = opts.cryptographic_length + cryptographic_usage_masks = opts.cryptographic_usage_masks unique_identifier = opts.unique_identifier operation_policy_name = opts.operation_policy_name @@ -174,6 +175,29 @@ if __name__ == '__main__': ) client.close() sys.exit(-6) + if cryptographic_usage_masks: + masks = [] + for cryptographic_usage_mask in cryptographic_usage_masks: + mask = getattr( + enums.CryptographicUsageMask, + cryptographic_usage_mask, + None + ) + if mask: + masks.append(mask) + else: + logger.error( + "Invalid cryptographic usage mask provided: {}".format( + cryptographic_usage_mask + ) + ) + sys.exit(-7) + attributes.append( + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + masks + ) + ) if unique_identifier: attributes.append( attribute_factory.create_attribute( diff --git a/kmip/demos/utils.py b/kmip/demos/utils.py index f4308d8..5fc50f8 100644 --- a/kmip/demos/utils.py +++ b/kmip/demos/utils.py @@ -303,6 +303,20 @@ def build_cli_parser(operation=None): dest="cryptographic_length", help="The cryptographic length of the secret (e.g., 128, 2048)" ) + parser.add_option( + "--cryptographic-usage-mask", + action="append", + type="str", + default=[], + dest="cryptographic_usage_masks", + help=( + "The cryptographic usage mask(s) the secret should have set " + "(e.g., ENCRYPT, DECRYPT). Use multiple times to specify " + "multiple cryptographic usage mask enumeration values. All " + "values will get combined into a single mask when sent to the " + "server." + ) + ) parser.add_option( "-i", "--unique-identifier", diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 73cc9e9..fe064c9 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -1743,6 +1743,25 @@ class KmipEngine(object): ) add_object = False break + elif name == "Cryptographic Usage Mask": + value = value.value + mask_values = enums.get_enumerations_from_bit_mask( + enums.CryptographicUsageMask, + value + ) + for mask_value in mask_values: + if mask_value not in attribute: + self._logger.debug( + "Failed match: " + "the specified cryptographic usage mask " + "({}) is not set on the object.".format( + mask_value.name + ) + ) + add_object = False + break + if not add_object: + break elif name == enums.AttributeType.INITIAL_DATE.value: initial_date["value"] = attribute self._track_date_attributes( diff --git a/kmip/tests/integration/services/test_integration.py b/kmip/tests/integration/services/test_integration.py index eaceb69..3856a49 100644 --- a/kmip/tests/integration/services/test_integration.py +++ b/kmip/tests/integration/services/test_integration.py @@ -1493,6 +1493,55 @@ class TestIntegration(testtools.TestCase): self.assertEqual(1, len(result.uuids)) self.assertIn(uid_a, result.uuids) + # Test locating keys using their cryptographic usage masks + mask = [enums.CryptographicUsageMask.ENCRYPT] + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + mask + ) + ] + ) + self.assertEqual(2, len(result.uuids)) + self.assertIn(uid_a, result.uuids) + self.assertIn(uid_b, result.uuids) + + mask.append(enums.CryptographicUsageMask.DECRYPT) + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + mask + ) + ] + ) + self.assertEqual(2, len(result.uuids)) + self.assertIn(uid_a, result.uuids) + self.assertIn(uid_b, result.uuids) + + mask.append(enums.CryptographicUsageMask.SIGN) + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + mask + ) + ] + ) + self.assertEqual(0, len(result.uuids)) + + mask = [enums.CryptographicUsageMask.EXPORT] + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + mask + ) + ] + ) + self.assertEqual(0, len(result.uuids)) + # Clean up keys result = self.client.destroy(uid_a) self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) diff --git a/kmip/tests/integration/services/test_proxykmipclient.py b/kmip/tests/integration/services/test_proxykmipclient.py index d38185a..a4ee6bf 100644 --- a/kmip/tests/integration/services/test_proxykmipclient.py +++ b/kmip/tests/integration/services/test_proxykmipclient.py @@ -1148,6 +1148,55 @@ class TestProxyKmipClientIntegration(testtools.TestCase): self.assertEqual(1, len(result)) self.assertIn(a_id, result) + # Test locating keys using their cryptographic usage masks + mask = [enums.CryptographicUsageMask.ENCRYPT] + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + mask + ) + ] + ) + self.assertEqual(2, len(result)) + self.assertIn(a_id, result) + self.assertIn(b_id, result) + + mask.append(enums.CryptographicUsageMask.DECRYPT) + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + mask + ) + ] + ) + self.assertEqual(2, len(result)) + self.assertIn(a_id, result) + self.assertIn(b_id, result) + + mask.append(enums.CryptographicUsageMask.SIGN) + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + mask + ) + ] + ) + self.assertEqual(0, len(result)) + + mask = [enums.CryptographicUsageMask.EXPORT] + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + mask + ) + ] + ) + self.assertEqual(0, len(result)) + # Clean up the keys self.client.destroy(a_id) self.client.destroy(b_id) diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index d6dbb64..1702ee0 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -5163,6 +5163,129 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual(0, len(response_payload.unique_identifiers)) + def test_locate_with_cryptographic_usage_masks(self): + """ + Test the Locate operation when 'Cryptographic Usage Mask' values are + given. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + key = ( + b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + ) + + obj_a = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 128, + key, + name='name1' + ) + obj_a.cryptographic_usage_masks = [ + enums.CryptographicUsageMask.EXPORT, + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT + ] + obj_b = pie_objects.SecretData( + key, + enums.SecretDataType.PASSWORD + ) + obj_b.cryptographic_usage_masks = [ + enums.CryptographicUsageMask.EXPORT + ] + + e._data_session.add(obj_a) + e._data_session.add(obj_b) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + id_b = str(obj_b.unique_identifier) + + attribute_factory = factory.AttributeFactory() + + # Locate the objects based on their shared cryptographic usage masks. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [enums.CryptographicUsageMask.EXPORT] + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_a) + ) + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_b) + ) + self.assertEqual(2, len(response_payload.unique_identifiers)) + self.assertIn(id_a, response_payload.unique_identifiers) + self.assertIn(id_b, response_payload.unique_identifiers) + + # Locate the symmetric key based on its unique cryptographic usage + # masks. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT + ] + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_a) + ) + e._logger.debug.assert_any_call( + "Failed match: the specified cryptographic usage mask (ENCRYPT) " + "is not set on the object." + ) + self.assertEqual(1, len(response_payload.unique_identifiers)) + self.assertIn(id_a, response_payload.unique_identifiers) + + # Try to locate a non-existent object based on its unique cryptographic + # usage masks. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.SIGN + ] + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Failed match: the specified cryptographic usage mask (SIGN) " + "is not set on the object." + ) + e._logger.debug.assert_any_call( + "Failed match: the specified cryptographic usage mask (SIGN) " + "is not set on the object." + ) + self.assertEqual(0, len(response_payload.unique_identifiers)) + def test_locate_with_unique_identifier(self): """ Test the Locate operation when the 'Unique Identifier' attribute