diff --git a/kmip/core/objects.py b/kmip/core/objects.py index 5bccb9d..c707eca 100644 --- a/kmip/core/objects.py +++ b/kmip/core/objects.py @@ -6046,3 +6046,199 @@ class CapabilityInformation(primitives.Struct): return not (self == other) else: return NotImplemented + + +class ProtectionStorageMasks(primitives.Struct): + """ + A structure containing a list of protection storage masks. + + This is intended for use with KMIP 2.0+. + + Attributes: + protection_storage_masks: A list of integers representing + combined sets of ProtectionStorageMask enumerations detailing + the storage protections supported by the server. + """ + + def __init__(self, + protection_storage_masks=None, + tag=enums.Tags.PROTECTION_STORAGE_MASKS): + """ + Construct a ProtectionStorageMasks structure. + + Args: + protection_storage_masks (list): A list of integers representing + combined sets of ProtectionStorageMask enumerations detailing + the storage protections supported by the server. Optional, + defaults to None. + tag (enum): A Tags enumeration specifying which type of collection + this of protection storage masks this object represents. + Optional, defaults to Tags.PROTECTION_STORAGE_MASKS. + """ + super(ProtectionStorageMasks, self).__init__(tag=tag) + + self._protection_storage_masks = None + + self.protection_storage_masks = protection_storage_masks + + @property + def protection_storage_masks(self): + if self._protection_storage_masks: + return [x.value for x in self._protection_storage_masks] + return None + + @protection_storage_masks.setter + def protection_storage_masks(self, value): + if value is None: + self._protection_storage_masks = None + elif isinstance(value, list): + protection_storage_masks = [] + for x in value: + if isinstance(x, six.integer_types): + if enums.is_bit_mask(enums.ProtectionStorageMask, x): + protection_storage_masks.append( + primitives.Integer( + value=x, + tag=enums.Tags.PROTECTION_STORAGE_MASK + ) + ) + else: + raise TypeError( + "The protection storage masks must be a list of " + "integers representing combinations of " + "ProtectionStorageMask enumerations." + ) + else: + raise TypeError( + "The protection storage masks must be a list of " + "integers representing combinations of " + "ProtectionStorageMask enumerations." + ) + self._protection_storage_masks = protection_storage_masks + else: + raise TypeError( + "The protection storage masks must be a list of " + "integers representing combinations of " + "ProtectionStorageMask enumerations." + ) + + def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_2_0): + """ + Read the data encoding the ProtectionStorageMasks structure and decode + it into its constituent parts. + + Args: + input_buffer (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + kmip_version (KMIPVersion): An enumeration defining the KMIP + version with which the object will be decoded. Optional, + defaults to KMIP 2.0. + + Raises: + VersionNotSupported: Raised when a KMIP version is provided that + does not support the ProtectionStorageMasks structure. + """ + if kmip_version < enums.KMIPVersion.KMIP_2_0: + raise exceptions.VersionNotSupported( + "KMIP {} does not support the ProtectionStorageMasks " + "object.".format( + kmip_version.value + ) + ) + + super(ProtectionStorageMasks, self).read( + input_buffer, + kmip_version=kmip_version + ) + local_buffer = utils.BytearrayStream(input_buffer.read(self.length)) + + protection_storage_masks = [] + while self.is_tag_next( + enums.Tags.PROTECTION_STORAGE_MASK, + local_buffer + ): + protection_storage_mask = primitives.Integer( + tag=enums.Tags.PROTECTION_STORAGE_MASK + ) + protection_storage_mask.read( + local_buffer, + kmip_version=kmip_version + ) + protection_storage_masks.append(protection_storage_mask) + self._protection_storage_masks = protection_storage_masks + + self.is_oversized(local_buffer) + + def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_2_0): + """ + Write the ProtectionStorageMasks structure encoding to the data stream. + + Args: + output_buffer (stream): A data stream in which to encode + CapabilityInformation structure data, supporting a write + method. + kmip_version (enum): A KMIPVersion enumeration defining the KMIP + version with which the object will be encoded. Optional, + defaults to KMIP 2.0. + + Raises: + VersionNotSupported: Raised when a KMIP version is provided that + does not support the ProtectionStorageMasks structure. + """ + if kmip_version < enums.KMIPVersion.KMIP_2_0: + raise exceptions.VersionNotSupported( + "KMIP {} does not support the ProtectionStorageMasks " + "object.".format( + kmip_version.value + ) + ) + + local_buffer = BytearrayStream() + + if self._protection_storage_masks: + for protection_storage_mask in self._protection_storage_masks: + protection_storage_mask.write( + local_buffer, + kmip_version=kmip_version + ) + + self.length = local_buffer.length() + super(ProtectionStorageMasks, self).write( + output_buffer, + kmip_version=kmip_version + ) + output_buffer.write(local_buffer.buffer) + + def __repr__(self): + v = "protection_storage_masks={}".format( + "[{}]".format( + ", ".join(str(x) for x in self.protection_storage_masks) + ) if self._protection_storage_masks else None + ) + + return "ProtectionStorageMasks({})".format(v) + + def __str__(self): + v = '"protection_storage_masks": {}'.format( + "[{}]".format( + ", ".join(str(x) for x in self.protection_storage_masks) + ) if self._protection_storage_masks else None + ) + + return '{' + v + '}' + + def __eq__(self, other): + if isinstance(other, ProtectionStorageMasks): + if self.protection_storage_masks != other.protection_storage_masks: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, ProtectionStorageMasks): + return not (self == other) + else: + return NotImplemented diff --git a/kmip/tests/unit/core/objects/test_objects.py b/kmip/tests/unit/core/objects/test_objects.py index d86f8d0..d794dfc 100644 --- a/kmip/tests/unit/core/objects/test_objects.py +++ b/kmip/tests/unit/core/objects/test_objects.py @@ -9371,3 +9371,424 @@ class TestCapabilityInformation(testtools.TestCase): self.assertTrue(a != b) self.assertTrue(b != a) + + +class TestProtectionStorageMasks(testtools.TestCase): + + def setUp(self): + super(TestProtectionStorageMasks, self).setUp() + + # This encoding matches the following set of values: + # + # Protection Storage Masks + # Protection Storage Mask - Software | Hardware + # Protection Storage Mask - On Premises | Off Premises + self.full_encoding = utils.BytearrayStream( + b'\x42\x01\x5F\x01\x00\x00\x00\x20' + b'\x42\x01\x5E\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x01\x5E\x02\x00\x00\x00\x04\x00\x00\x03\x00\x00\x00\x00\x00' + ) + + # This encoding matches the following set of values: + # + # Protection Storage Masks + self.empty_encoding = utils.BytearrayStream( + b'\x42\x01\x5F\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestProtectionStorageMasks, self).tearDown() + + def test_invalid_protection_storage_masks(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the protection storage masks of a ProtectionStorageMasks structure. + """ + kwargs = {"protection_storage_masks": "invalid"} + self.assertRaisesRegex( + TypeError, + "The protection storage masks must be a list of integers " + "representing combinations of ProtectionStorageMask enumerations.", + objects.ProtectionStorageMasks, + **kwargs + ) + kwargs = {"protection_storage_masks": ["invalid"]} + self.assertRaisesRegex( + TypeError, + "The protection storage masks must be a list of integers " + "representing combinations of ProtectionStorageMask enumerations.", + objects.ProtectionStorageMasks, + **kwargs + ) + kwargs = {"protection_storage_masks": [0x10000000]} + self.assertRaisesRegex( + TypeError, + "The protection storage masks must be a list of integers " + "representing combinations of ProtectionStorageMask enumerations.", + objects.ProtectionStorageMasks, + **kwargs + ) + + args = ( + objects.ProtectionStorageMasks(), + "protection_storage_masks", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "The protection storage masks must be a list of integers " + "representing combinations of ProtectionStorageMask enumerations.", + setattr, + *args + ) + args = ( + objects.ProtectionStorageMasks(), + "protection_storage_masks", + ["invalid"] + ) + self.assertRaisesRegex( + TypeError, + "The protection storage masks must be a list of integers " + "representing combinations of ProtectionStorageMask enumerations.", + setattr, + *args + ) + args = ( + objects.ProtectionStorageMasks(), + "protection_storage_masks", + [0x10000000] + ) + self.assertRaisesRegex( + TypeError, + "The protection storage masks must be a list of integers " + "representing combinations of ProtectionStorageMask enumerations.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a ProtectionStorageMasks structure can be correctly read in + from a data stream. + """ + protection_storage_masks = objects.ProtectionStorageMasks() + + self.assertIsNone(protection_storage_masks.protection_storage_masks) + + protection_storage_masks.read(self.full_encoding) + + self.assertEqual( + [0x03, 0x0300], + protection_storage_masks.protection_storage_masks + ) + + def test_read_unsupported_kmip_version(self): + """ + Test that a VersionNotSupported error is raised during the decoding of + a ProtectionStorageMasks structure when the structure is read for an + unsupported KMIP version. + """ + protection_storage_masks = objects.ProtectionStorageMasks() + + args = (self.full_encoding, ) + kwargs = {"kmip_version": enums.KMIPVersion.KMIP_1_2} + self.assertRaisesRegex( + exceptions.VersionNotSupported, + "KMIP 1.2 does not support the ProtectionStorageMasks object.", + protection_storage_masks.read, + *args, + **kwargs + ) + + def test_read_empty(self): + """ + Test that a ProtectionStorageMasks structure can be correctly read in + from an empty data stream. + """ + protection_storage_masks = objects.ProtectionStorageMasks() + + self.assertIsNone(protection_storage_masks.protection_storage_masks) + + protection_storage_masks.read(self.empty_encoding) + + self.assertIsNone(protection_storage_masks.protection_storage_masks) + + def test_write(self): + """ + Test that a ProtectionStorageMasks structure can be written to a data + stream. + """ + protection_storage_masks = objects.ProtectionStorageMasks( + protection_storage_masks=[ + ( + enums.ProtectionStorageMask.SOFTWARE.value | + enums.ProtectionStorageMask.HARDWARE.value + ), + ( + enums.ProtectionStorageMask.ON_PREMISES.value | + enums.ProtectionStorageMask.OFF_PREMISES.value + ) + ] + ) + + buffer = utils.BytearrayStream() + protection_storage_masks.write(buffer) + + self.assertEqual(len(self.full_encoding), len(buffer)) + self.assertEqual(str(self.full_encoding), str(buffer)) + + def test_write_unsupported_kmip_version(self): + """ + Test that a VersionNotSupported error is raised during the encoding of + a ProtectionStorageMasks structure when the structure is written for an + unsupported KMIP version. + """ + protection_storage_masks = objects.ProtectionStorageMasks( + protection_storage_masks=[ + ( + enums.ProtectionStorageMask.SOFTWARE.value | + enums.ProtectionStorageMask.HARDWARE.value + ), + ( + enums.ProtectionStorageMask.ON_PREMISES.value | + enums.ProtectionStorageMask.OFF_PREMISES.value + ) + ] + ) + + args = (utils.BytearrayStream(), ) + kwargs = {"kmip_version": enums.KMIPVersion.KMIP_1_2} + self.assertRaisesRegex( + exceptions.VersionNotSupported, + "KMIP 1.2 does not support the ProtectionStorageMasks object.", + protection_storage_masks.write, + *args, + **kwargs + ) + + def test_write_empty(self): + """ + Test that an empty ProtectionStorageMasks structure can be correctly + written to a data stream. + """ + protection_storage_masks = objects.ProtectionStorageMasks() + + buffer = utils.BytearrayStream() + protection_storage_masks.write(buffer) + + self.assertEqual(len(self.empty_encoding), len(buffer)) + self.assertEqual(str(self.empty_encoding), str(buffer)) + + def test_repr(self): + """ + Test that repr can be applied to a ProtectionStorageMasks structure. + """ + protection_storage_masks = objects.ProtectionStorageMasks( + protection_storage_masks=[ + ( + enums.ProtectionStorageMask.SOFTWARE.value | + enums.ProtectionStorageMask.HARDWARE.value + ), + ( + enums.ProtectionStorageMask.ON_PREMISES.value | + enums.ProtectionStorageMask.OFF_PREMISES.value + ) + ] + ) + + v = "protection_storage_masks=[3, 768]" + + self.assertEqual( + "ProtectionStorageMasks({})".format(v), + repr(protection_storage_masks) + ) + + def test_str(self): + """ + Test that str can be applied to a ProtectionStorageMasks structure. + """ + protection_storage_masks = objects.ProtectionStorageMasks( + protection_storage_masks=[ + ( + enums.ProtectionStorageMask.SOFTWARE.value | + enums.ProtectionStorageMask.HARDWARE.value + ), + ( + enums.ProtectionStorageMask.ON_PREMISES.value | + enums.ProtectionStorageMask.OFF_PREMISES.value + ) + ] + ) + + v = '"protection_storage_masks": [3, 768]' + + self.assertEqual( + "{" + v + "}", + str(protection_storage_masks) + ) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + ProtectionStorageMasks structures with the same data. + """ + a = objects.ProtectionStorageMasks() + b = objects.ProtectionStorageMasks() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = objects.ProtectionStorageMasks( + protection_storage_masks=[ + ( + enums.ProtectionStorageMask.SOFTWARE.value | + enums.ProtectionStorageMask.HARDWARE.value + ), + ( + enums.ProtectionStorageMask.ON_PREMISES.value | + enums.ProtectionStorageMask.OFF_PREMISES.value + ) + ] + ) + b = objects.ProtectionStorageMasks( + protection_storage_masks=[ + ( + enums.ProtectionStorageMask.SOFTWARE.value | + enums.ProtectionStorageMask.HARDWARE.value + ), + ( + enums.ProtectionStorageMask.ON_PREMISES.value | + enums.ProtectionStorageMask.OFF_PREMISES.value + ) + ] + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_protection_storage_masks(self): + """ + Test that the equality operator returns False when comparing two + ProtectionStorageMasks structures with different protection storage + masks fields. + """ + a = objects.ProtectionStorageMasks( + protection_storage_masks=[ + ( + enums.ProtectionStorageMask.SOFTWARE.value | + enums.ProtectionStorageMask.HARDWARE.value + ), + ( + enums.ProtectionStorageMask.ON_PREMISES.value | + enums.ProtectionStorageMask.OFF_PREMISES.value + ) + ] + ) + b = objects.ProtectionStorageMasks( + protection_storage_masks=[ + ( + enums.ProtectionStorageMask.SOFTWARE.value | + enums.ProtectionStorageMask.HARDWARE.value + ), + ( + enums.ProtectionStorageMask.ON_PREMISES.value + ) + ] + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + ProtectionStorageMasks structures with different types. + """ + a = objects.ProtectionStorageMasks() + b = "invalid" + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + ProtectionStorageMasks structures with the same data. + """ + a = objects.ProtectionStorageMasks() + b = objects.ProtectionStorageMasks() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = objects.ProtectionStorageMasks( + protection_storage_masks=[ + ( + enums.ProtectionStorageMask.SOFTWARE.value | + enums.ProtectionStorageMask.HARDWARE.value + ), + ( + enums.ProtectionStorageMask.ON_PREMISES.value | + enums.ProtectionStorageMask.OFF_PREMISES.value + ) + ] + ) + b = objects.ProtectionStorageMasks( + protection_storage_masks=[ + ( + enums.ProtectionStorageMask.SOFTWARE.value | + enums.ProtectionStorageMask.HARDWARE.value + ), + ( + enums.ProtectionStorageMask.ON_PREMISES.value | + enums.ProtectionStorageMask.OFF_PREMISES.value + ) + ] + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_protection_storage_masks(self): + """ + Test that the inequality operator returns True when comparing two + ProtectionStorageMasks structures with different protection storage + masks fields. + """ + a = objects.ProtectionStorageMasks( + protection_storage_masks=[ + ( + enums.ProtectionStorageMask.SOFTWARE.value | + enums.ProtectionStorageMask.HARDWARE.value + ), + ( + enums.ProtectionStorageMask.ON_PREMISES.value | + enums.ProtectionStorageMask.OFF_PREMISES.value + ) + ] + ) + b = objects.ProtectionStorageMasks( + protection_storage_masks=[ + ( + enums.ProtectionStorageMask.SOFTWARE.value | + enums.ProtectionStorageMask.HARDWARE.value + ), + ( + enums.ProtectionStorageMask.ON_PREMISES.value + ) + ] + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + ProtectionStorageMasks structures with different types. + """ + a = objects.ProtectionStorageMasks() + b = "invalid" + + self.assertTrue(a != b) + self.assertTrue(b != a)