diff --git a/kmip/core/objects.py b/kmip/core/objects.py index 206b7a8..6dbfe3b 100644 --- a/kmip/core/objects.py +++ b/kmip/core/objects.py @@ -4500,3 +4500,244 @@ class RNGParameters(primitives.Struct): return not (self == other) else: return NotImplemented + + +class ProfileInformation(primitives.Struct): + """ + A structure containing details of supported KMIP profiles. + + This is intended for use with KMIP 1.3+. + + Attributes: + profile_name: A ProfileName enumeration identifying the specific + profile supported. + server_uri: A string specifying a Uniform Resource Identifier that + points to the location of the server supporting the profile. + server_port: An integer specifying the port number to use when + accessing the server supporting the profile. + """ + + def __init__(self, profile_name=None, server_uri=None, server_port=None): + """ + Construct a ProfileInformation structure. + + Args: + profile_name (enum): A ProfileName enumeration identifying the + specific profile supported. Optional, defaults to None. + Required for read/write. + server_uri (string): A string specifying a Uniform Resource + Identifier that points to the location of the server + supporting the profile. Optional, defaults to None. + server_port (int): An integer specifying the port number to use + when accessing the server supporting the profile. Optional, + defaults to None. + """ + super(ProfileInformation, self).__init__( + tag=enums.Tags.PROFILE_INFORMATION + ) + + self._profile_name = None + self._server_uri = None + self._server_port = None + + self.profile_name = profile_name + self.server_uri = server_uri + self.server_port = server_port + + @property + def profile_name(self): + if self._profile_name: + return self._profile_name.value + return None + + @profile_name.setter + def profile_name(self, value): + if value is None: + self._profile_name = None + elif isinstance(value, enums.ProfileName): + self._profile_name = primitives.Enumeration( + enums.ProfileName, + value=value, + tag=enums.Tags.PROFILE_NAME + ) + else: + raise TypeError( + "The profile name must be a ProfileName enumeration." + ) + + @property + def server_uri(self): + if self._server_uri: + return self._server_uri.value + return None + + @server_uri.setter + def server_uri(self, value): + if value is None: + self._server_uri = None + elif isinstance(value, six.string_types): + self._server_uri = primitives.TextString( + value=value, + tag=enums.Tags.SERVER_URI + ) + else: + raise TypeError("The server URI must be a string.") + + @property + def server_port(self): + if self._server_port: + return self._server_port.value + return None + + @server_port.setter + def server_port(self, value): + if value is None: + self._server_port = None + elif isinstance(value, six.integer_types): + self._server_port = primitives.Integer( + value=value, + tag=enums.Tags.SERVER_PORT + ) + else: + raise TypeError("The server port must be an integer.") + + def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_1_3): + """ + Read the data encoding the ProfileInformation structure and decode it + into its constituent parts. + + Args: + input_buffer (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + kmip_version (KMIPVersion): An enumeration defining the KMIP + version with which the object will be decoded. Optional, + defaults to KMIP 2.0. + + Raises: + InvalidKmipEncoding: Raised if the profile name is missing from + the encoding. + VersionNotSupported: Raised when a KMIP version is provided that + does not support the ProfileInformation structure. + """ + if kmip_version < enums.KMIPVersion.KMIP_1_3: + raise exceptions.VersionNotSupported( + "KMIP {} does not support the ProfileInformation " + "object.".format( + kmip_version.value + ) + ) + + super(ProfileInformation, self).read( + input_buffer, + kmip_version=kmip_version + ) + local_buffer = utils.BytearrayStream(input_buffer.read(self.length)) + + if self.is_tag_next(enums.Tags.PROFILE_NAME, local_buffer): + profile_name = primitives.Enumeration( + enums.ProfileName, + tag=enums.Tags.PROFILE_NAME + ) + profile_name.read(local_buffer, kmip_version=kmip_version) + self._profile_name = profile_name + else: + raise exceptions.InvalidKmipEncoding( + "The ProfileInformation encoding is missing the profile name." + ) + + if self.is_tag_next(enums.Tags.SERVER_URI, local_buffer): + server_uri = primitives.TextString(tag=enums.Tags.SERVER_URI) + server_uri.read(local_buffer, kmip_version=kmip_version) + self._server_uri = server_uri + + if self.is_tag_next(enums.Tags.SERVER_PORT, local_buffer): + server_port = primitives.Integer(tag=enums.Tags.SERVER_PORT) + server_port.read(local_buffer, kmip_version=kmip_version) + self._server_port = server_port + + self.is_oversized(local_buffer) + + def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_1_3): + """ + Write the ProfileInformation structure encoding to the data stream. + + Args: + output_buffer (stream): A data stream in which to encode + ProfileInformation structure data, supporting a write method. + kmip_version (enum): A KMIPVersion enumeration defining the KMIP + version with which the object will be encoded. Optional, + defaults to KMIP 2.0. + + Raises: + InvalidField: Raised if the profile name field is not defined. + VersionNotSupported: Raised when a KMIP version is provided that + does not support the ProfileInformation structure. + """ + if kmip_version < enums.KMIPVersion.KMIP_1_3: + raise exceptions.VersionNotSupported( + "KMIP {} does not support the ProfileInformation " + "object.".format( + kmip_version.value + ) + ) + + local_buffer = BytearrayStream() + + if self._profile_name: + self._profile_name.write(local_buffer, kmip_version=kmip_version) + else: + raise exceptions.InvalidField( + "The ProfileInformation structure is missing the profile " + "name field." + ) + + if self._server_uri: + self._server_uri.write(local_buffer, kmip_version=kmip_version) + + if self._server_port: + self._server_port.write(local_buffer, kmip_version=kmip_version) + + self.length = local_buffer.length() + super(ProfileInformation, self).write( + output_buffer, + kmip_version=kmip_version + ) + output_buffer.write(local_buffer.buffer) + + def __repr__(self): + n = "profile_name={}".format(self.profile_name) + u = 'server_uri="{}"'.format(self.server_uri) + p = "server_port={}".format(self.server_port) + + v = ", ".join([n, u, p]) + + return "ProfileInformation({})".format(v) + + def __str__(self): + n = '"profile_name": {}'.format(self.profile_name) + u = '"server_uri": "{}"'.format(self.server_uri) + p = '"server_port": {}'.format(self.server_port) + + v = ", ".join([n, u, p]) + + return '{' + v + '}' + + def __eq__(self, other): + if isinstance(other, ProfileInformation): + if self.profile_name != other.profile_name: + return False + elif self.server_uri != other.server_uri: + return False + elif self.server_port != other.server_port: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, ProfileInformation): + return not (self == other) + else: + return NotImplemented diff --git a/kmip/tests/unit/core/objects/test_objects.py b/kmip/tests/unit/core/objects/test_objects.py index 7f5e9f4..43576b0 100644 --- a/kmip/tests/unit/core/objects/test_objects.py +++ b/kmip/tests/unit/core/objects/test_objects.py @@ -6513,3 +6513,458 @@ class TestRNGParameters(testtools.TestCase): self.assertTrue(a != b) self.assertTrue(b != a) + + +class TestProfileInformation(testtools.TestCase): + + def setUp(self): + super(TestProfileInformation, self).setUp() + + # This encoding matches the following set of values: + # + # Profile Information + # Profile Name - BASELINE_SERVER_BASIC_KMIPv12 + # Server URI - https://example.com + # Server Port - 5696 + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\xEB\x01\x00\x00\x00\x40' + b'\x42\x00\xEC\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\xED\x07\x00\x00\x00\x13' + b'\x68\x74\x74\x70\x73\x3A\x2F\x2F\x65\x78\x61\x6D\x70\x6C\x65\x2E' + b'\x63\x6F\x6D\x00\x00\x00\x00\x00' + b'\x42\x00\xEE\x02\x00\x00\x00\x04\x00\x00\x16\x40\x00\x00\x00\x00' + ) + + # This encoding matches the following set of values: + # + # Profile Information + # Server URI - https://example.com + # Server Port - 5696 + self.no_profile_name_encoding = utils.BytearrayStream( + b'\x42\x00\xEB\x01\x00\x00\x00\x30' + b'\x42\x00\xED\x07\x00\x00\x00\x13' + b'\x68\x74\x74\x70\x73\x3A\x2F\x2F\x65\x78\x61\x6D\x70\x6C\x65\x2E' + b'\x63\x6F\x6D\x00\x00\x00\x00\x00' + b'\x42\x00\xEE\x02\x00\x00\x00\x04\x00\x00\x16\x40\x00\x00\x00\x00' + ) + + # This encoding matches the following set of values: + # + # Profile Information + # Profile Name - BASELINE_SERVER_BASIC_KMIPv12 + self.only_profile_name_encoding = utils.BytearrayStream( + b'\x42\x00\xEB\x01\x00\x00\x00\x10' + b'\x42\x00\xEC\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestProfileInformation, self).tearDown() + + def test_invalid_profile_name(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the profile name of a ProfileInformation structure. + """ + kwargs = {"profile_name": "invalid"} + self.assertRaisesRegex( + TypeError, + "The profile name must be a ProfileName enumeration.", + objects.ProfileInformation, + **kwargs + ) + + args = (objects.ProfileInformation(), "profile_name", "invalid") + self.assertRaisesRegex( + TypeError, + "The profile name must be a ProfileName enumeration.", + setattr, + *args + ) + + def test_invalid_server_uri(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the server URI of a ProfileInformation structure. + """ + kwargs = {"server_uri": 0} + self.assertRaisesRegex( + TypeError, + "The server URI must be a string.", + objects.ProfileInformation, + **kwargs + ) + + args = (objects.ProfileInformation(), "server_uri", 0) + self.assertRaisesRegex( + TypeError, + "The server URI must be a string.", + setattr, + *args + ) + + def test_invalid_server_port(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the server port of a ProfileInformation structure. + """ + kwargs = {"server_port": "invalid"} + self.assertRaisesRegex( + TypeError, + "The server port must be an integer.", + objects.ProfileInformation, + **kwargs + ) + + args = (objects.ProfileInformation(), "server_port", "invalid") + self.assertRaisesRegex( + TypeError, + "The server port must be an integer.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a ProfileInformation structure can be correctly read in + from a data stream. + """ + profile_information = objects.ProfileInformation() + + self.assertIsNone(profile_information.profile_name) + self.assertIsNone(profile_information.server_uri) + self.assertIsNone(profile_information.server_port) + + profile_information.read( + self.full_encoding, + kmip_version=enums.KMIPVersion.KMIP_1_3 + ) + + self.assertEqual( + enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12, + profile_information.profile_name + ) + self.assertEqual("https://example.com", profile_information.server_uri) + self.assertEqual(5696, profile_information.server_port) + + def test_read_unsupported_kmip_version(self): + """ + Test that a VersionNotSupported error is raised during the decoding of + a ProfileInformation structure when the structure is read for an + unsupported KMIP version. + """ + profile_information = objects.ProfileInformation() + + args = (self.full_encoding, ) + kwargs = {"kmip_version": enums.KMIPVersion.KMIP_1_2} + self.assertRaisesRegex( + exceptions.VersionNotSupported, + "KMIP 1.2 does not support the ProfileInformation object.", + profile_information.read, + *args, + **kwargs + ) + + def test_read_missing_profile_name(self): + """ + Test that an InvalidKmipEncoding error is raised during the decoding + of a ProfileInformation structure when the profile name is missing + from the encoding. + """ + profile_information = objects.ProfileInformation() + + args = (self.no_profile_name_encoding, ) + self.assertRaisesRegex( + exceptions.InvalidKmipEncoding, + "The ProfileInformation encoding is missing the profile name.", + profile_information.read, + *args + ) + + def test_read_only_profile_name(self): + """ + Test that a ProfileInformation structure can be correctly read in + from a data stream even when missing all fields except the profile + name. + """ + profile_information = objects.ProfileInformation() + + self.assertIsNone(profile_information.profile_name) + self.assertIsNone(profile_information.server_uri) + self.assertIsNone(profile_information.server_port) + + profile_information.read( + self.only_profile_name_encoding, + kmip_version=enums.KMIPVersion.KMIP_1_3 + ) + + self.assertEqual( + enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12, + profile_information.profile_name + ) + self.assertIsNone(profile_information.server_uri) + self.assertIsNone(profile_information.server_port) + + def test_write(self): + """ + Test that a ProfileInformation structure can be written to a data + stream. + """ + profile_information = objects.ProfileInformation( + profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12, + server_uri="https://example.com", + server_port=5696 + ) + + buffer = utils.BytearrayStream() + profile_information.write( + buffer, + kmip_version=enums.KMIPVersion.KMIP_1_3 + ) + + self.assertEqual(len(self.full_encoding), len(buffer)) + self.assertEqual(str(self.full_encoding), str(buffer)) + + def test_write_unsupported_kmip_version(self): + """ + Test that a VersionNotSupported error is raised during the encoding of + a ProfileInformation structure when the structure is written for an + unsupported KMIP version. + """ + profile_information = objects.ProfileInformation( + profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12, + server_uri="https://example.com", + server_port=5696 + ) + + args = (utils.BytearrayStream(), ) + kwargs = {"kmip_version": enums.KMIPVersion.KMIP_1_2} + self.assertRaisesRegex( + exceptions.VersionNotSupported, + "KMIP 1.2 does not support the ProfileInformation object.", + profile_information.write, + *args, + **kwargs + ) + + def test_write_missing_profile_name(self): + """ + Test that an InvalidField error is raised during the encoding of an + ProfileInformation structure when the structure is missing the profile + name field. + """ + profile_information = objects.ProfileInformation() + + args = (utils.BytearrayStream(), ) + self.assertRaisesRegex( + exceptions.InvalidField, + "The ProfileInformation structure is missing the profile name " + "field.", + profile_information.write, + *args + ) + + def test_write_only_profile_name(self): + """ + Test that a ProfileInformation structure can be written to a data + stream even when missing all fields except the profile name. + """ + profile_information = objects.ProfileInformation( + profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12 + ) + + buffer = utils.BytearrayStream() + profile_information.write( + buffer, + kmip_version=enums.KMIPVersion.KMIP_1_3 + ) + + self.assertEqual(len(self.only_profile_name_encoding), len(buffer)) + self.assertEqual(str(self.only_profile_name_encoding), str(buffer)) + + def test_repr(self): + """ + Test that repr can be applied to a ProfileInformation structure. + """ + profile_information = objects.ProfileInformation( + profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12, + server_uri="https://example.com", + server_port=5696 + ) + + n = "profile_name=ProfileName.BASELINE_SERVER_BASIC_KMIPv12" + u = 'server_uri="https://example.com"' + p = "server_port=5696" + + v = ", ".join([n, u, p]) + + self.assertEqual( + "ProfileInformation({})".format(v), + repr(profile_information) + ) + + def test_str(self): + """ + Test that str can be applied to a ProfileInformation structure. + """ + profile_information = objects.ProfileInformation( + profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12, + server_uri="https://example.com", + server_port=5696 + ) + + n = '"profile_name": ProfileName.BASELINE_SERVER_BASIC_KMIPv12' + u = '"server_uri": "https://example.com"' + p = '"server_port": 5696' + + v = ", ".join([n, u, p]) + + self.assertEqual( + "{" + v + "}", + str(profile_information) + ) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + ProfileInformation structures with the same data. + """ + a = objects.ProfileInformation() + b = objects.ProfileInformation() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = objects.ProfileInformation( + profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12, + server_uri="https://example.com", + server_port=5696 + ) + b = objects.ProfileInformation( + profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12, + server_uri="https://example.com", + server_port=5696 + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_profile_name(self): + """ + Test that the equality operator returns False when comparing two + ProfileInformation structures with different profile name fields. + """ + a = objects.ProfileInformation( + profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12 + ) + b = objects.ProfileInformation( + profile_name=enums.ProfileName.TAPE_LIBRARY_CLIENT_KMIPv10 + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_server_uri(self): + """ + Test that the equality operator returns False when comparing two + ProfileInformation structures with different server URI fields. + """ + a = objects.ProfileInformation(server_uri="https://example.com") + b = objects.ProfileInformation(server_uri="https://test.com") + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_server_port(self): + """ + Test that the equality operator returns False when comparing two + ProfileInformation structures with different server port fields. + """ + a = objects.ProfileInformation(server_port=5696) + b = objects.ProfileInformation(server_port=5697) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + ProfileInformation structures with different types. + """ + a = objects.ProfileInformation() + b = "invalid" + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + ProfileInformation structures with the same data. + """ + a = objects.ProfileInformation() + b = objects.ProfileInformation() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = objects.ProfileInformation( + profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12, + server_uri="https://example.com", + server_port=5696 + ) + b = objects.ProfileInformation( + profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12, + server_uri="https://example.com", + server_port=5696 + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_profile_name(self): + """ + Test that the inequality operator returns True when comparing two + ProfileInformation structures with different profile name fields. + """ + a = objects.ProfileInformation( + profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12 + ) + b = objects.ProfileInformation( + profile_name=enums.ProfileName.TAPE_LIBRARY_CLIENT_KMIPv10 + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_server_uri(self): + """ + Test that the inequality operator returns True when comparing two + ProfileInformation structures with different server URI fields. + """ + a = objects.ProfileInformation(server_uri="https://example.com") + b = objects.ProfileInformation(server_uri="https://test.com") + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_server_port(self): + """ + Test that the inequality operator returns True when comparing two + ProfileInformation structures with different server port fields. + """ + a = objects.ProfileInformation(server_port=5696) + b = objects.ProfileInformation(server_port=5697) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + ProfileInformation structures with different types. + """ + a = objects.ProfileInformation() + b = "invalid" + + self.assertTrue(a != b) + self.assertTrue(b != a)