diff --git a/kmip/core/objects.py b/kmip/core/objects.py index d807c42..5bccb9d 100644 --- a/kmip/core/objects.py +++ b/kmip/core/objects.py @@ -5499,3 +5499,550 @@ class ValidationInformation(primitives.Struct): return not (self == other) else: return NotImplemented + + +class CapabilityInformation(primitives.Struct): + """ + A structure containing details of supported server capabilities. + + This is intended for use with KMIP 1.3+. + + Attributes: + streaming_capability: A boolean flag indicating whether or not + the server supports streaming data. + asynchronous_capability: A boolean flag indicating whether or not + the server supports asynchronous operations. + attestation_capability: A boolean flag indicating whether or not + the server supports attestation. + batch_undo_capability: A boolean flag indicating whether or not + the server supports batch undo. Added in KMIP 1.4. + batch_continue_capability: A boolean flag indicating whether or not + the server supports batch continue. Added in KMIP 1.4. + unwrap_mode: An UnwrapMode enumeration identifying the unwrap mode + supported by the server. + destroy_action: A DestroyAction enumeration identifying the destroy + action supported by the server. + shredding_algorithm: A ShreddingAlgorithm enumeration identifying + the shredding algorithm supported by the server. + rng_mode: An RNGMode enumeration identifying the RNG mode supported + by the server. + """ + + def __init__(self, + streaming_capability=None, + asynchronous_capability=None, + attestation_capability=None, + batch_undo_capability=None, + batch_continue_capability=None, + unwrap_mode=None, + destroy_action=None, + shredding_algorithm=None, + rng_mode=None): + """ + Construct a CapabilityInformation structure. + + Args: + streaming_capability (bool): A boolean flag indicating whether or + not the server supports streaming data. Optional, defaults to + None. + asynchronous_capability (bool): A boolean flag indicating whether + or not the server supports asynchronous operations. Optional, + defaults to None. + attestation_capability (bool): A boolean flag indicating whether + or not the server supports attestation. Optional, defaults to + None. + batch_undo_capability (bool): A boolean flag indicating whether or + not the server supports batch undo. Added in KMIP 1.4. + Optional, defaults to None. + batch_continue_capability (bool): A boolean flag indicating whether + or not the server supports batch continue. Added in KMIP 1.4. + Optional, defaults to None. + unwrap_mode (enum): An UnwrapMode enumeration identifying the + unwrap mode supported by the server. Optional, defaults to + None. + destroy_action (enum): A DestroyAction enumeration identifying the + destroy action supported by the server. Optional, defaults to + None. + shredding_algorithm (enum): A ShreddingAlgorithm enumeration + identifying the shredding algorithm supported by the server. + Optional, defaults to None. + rng_mode (enum): An RNGMode enumeration identifying the RNG mode + supported by the server. Optional, defaults to None. + """ + super(CapabilityInformation, self).__init__( + tag=enums.Tags.CAPABILITY_INFORMATION + ) + + self._streaming_capability = None + self._asynchronous_capability = None + self._attestation_capability = None + self._batch_undo_capability = None + self._batch_continue_capability = None + self._unwrap_mode = None + self._destroy_action = None + self._shredding_algorithm = None + self._rng_mode = None + + self.streaming_capability = streaming_capability + self.asynchronous_capability = asynchronous_capability + self.attestation_capability = attestation_capability + self.batch_undo_capability = batch_undo_capability + self.batch_continue_capability = batch_continue_capability + self.unwrap_mode = unwrap_mode + self.destroy_action = destroy_action + self.shredding_algorithm = shredding_algorithm + self.rng_mode = rng_mode + + @property + def streaming_capability(self): + if self._streaming_capability: + return self._streaming_capability.value + return None + + @streaming_capability.setter + def streaming_capability(self, value): + if value is None: + self._streaming_capability = None + elif isinstance(value, bool): + self._streaming_capability = primitives.Boolean( + value=value, + tag=enums.Tags.STREAMING_CAPABILITY + ) + else: + raise TypeError("The streaming capability must be a boolean.") + + @property + def asynchronous_capability(self): + if self._asynchronous_capability: + return self._asynchronous_capability.value + return None + + @asynchronous_capability.setter + def asynchronous_capability(self, value): + if value is None: + self._asynchronous_capability = None + elif isinstance(value, bool): + self._asynchronous_capability = primitives.Boolean( + value=value, + tag=enums.Tags.ASYNCHRONOUS_CAPABILITY + ) + else: + raise TypeError( + "The asynchronous capability must be a boolean." + ) + + @property + def attestation_capability(self): + if self._attestation_capability: + return self._attestation_capability.value + return None + + @attestation_capability.setter + def attestation_capability(self, value): + if value is None: + self._attestation_capability = None + elif isinstance(value, bool): + self._attestation_capability = primitives.Boolean( + value=value, + tag=enums.Tags.ATTESTATION_CAPABILITY + ) + else: + raise TypeError("The attestation capability must be a boolean.") + + @property + def batch_undo_capability(self): + if self._batch_undo_capability: + return self._batch_undo_capability.value + return None + + @batch_undo_capability.setter + def batch_undo_capability(self, value): + if value is None: + self._batch_undo_capability = None + elif isinstance(value, bool): + self._batch_undo_capability = primitives.Boolean( + value=value, + tag=enums.Tags.BATCH_UNDO_CAPABILITY + ) + else: + raise TypeError("The batch undo capability must be a boolean.") + + @property + def batch_continue_capability(self): + if self._batch_continue_capability: + return self._batch_continue_capability.value + return None + + @batch_continue_capability.setter + def batch_continue_capability(self, value): + if value is None: + self._batch_continue_capability = None + elif isinstance(value, bool): + self._batch_continue_capability = primitives.Boolean( + value=value, + tag=enums.Tags.BATCH_CONTINUE_CAPABILITY + ) + else: + raise TypeError( + "The batch continue capability must be a boolean." + ) + + @property + def unwrap_mode(self): + if self._unwrap_mode: + return self._unwrap_mode.value + return None + + @unwrap_mode.setter + def unwrap_mode(self, value): + if value is None: + self._unwrap_mode = None + elif isinstance(value, enums.UnwrapMode): + self._unwrap_mode = primitives.Enumeration( + enums.UnwrapMode, + value=value, + tag=enums.Tags.UNWRAP_MODE + ) + else: + raise TypeError( + "The unwrap mode must be an UnwrapMode enumeration." + ) + + @property + def destroy_action(self): + if self._destroy_action: + return self._destroy_action.value + return None + + @destroy_action.setter + def destroy_action(self, value): + if value is None: + self._destroy_action = None + elif isinstance(value, enums.DestroyAction): + self._destroy_action = primitives.Enumeration( + enums.DestroyAction, + value=value, + tag=enums.Tags.DESTROY_ACTION + ) + else: + raise TypeError( + "The destroy action must be a DestroyAction enumeration." + ) + + @property + def shredding_algorithm(self): + if self._shredding_algorithm: + return self._shredding_algorithm.value + return None + + @shredding_algorithm.setter + def shredding_algorithm(self, value): + if value is None: + self._shredding_algorithm = None + elif isinstance(value, enums.ShreddingAlgorithm): + self._shredding_algorithm = primitives.Enumeration( + enums.ShreddingAlgorithm, + value=value, + tag=enums.Tags.SHREDDING_ALGORITHM + ) + else: + raise TypeError( + "The shredding algorithm must be a ShreddingAlgorithm " + "enumeration." + ) + + @property + def rng_mode(self): + if self._rng_mode: + return self._rng_mode.value + return None + + @rng_mode.setter + def rng_mode(self, value): + if value is None: + self._rng_mode = None + elif isinstance(value, enums.RNGMode): + self._rng_mode = primitives.Enumeration( + enums.RNGMode, + value=value, + tag=enums.Tags.RNG_MODE + ) + else: + raise TypeError("The RNG mode must be an RNGMode enumeration.") + + def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_1_3): + """ + Read the data encoding the CapabilityInformation structure and decode + it into its constituent parts. + + Args: + input_buffer (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + kmip_version (KMIPVersion): An enumeration defining the KMIP + version with which the object will be decoded. Optional, + defaults to KMIP 2.0. + + Raises: + VersionNotSupported: Raised when a KMIP version is provided that + does not support the CapabilityInformation structure. + """ + if kmip_version < enums.KMIPVersion.KMIP_1_3: + raise exceptions.VersionNotSupported( + "KMIP {} does not support the CapabilityInformation " + "object.".format( + kmip_version.value + ) + ) + + super(CapabilityInformation, self).read( + input_buffer, + kmip_version=kmip_version + ) + local_buffer = utils.BytearrayStream(input_buffer.read(self.length)) + + if self.is_tag_next(enums.Tags.STREAMING_CAPABILITY, local_buffer): + streaming_capability = primitives.Boolean( + tag=enums.Tags.STREAMING_CAPABILITY + ) + streaming_capability.read(local_buffer, kmip_version=kmip_version) + self._streaming_capability = streaming_capability + + if self.is_tag_next(enums.Tags.ASYNCHRONOUS_CAPABILITY, local_buffer): + asynchronous_capability = primitives.Boolean( + tag=enums.Tags.ASYNCHRONOUS_CAPABILITY + ) + asynchronous_capability.read( + local_buffer, + kmip_version=kmip_version + ) + self._asynchronous_capability = asynchronous_capability + + if self.is_tag_next(enums.Tags.ATTESTATION_CAPABILITY, local_buffer): + attestation_capability = primitives.Boolean( + tag=enums.Tags.ATTESTATION_CAPABILITY + ) + attestation_capability.read( + local_buffer, + kmip_version=kmip_version + ) + self._attestation_capability = attestation_capability + + if kmip_version >= enums.KMIPVersion.KMIP_1_4: + if self.is_tag_next( + enums.Tags.BATCH_UNDO_CAPABILITY, + local_buffer + ): + batch_undo_capability = primitives.Boolean( + tag=enums.Tags.BATCH_UNDO_CAPABILITY + ) + batch_undo_capability.read( + local_buffer, + kmip_version=kmip_version + ) + self._batch_continue_capability = batch_undo_capability + + if self.is_tag_next( + enums.Tags.BATCH_CONTINUE_CAPABILITY, + local_buffer + ): + batch_continue_capability = primitives.Boolean( + tag=enums.Tags.BATCH_CONTINUE_CAPABILITY + ) + batch_continue_capability.read( + local_buffer, + kmip_version=kmip_version + ) + self._batch_continue_capability = batch_continue_capability + + if self.is_tag_next(enums.Tags.UNWRAP_MODE, local_buffer): + unwrap_mode = primitives.Enumeration( + enums.UnwrapMode, + tag=enums.Tags.UNWRAP_MODE + ) + unwrap_mode.read(local_buffer, kmip_version=kmip_version) + self._unwrap_mode = unwrap_mode + + if self.is_tag_next(enums.Tags.DESTROY_ACTION, local_buffer): + destroy_action = primitives.Enumeration( + enums.DestroyAction, + tag=enums.Tags.DESTROY_ACTION + ) + destroy_action.read(local_buffer, kmip_version=kmip_version) + self._destroy_action = destroy_action + + if self.is_tag_next(enums.Tags.SHREDDING_ALGORITHM, local_buffer): + shredding_algorithm = primitives.Enumeration( + enums.ShreddingAlgorithm, + tag=enums.Tags.SHREDDING_ALGORITHM + ) + shredding_algorithm.read(local_buffer, kmip_version=kmip_version) + self._shredding_algorithm = shredding_algorithm + + if self.is_tag_next(enums.Tags.RNG_MODE, local_buffer): + rng_mode = primitives.Enumeration( + enums.RNGMode, + tag=enums.Tags.RNG_MODE + ) + rng_mode.read(local_buffer, kmip_version=kmip_version) + self._rng_mode = rng_mode + + self.is_oversized(local_buffer) + + def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_1_3): + """ + Write the CapabilityInformation structure encoding to the data stream. + + Args: + output_buffer (stream): A data stream in which to encode + CapabilityInformation structure data, supporting a write + method. + kmip_version (enum): A KMIPVersion enumeration defining the KMIP + version with which the object will be encoded. Optional, + defaults to KMIP 2.0. + + Raises: + VersionNotSupported: Raised when a KMIP version is provided that + does not support the CapabilityInformation structure. + """ + if kmip_version < enums.KMIPVersion.KMIP_1_3: + raise exceptions.VersionNotSupported( + "KMIP {} does not support the CapabilityInformation " + "object.".format( + kmip_version.value + ) + ) + + local_buffer = BytearrayStream() + + if self._streaming_capability: + self._streaming_capability.write( + local_buffer, + kmip_version=kmip_version + ) + + if self._asynchronous_capability: + self._asynchronous_capability.write( + local_buffer, + kmip_version=kmip_version + ) + + if self._attestation_capability: + self._attestation_capability.write( + local_buffer, + kmip_version=kmip_version + ) + + if kmip_version >= enums.KMIPVersion.KMIP_1_4: + if self._batch_undo_capability: + self._batch_undo_capability.write( + local_buffer, + kmip_version=kmip_version + ) + + if self._batch_continue_capability: + self._batch_continue_capability.write( + local_buffer, + kmip_version=kmip_version + ) + + if self._unwrap_mode: + self._unwrap_mode.write( + local_buffer, + kmip_version=kmip_version + ) + + if self._destroy_action: + self._destroy_action.write( + local_buffer, + kmip_version=kmip_version + ) + + if self._shredding_algorithm: + self._shredding_algorithm.write( + local_buffer, + kmip_version=kmip_version + ) + + if self._rng_mode: + self._rng_mode.write( + local_buffer, + kmip_version=kmip_version + ) + + self.length = local_buffer.length() + super(CapabilityInformation, self).write( + output_buffer, + kmip_version=kmip_version + ) + output_buffer.write(local_buffer.buffer) + + def __repr__(self): + sc = "streaming_capability={}".format(self.streaming_capability) + rc = "asynchronous_capability={}".format(self.asynchronous_capability) + tc = "attestation_capability={}".format(self.attestation_capability) + buc = "batch_undo_capability={}".format(self.batch_undo_capability) + bcc = "batch_continue_capability={}".format( + self.batch_continue_capability + ) + um = "unwrap_mode={}".format(self.unwrap_mode) + da = "destroy_action={}".format(self.destroy_action) + sa = "shredding_algorithm={}".format(self.shredding_algorithm) + rm = "rng_mode={}".format(self.rng_mode) + + v = ", ".join([sc, rc, tc, buc, bcc, um, da, sa, rm]) + + return "CapabilityInformation({})".format(v) + + def __str__(self): + sc = '"streaming_capability": {}'.format(self.streaming_capability) + rc = '"asynchronous_capability": {}'.format( + self.asynchronous_capability + ) + tc = '"attestation_capability": {}'.format( + self.attestation_capability + ) + buc = '"batch_undo_capability": {}'.format(self.batch_undo_capability) + bcc = '"batch_continue_capability": {}'.format( + self.batch_continue_capability + ) + um = '"unwrap_mode": {}'.format(self.unwrap_mode) + da = '"destroy_action": {}'.format(self.destroy_action) + sa = '"shredding_algorithm": {}'.format(self.shredding_algorithm) + rm = '"rng_mode": {}'.format(self.rng_mode) + + v = ", ".join([sc, rc, tc, buc, bcc, um, da, sa, rm]) + + return '{' + v + '}' + + def __eq__(self, other): + if isinstance(other, CapabilityInformation): + if self.streaming_capability != other.streaming_capability: + return False + elif self.asynchronous_capability != other.asynchronous_capability: + return False + elif self.attestation_capability != other.attestation_capability: + return False + elif self.batch_undo_capability != other.batch_undo_capability: + return False + elif self.batch_continue_capability != \ + other.batch_continue_capability: + return False + elif self.unwrap_mode != other.unwrap_mode: + return False + elif self.destroy_action != other.destroy_action: + return False + elif self.shredding_algorithm != other.shredding_algorithm: + return False + elif self.rng_mode != other.rng_mode: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, CapabilityInformation): + return not (self == other) + else: + return NotImplemented diff --git a/kmip/tests/unit/core/objects/test_objects.py b/kmip/tests/unit/core/objects/test_objects.py index 4f69a26..d86f8d0 100644 --- a/kmip/tests/unit/core/objects/test_objects.py +++ b/kmip/tests/unit/core/objects/test_objects.py @@ -8444,3 +8444,930 @@ class TestValidationInformation(testtools.TestCase): self.assertTrue(a != b) self.assertTrue(b != a) + + +class TestCapabilityInformation(testtools.TestCase): + + def setUp(self): + super(TestCapabilityInformation, self).setUp() + + # This encoding matches the following set of values: + # + # Capability Information + # Streaming Capability - False + # Asynchronous Capability - True + # Attestation Capability - True + # Batch Undo Capability - False + # Batch Continue Capability - True + # Unwrap Mode - PROCESSED + # Destroy Action - SHREDDED + # Shredding Algorithm - CRYPTOGRAPHIC + # RNG Mode - NON_SHARED_INSTANTIATION + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\xF7\x01\x00\x00\x00\x90' + b'\x42\x00\xEF\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xF0\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01' + b'\x42\x00\xF1\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01' + b'\x42\x00\xF9\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xFA\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01' + b'\x42\x00\xF2\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\xF3\x05\x00\x00\x00\x04\x00\x00\x00\x07\x00\x00\x00\x00' + b'\x42\x00\xF4\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\xF5\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + ) + + # This encoding matches the following set of values: + # + # Capability Information + # Streaming Capability - False + # Asynchronous Capability - True + # Attestation Capability - True + # Unwrap Mode - PROCESSED + # Destroy Action - SHREDDED + # Shredding Algorithm - CRYPTOGRAPHIC + # RNG Mode - NON_SHARED_INSTANTIATION + self.full_encoding_kmip_1_3 = utils.BytearrayStream( + b'\x42\x00\xF7\x01\x00\x00\x00\x70' + b'\x42\x00\xEF\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xF0\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01' + b'\x42\x00\xF1\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01' + b'\x42\x00\xF2\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\xF3\x05\x00\x00\x00\x04\x00\x00\x00\x07\x00\x00\x00\x00' + b'\x42\x00\xF4\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\xF5\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + ) + + # This encoding matches the following set of values: + # + # Capability Information + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\xF7\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestCapabilityInformation, self).tearDown() + + def test_invalid_streaming_capability(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the streaming capability of a CapabilityInformation structure. + """ + kwargs = {"streaming_capability": "invalid"} + self.assertRaisesRegex( + TypeError, + "The streaming capability must be a boolean.", + objects.CapabilityInformation, + **kwargs + ) + + args = ( + objects.CapabilityInformation(), + "streaming_capability", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "The streaming capability must be a boolean.", + setattr, + *args + ) + + def test_invalid_asynchronous_capability(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the asynchronous capability of a CapabilityInformation structure. + """ + kwargs = {"asynchronous_capability": "invalid"} + self.assertRaisesRegex( + TypeError, + "The asynchronous capability must be a boolean.", + objects.CapabilityInformation, + **kwargs + ) + + args = ( + objects.CapabilityInformation(), + "asynchronous_capability", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "The asynchronous capability must be a boolean.", + setattr, + *args + ) + + def test_invalid_attestation_capability(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the attestation capability of a CapabilityInformation structure. + """ + kwargs = {"attestation_capability": "invalid"} + self.assertRaisesRegex( + TypeError, + "The attestation capability must be a boolean.", + objects.CapabilityInformation, + **kwargs + ) + + args = ( + objects.CapabilityInformation(), + "attestation_capability", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "The attestation capability must be a boolean.", + setattr, + *args + ) + + def test_invalid_batch_undo_capability(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the batch undo capability of a CapabilityInformation structure. + """ + kwargs = {"batch_undo_capability": "invalid"} + self.assertRaisesRegex( + TypeError, + "The batch undo capability must be a boolean.", + objects.CapabilityInformation, + **kwargs + ) + + args = ( + objects.CapabilityInformation(), + "batch_undo_capability", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "The batch undo capability must be a boolean.", + setattr, + *args + ) + + def test_invalid_batch_continue_capability(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the batch continue capability of a CapabilityInformation structure. + """ + kwargs = {"batch_continue_capability": "invalid"} + self.assertRaisesRegex( + TypeError, + "The batch continue capability must be a boolean.", + objects.CapabilityInformation, + **kwargs + ) + + args = ( + objects.CapabilityInformation(), + "batch_continue_capability", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "The batch continue capability must be a boolean.", + setattr, + *args + ) + + def test_invalid_unwrap_mode(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unwrap mode of a CapabilityInformation structure. + """ + kwargs = {"unwrap_mode": "invalid"} + self.assertRaisesRegex( + TypeError, + "The unwrap mode must be an UnwrapMode enumeration.", + objects.CapabilityInformation, + **kwargs + ) + + args = ( + objects.CapabilityInformation(), + "unwrap_mode", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "The unwrap mode must be an UnwrapMode enumeration.", + setattr, + *args + ) + + def test_invalid_destroy_action(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the destroy action of a CapabilityInformation structure. + """ + kwargs = {"destroy_action": "invalid"} + self.assertRaisesRegex( + TypeError, + "The destroy action must be a DestroyAction enumeration.", + objects.CapabilityInformation, + **kwargs + ) + + args = ( + objects.CapabilityInformation(), + "destroy_action", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "The destroy action must be a DestroyAction enumeration.", + setattr, + *args + ) + + def test_invalid_shredding_algorithm(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the shredding algorithm of a CapabilityInformation structure. + """ + kwargs = {"shredding_algorithm": "invalid"} + self.assertRaisesRegex( + TypeError, + "The shredding algorithm must be a ShreddingAlgorithm " + "enumeration.", + objects.CapabilityInformation, + **kwargs + ) + + args = ( + objects.CapabilityInformation(), + "shredding_algorithm", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "The shredding algorithm must be a ShreddingAlgorithm " + "enumeration.", + setattr, + *args + ) + + def test_invalid_rng_mode(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the RNG mode of a CapabilityInformation structure. + """ + kwargs = {"rng_mode": "invalid"} + self.assertRaisesRegex( + TypeError, + "The RNG mode must be an RNGMode enumeration.", + objects.CapabilityInformation, + **kwargs + ) + + args = ( + objects.CapabilityInformation(), + "rng_mode", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "The RNG mode must be an RNGMode enumeration.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a CapabilityInformation structure can be correctly read in + from a data stream. + """ + capability_information = objects.CapabilityInformation() + + self.assertIsNone(capability_information.streaming_capability) + self.assertIsNone(capability_information.asynchronous_capability) + self.assertIsNone(capability_information.attestation_capability) + self.assertIsNone(capability_information.batch_undo_capability) + self.assertIsNone(capability_information.batch_continue_capability) + self.assertIsNone(capability_information.unwrap_mode) + self.assertIsNone(capability_information.destroy_action) + self.assertIsNone(capability_information.shredding_algorithm) + self.assertIsNone(capability_information.rng_mode) + + capability_information.read( + self.full_encoding, + kmip_version=enums.KMIPVersion.KMIP_1_4 + ) + + self.assertFalse(capability_information.streaming_capability) + self.assertTrue(capability_information.asynchronous_capability) + self.assertTrue(capability_information.attestation_capability) + self.assertFalse(capability_information.batch_undo_capability) + self.assertTrue(capability_information.batch_continue_capability) + self.assertEqual( + enums.UnwrapMode.PROCESSED, + capability_information.unwrap_mode + ) + self.assertEqual( + enums.DestroyAction.SHREDDED, + capability_information.destroy_action + ) + self.assertEqual( + enums.ShreddingAlgorithm.CRYPTOGRAPHIC, + capability_information.shredding_algorithm + ) + self.assertEqual( + enums.RNGMode.NON_SHARED_INSTANTIATION, + capability_information.rng_mode + ) + + def test_read_unsupported_kmip_version(self): + """ + Test that a VersionNotSupported error is raised during the decoding of + a CapabilityInformation structure when the structure is read for an + unsupported KMIP version. + """ + capability_information = objects.CapabilityInformation() + + args = (self.full_encoding, ) + kwargs = {"kmip_version": enums.KMIPVersion.KMIP_1_2} + self.assertRaisesRegex( + exceptions.VersionNotSupported, + "KMIP 1.2 does not support the CapabilityInformation object.", + capability_information.read, + *args, + **kwargs + ) + + def test_read_kmip_1_3(self): + """ + Test that a CapabilityInformation structure can be correctly read in + from a data stream with only KMIP 1.3 features. + """ + capability_information = objects.CapabilityInformation() + + self.assertIsNone(capability_information.streaming_capability) + self.assertIsNone(capability_information.asynchronous_capability) + self.assertIsNone(capability_information.attestation_capability) + self.assertIsNone(capability_information.batch_undo_capability) + self.assertIsNone(capability_information.batch_continue_capability) + self.assertIsNone(capability_information.unwrap_mode) + self.assertIsNone(capability_information.destroy_action) + self.assertIsNone(capability_information.shredding_algorithm) + self.assertIsNone(capability_information.rng_mode) + + capability_information.read( + self.full_encoding_kmip_1_3, + kmip_version=enums.KMIPVersion.KMIP_1_4 + ) + + self.assertFalse(capability_information.streaming_capability) + self.assertTrue(capability_information.asynchronous_capability) + self.assertTrue(capability_information.attestation_capability) + self.assertIsNone(capability_information.batch_undo_capability) + self.assertIsNone(capability_information.batch_continue_capability) + self.assertEqual( + enums.UnwrapMode.PROCESSED, + capability_information.unwrap_mode + ) + self.assertEqual( + enums.DestroyAction.SHREDDED, + capability_information.destroy_action + ) + self.assertEqual( + enums.ShreddingAlgorithm.CRYPTOGRAPHIC, + capability_information.shredding_algorithm + ) + self.assertEqual( + enums.RNGMode.NON_SHARED_INSTANTIATION, + capability_information.rng_mode + ) + + def test_read_empty(self): + """ + Test that a CapabilityInformation structure can be correctly read in + from an empty data stream. + """ + capability_information = objects.CapabilityInformation() + + self.assertIsNone(capability_information.streaming_capability) + self.assertIsNone(capability_information.asynchronous_capability) + self.assertIsNone(capability_information.attestation_capability) + self.assertIsNone(capability_information.batch_undo_capability) + self.assertIsNone(capability_information.batch_continue_capability) + self.assertIsNone(capability_information.unwrap_mode) + self.assertIsNone(capability_information.destroy_action) + self.assertIsNone(capability_information.shredding_algorithm) + self.assertIsNone(capability_information.rng_mode) + + capability_information.read( + self.empty_encoding, + kmip_version=enums.KMIPVersion.KMIP_1_4 + ) + + self.assertIsNone(capability_information.streaming_capability) + self.assertIsNone(capability_information.asynchronous_capability) + self.assertIsNone(capability_information.attestation_capability) + self.assertIsNone(capability_information.batch_undo_capability) + self.assertIsNone(capability_information.batch_continue_capability) + self.assertIsNone(capability_information.unwrap_mode) + self.assertIsNone(capability_information.destroy_action) + self.assertIsNone(capability_information.shredding_algorithm) + self.assertIsNone(capability_information.rng_mode) + + def test_write(self): + """ + Test that a CapabilityInformation structure can be written to a data + stream. + """ + capability_information = objects.CapabilityInformation( + streaming_capability=False, + asynchronous_capability=True, + attestation_capability=True, + batch_undo_capability=False, + batch_continue_capability=True, + unwrap_mode=enums.UnwrapMode.PROCESSED, + destroy_action=enums.DestroyAction.SHREDDED, + shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC, + rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION + ) + + buffer = utils.BytearrayStream() + capability_information.write( + buffer, + kmip_version=enums.KMIPVersion.KMIP_1_4 + ) + + self.assertEqual(len(self.full_encoding), len(buffer)) + self.assertEqual(str(self.full_encoding), str(buffer)) + + def test_write_unsupported_kmip_version(self): + """ + Test that a VersionNotSupported error is raised during the encoding of + a CapabilityInformation structure when the structure is written for an + unsupported KMIP version. + """ + capability_information = objects.CapabilityInformation( + streaming_capability=False, + asynchronous_capability=True, + attestation_capability=True, + batch_undo_capability=False, + batch_continue_capability=True, + unwrap_mode=enums.UnwrapMode.PROCESSED, + destroy_action=enums.DestroyAction.SHREDDED, + shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC, + rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION + ) + + args = (utils.BytearrayStream(), ) + kwargs = {"kmip_version": enums.KMIPVersion.KMIP_1_2} + self.assertRaisesRegex( + exceptions.VersionNotSupported, + "KMIP 1.2 does not support the CapabilityInformation object.", + capability_information.write, + *args, + **kwargs + ) + + def test_write_kmip_1_3(self): + """ + Test that a CapabilityInformation structure can be written to a data + stream with only KMIP 1.3 features. + """ + capability_information = objects.CapabilityInformation( + streaming_capability=False, + asynchronous_capability=True, + attestation_capability=True, + batch_undo_capability=False, + batch_continue_capability=True, + unwrap_mode=enums.UnwrapMode.PROCESSED, + destroy_action=enums.DestroyAction.SHREDDED, + shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC, + rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION + ) + + buffer = utils.BytearrayStream() + capability_information.write( + buffer, + kmip_version=enums.KMIPVersion.KMIP_1_3 + ) + + self.assertEqual(len(self.full_encoding_kmip_1_3), len(buffer)) + self.assertEqual(str(self.full_encoding_kmip_1_3), str(buffer)) + + def test_write_empty(self): + """ + Test that an empty CapabilityInformation structure can be correctly + written to a data stream. + """ + capability_information = objects.CapabilityInformation() + + buffer = utils.BytearrayStream() + capability_information.write( + buffer, + kmip_version=enums.KMIPVersion.KMIP_1_3 + ) + + self.assertEqual(len(self.empty_encoding), len(buffer)) + self.assertEqual(str(self.empty_encoding), str(buffer)) + + def test_repr(self): + """ + Test that repr can be applied to a CapabilityInformation structure. + """ + capability_information = objects.CapabilityInformation( + streaming_capability=False, + asynchronous_capability=True, + attestation_capability=True, + batch_undo_capability=False, + batch_continue_capability=True, + unwrap_mode=enums.UnwrapMode.PROCESSED, + destroy_action=enums.DestroyAction.SHREDDED, + shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC, + rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION + ) + + sc = "streaming_capability=False" + rc = "asynchronous_capability=True" + tc = "attestation_capability=True" + buc = "batch_undo_capability=False" + bcc = "batch_continue_capability=True" + um = "unwrap_mode=UnwrapMode.PROCESSED" + da = "destroy_action=DestroyAction.SHREDDED" + sa = "shredding_algorithm=ShreddingAlgorithm.CRYPTOGRAPHIC" + rm = "rng_mode=RNGMode.NON_SHARED_INSTANTIATION" + + v = ", ".join([sc, rc, tc, buc, bcc, um, da, sa, rm]) + + self.assertEqual( + "CapabilityInformation({})".format(v), + repr(capability_information) + ) + + def test_str(self): + """ + Test that str can be applied to a CapabilityInformation structure. + """ + capability_information = objects.CapabilityInformation( + streaming_capability=False, + asynchronous_capability=True, + attestation_capability=True, + batch_undo_capability=False, + batch_continue_capability=True, + unwrap_mode=enums.UnwrapMode.PROCESSED, + destroy_action=enums.DestroyAction.SHREDDED, + shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC, + rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION + ) + + sc = '"streaming_capability": False' + rc = '"asynchronous_capability": True' + tc = '"attestation_capability": True' + buc = '"batch_undo_capability": False' + bcc = '"batch_continue_capability": True' + um = '"unwrap_mode": UnwrapMode.PROCESSED' + da = '"destroy_action": DestroyAction.SHREDDED' + sa = '"shredding_algorithm": ShreddingAlgorithm.CRYPTOGRAPHIC' + rm = '"rng_mode": RNGMode.NON_SHARED_INSTANTIATION' + + v = ", ".join([sc, rc, tc, buc, bcc, um, da, sa, rm]) + + self.assertEqual( + "{" + v + "}", + str(capability_information) + ) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + CapabilityInformation structures with the same data. + """ + a = objects.CapabilityInformation() + b = objects.CapabilityInformation() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = objects.CapabilityInformation( + streaming_capability=False, + asynchronous_capability=True, + attestation_capability=True, + batch_undo_capability=False, + batch_continue_capability=True, + unwrap_mode=enums.UnwrapMode.PROCESSED, + destroy_action=enums.DestroyAction.SHREDDED, + shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC, + rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION + ) + b = objects.CapabilityInformation( + streaming_capability=False, + asynchronous_capability=True, + attestation_capability=True, + batch_undo_capability=False, + batch_continue_capability=True, + unwrap_mode=enums.UnwrapMode.PROCESSED, + destroy_action=enums.DestroyAction.SHREDDED, + shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC, + rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_streaming_capability(self): + """ + Test that the equality operator returns False when comparing two + CapabilityInformation structures with different streaming capability + fields. + """ + a = objects.CapabilityInformation(streaming_capability=True) + b = objects.CapabilityInformation(streaming_capability=False) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_asynchronous_capability(self): + """ + Test that the equality operator returns False when comparing two + CapabilityInformation structures with different asynchronous + capability fields. + """ + a = objects.CapabilityInformation(asynchronous_capability=True) + b = objects.CapabilityInformation(asynchronous_capability=False) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_attestation_capability(self): + """ + Test that the equality operator returns False when comparing two + CapabilityInformation structures with different attestation capability + fields. + """ + a = objects.CapabilityInformation(attestation_capability=True) + b = objects.CapabilityInformation(attestation_capability=False) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_batch_undo_capability(self): + """ + Test that the equality operator returns False when comparing two + CapabilityInformation structures with different batch undo capability + fields. + """ + a = objects.CapabilityInformation(batch_undo_capability=True) + b = objects.CapabilityInformation(batch_undo_capability=False) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_batch_continue_capability(self): + """ + Test that the equality operator returns False when comparing two + CapabilityInformation structures with different batch continue + capability fields. + """ + a = objects.CapabilityInformation(batch_continue_capability=True) + b = objects.CapabilityInformation(batch_continue_capability=False) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_unwrap_mode(self): + """ + Test that the equality operator returns False when comparing two + CapabilityInformation structures with different unwrap mode fields. + """ + a = objects.CapabilityInformation( + unwrap_mode=enums.UnwrapMode.PROCESSED + ) + b = objects.CapabilityInformation( + unwrap_mode=enums.UnwrapMode.NOT_PROCESSED + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_destroy_action(self): + """ + Test that the equality operator returns False when comparing two + CapabilityInformation structures with different destroy action fields. + """ + a = objects.CapabilityInformation( + destroy_action=enums.DestroyAction.DELETED + ) + b = objects.CapabilityInformation( + destroy_action=enums.DestroyAction.SHREDDED + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_shredding_algorithm(self): + """ + Test that the equality operator returns False when comparing two + CapabilityInformation structures with different shredding algorithm + fields. + """ + a = objects.CapabilityInformation( + shredding_algorithm=enums.ShreddingAlgorithm.UNSPECIFIED + ) + b = objects.CapabilityInformation( + shredding_algorithm=enums.ShreddingAlgorithm.UNSUPPORTED + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_rng_mode(self): + """ + Test that the equality operator returns False when comparing two + CapabilityInformation structures with different RNG mode fields. + """ + a = objects.CapabilityInformation( + rng_mode=enums.RNGMode.SHARED_INSTANTIATION + ) + b = objects.CapabilityInformation( + rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + CapabilityInformation structures with different types. + """ + a = objects.CapabilityInformation() + b = "invalid" + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + CapabilityInformation structures with the same data. + """ + a = objects.CapabilityInformation() + b = objects.CapabilityInformation() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = objects.CapabilityInformation( + streaming_capability=False, + asynchronous_capability=True, + attestation_capability=True, + batch_undo_capability=False, + batch_continue_capability=True, + unwrap_mode=enums.UnwrapMode.PROCESSED, + destroy_action=enums.DestroyAction.SHREDDED, + shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC, + rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION + ) + b = objects.CapabilityInformation( + streaming_capability=False, + asynchronous_capability=True, + attestation_capability=True, + batch_undo_capability=False, + batch_continue_capability=True, + unwrap_mode=enums.UnwrapMode.PROCESSED, + destroy_action=enums.DestroyAction.SHREDDED, + shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC, + rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_streaming_capability(self): + """ + Test that the inequality operator returns True when comparing two + CapabilityInformation structures with different streaming capability + fields. + """ + a = objects.CapabilityInformation(streaming_capability=True) + b = objects.CapabilityInformation(streaming_capability=False) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_asynchronous_capability(self): + """ + Test that the inequality operator returns True when comparing two + CapabilityInformation structures with different asynchronous + capability fields. + """ + a = objects.CapabilityInformation(asynchronous_capability=True) + b = objects.CapabilityInformation(asynchronous_capability=False) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_attestation_capability(self): + """ + Test that the inequality operator returns True when comparing two + CapabilityInformation structures with different attestation capability + fields. + """ + a = objects.CapabilityInformation(attestation_capability=True) + b = objects.CapabilityInformation(attestation_capability=False) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_batch_undo_capability(self): + """ + Test that the inequality operator returns True when comparing two + CapabilityInformation structures with different batch undo capability + fields. + """ + a = objects.CapabilityInformation(batch_undo_capability=True) + b = objects.CapabilityInformation(batch_undo_capability=False) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_batch_continue_capability(self): + """ + Test that the inequality operator returns True when comparing two + CapabilityInformation structures with different batch continue + capability fields. + """ + a = objects.CapabilityInformation(batch_continue_capability=True) + b = objects.CapabilityInformation(batch_continue_capability=False) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_unwrap_mode(self): + """ + Test that the inequality operator returns True when comparing two + CapabilityInformation structures with different unwrap mode fields. + """ + a = objects.CapabilityInformation( + unwrap_mode=enums.UnwrapMode.PROCESSED + ) + b = objects.CapabilityInformation( + unwrap_mode=enums.UnwrapMode.NOT_PROCESSED + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_destroy_action(self): + """ + Test that the inequality operator returns True when comparing two + CapabilityInformation structures with different destroy action fields. + """ + a = objects.CapabilityInformation( + destroy_action=enums.DestroyAction.DELETED + ) + b = objects.CapabilityInformation( + destroy_action=enums.DestroyAction.SHREDDED + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_shredding_algorithm(self): + """ + Test that the inequality operator returns True when comparing two + CapabilityInformation structures with different shredding algorithm + fields. + """ + a = objects.CapabilityInformation( + shredding_algorithm=enums.ShreddingAlgorithm.UNSPECIFIED + ) + b = objects.CapabilityInformation( + shredding_algorithm=enums.ShreddingAlgorithm.UNSUPPORTED + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_rng_mode(self): + """ + Test that the inequality operator returns True when comparing two + CapabilityInformation structures with different RNG mode fields. + """ + a = objects.CapabilityInformation( + rng_mode=enums.RNGMode.SHARED_INSTANTIATION + ) + b = objects.CapabilityInformation( + rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + CapabilityInformation structures with different types. + """ + a = objects.CapabilityInformation() + b = "invalid" + + self.assertTrue(a != b) + self.assertTrue(b != a)