From 938a0a3b16938bebe17f2a985db2954fdb560e83 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Fri, 8 Mar 2019 10:51:55 -0500 Subject: [PATCH] Update the Locate payloads This change updates the Locate payloads to the current payload format, adding properties for different payload attributes and adding comparison and string operators. Changes are also made to the PyKMIP clients and the surrounding testing infrastructure to reflect the payload changes. An official unit test suite for the Locate payloads has also been included, which will eventually replace the existing Locate message tests elsewhere in the test suite. This change prepares the Locate payloads for future updates to support KMIP 2.0. --- kmip/core/messages/payloads/locate.py | 556 +++++- kmip/pie/client.py | 3 +- kmip/services/kmip_client.py | 31 +- kmip/services/server/engine.py | 6 +- .../core/messages/payloads/test_locate.py | 1535 ++++++++++++++++- .../tests/unit/core/messages/test_messages.py | 2 +- kmip/tests/unit/pie/test_client.py | 3 +- .../tests/unit/services/server/test_engine.py | 12 +- 8 files changed, 2018 insertions(+), 130 deletions(-) diff --git a/kmip/core/messages/payloads/locate.py b/kmip/core/messages/payloads/locate.py index 785878f..e6f7b16 100644 --- a/kmip/core/messages/payloads/locate.py +++ b/kmip/core/messages/payloads/locate.py @@ -13,139 +13,505 @@ # License for the specific language governing permissions and limitations # under the License. -from kmip.core import attributes +import six + from kmip.core import enums -from kmip.core.enums import Tags - -from kmip.core.objects import Attribute - -from kmip.core.primitives import Struct -from kmip.core.primitives import Enumeration -from kmip.core.primitives import Integer - -from kmip.core.utils import BytearrayStream +from kmip.core import objects +from kmip.core import primitives +from kmip.core import utils -class LocateRequestPayload(Struct): +class LocateRequestPayload(primitives.Struct): + """ + A request payload for the Locate operation. - # 9.1.3.2.33 - class ObjectGroupMember(Enumeration): + Attributes: + maximum_items: The maximum number of object identifiers to be returned. + offset_items: The number of object identifiers to skip when selecting + the object identifiers to return. + storage_status_mask: A bit mask specifying which types of stored + objects should be searched. + object_group_member: The object group member type for the searched + objects. + attributes: The attributes that should be used to filter and match + objects. + """ - def __init__(self, value=None): - super(LocateRequestPayload.ObjectGroupMember, self).__init__( - enums.ObjectGroupMember, value, Tags.OBJECT_GROUP_MEMBER) + def __init__(self, + maximum_items=None, + offset_items=None, + storage_status_mask=None, + object_group_member=None, + attributes=None): + """ + Construct a Locate request payload structure. - class MaximumItems(Integer): - def __init__(self, value=None): - super(LocateRequestPayload.MaximumItems, self).__init__( - value, Tags.MAXIMUM_ITEMS) - - # 9.1.3.3.2 - class StorageStatusMask(Enumeration): - - def __init__(self, value=None): - super(LocateRequestPayload.StorageStatusMask, self).__init__( - enums.StorageStatusMask, value, Tags.STORAGE_STATUS_MASK) - - def __init__(self, maximum_items=None, storage_status_mask=None, - object_group_member=None, attributes=None): + Args: + maximum_items (int): An integer specifying the maximum number of + object identifiers to be returned. Optional, defaults to None. + offset_items (int): An integer specifying the number of object + identifiers to skip when selecting the object identifiers to + return. Optional, defaults to None. + storage_status_mask (int, list): An integer bit mask or a + corresponding list of StorageStatusMask enumerations indicating + which types of stored objects should be searched. Optional, + defaults to None. + object_group_member (enum): An ObjectGroupMember enumeration + specifying the object group member type for the searched + objects. Optional, defaults to None. + attributes (list): A list of Attribute structures containing the + attribute values that should be used to filter and match + objects. Optional, defaults to None. + """ super(LocateRequestPayload, self).__init__(enums.Tags.REQUEST_PAYLOAD) + + self._maximum_items = None + self._offset_items = None + self._storage_status_mask = None + self._object_group_member = None + self._attributes = None + self.maximum_items = maximum_items + self.offset_items = offset_items self.storage_status_mask = storage_status_mask self.object_group_member = object_group_member - self.attributes = attributes or [] - self.validate() + self.attributes = attributes - def read(self, istream, kmip_version=enums.KMIPVersion.KMIP_1_0): + @property + def maximum_items(self): + if self._maximum_items: + return self._maximum_items.value + else: + return None + + @maximum_items.setter + def maximum_items(self, value): + if value is None: + self._maximum_items = None + elif isinstance(value, six.integer_types): + self._maximum_items = primitives.Integer( + value=value, + tag=enums.Tags.MAXIMUM_ITEMS + ) + else: + raise TypeError("Maximum items must be an integer.") + + @property + def offset_items(self): + if self._offset_items: + return self._offset_items.value + else: + return None + + @offset_items.setter + def offset_items(self, value): + if value is None: + self._offset_items = None + elif isinstance(value, six.integer_types): + self._offset_items = primitives.Integer( + value=value, + tag=enums.Tags.OFFSET_ITEMS + ) + else: + raise TypeError("Offset items must be an integer.") + + @property + def storage_status_mask(self): + if self._storage_status_mask: + return self._storage_status_mask.value + else: + return None + + @storage_status_mask.setter + def storage_status_mask(self, value): + if value is None: + self._storage_status_mask = None + elif isinstance(value, six.integer_types): + if enums.is_bit_mask(enums.StorageStatusMask, value): + self._storage_status_mask = primitives.Integer( + value=value, + tag=enums.Tags.STORAGE_STATUS_MASK + ) + else: + raise TypeError( + "Storage status mask must be an integer representing a " + "valid StorageStatusMask bit mask." + ) + else: + raise TypeError( + "Storage status mask must be an integer representing a valid " + "StorageStatusMask bit mask." + ) + + @property + def object_group_member(self): + if self._object_group_member: + return self._object_group_member.value + else: + return None + + @object_group_member.setter + def object_group_member(self, value): + if value is None: + self._object_group_member = None + elif isinstance(value, enums.ObjectGroupMember): + self._object_group_member = primitives.Enumeration( + enums.ObjectGroupMember, + value=value, + tag=enums.Tags.OBJECT_GROUP_MEMBER + ) + else: + raise TypeError( + "Object group member must be an ObjectGroupMember enumeration." + ) + + @property + def attributes(self): + if self._attributes: + return self._attributes + return [] + + @attributes.setter + def attributes(self, value): + if value is None: + self._attributes = [] + elif isinstance(value, list): + for v in value: + if not isinstance(v, objects.Attribute): + raise TypeError( + "Attributes must be a list of Attribute structures." + ) + self._attributes = value + else: + raise TypeError( + "Attributes must be a list of Attribute structures." + ) + + def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0): + """ + Read the data encoding the Locate request payload and decode it into + its constituent parts. + + Args: + input_buffer (stream): A data buffer containing encoded object + data, supporting a read method. + kmip_version (KMIPVersion): An enumeration defining the KMIP + version with which the object will be decoded. Optional, + defaults to KMIP 1.0. + """ super(LocateRequestPayload, self).read( - istream, + input_buffer, kmip_version=kmip_version ) - tstream = BytearrayStream(istream.read(self.length)) - if self.is_tag_next(Tags.MAXIMUM_ITEMS, tstream): - self.maximum_items = LocateRequestPayload.MaximumItems() - self.maximum_items.read(tstream, kmip_version=kmip_version) - if self.is_tag_next(Tags.STORAGE_STATUS_MASK, tstream): - self.storage_status_mask = LocateRequestPayload.StorageStatusMask() - self.storage_status_mask.read(tstream, kmip_version=kmip_version) - if self.is_tag_next(Tags.OBJECT_GROUP_MEMBER, tstream): - self.object_group_member = LocateRequestPayload.ObjectGroupMember() - self.object_group_member.read(tstream, kmip_version=kmip_version) - while self.is_tag_next(Tags.ATTRIBUTE, tstream): - attr = Attribute() - attr.read(tstream, kmip_version=kmip_version) - self.attributes.append(attr) + local_buffer = utils.BytearrayStream(input_buffer.read(self.length)) - self.validate() + if self.is_tag_next(enums.Tags.MAXIMUM_ITEMS, local_buffer): + self._maximum_items = primitives.Integer( + tag=enums.Tags.MAXIMUM_ITEMS + ) + self._maximum_items.read( + local_buffer, + kmip_version=kmip_version + ) - def write(self, ostream, kmip_version=enums.KMIPVersion.KMIP_1_0): - tstream = BytearrayStream() - if self.maximum_items is not None: - self.maximum_items.write(tstream, kmip_version=kmip_version) - if self.storage_status_mask is not None: - self.storage_status_mask.write(tstream, kmip_version=kmip_version) - if self.object_group_member is not None: - self.object_group_member.write(tstream, kmip_version=kmip_version) - if self.attributes is not None: - for a in self.attributes: - a.write(tstream, kmip_version=kmip_version) + if self.is_tag_next(enums.Tags.OFFSET_ITEMS, local_buffer): + self._offset_items = primitives.Integer( + tag=enums.Tags.OFFSET_ITEMS + ) + self._offset_items.read( + local_buffer, + kmip_version=kmip_version + ) - # Write the length and value of the request payload - self.length = tstream.length() + if self.is_tag_next(enums.Tags.STORAGE_STATUS_MASK, local_buffer): + self._storage_status_mask = primitives.Integer( + tag=enums.Tags.STORAGE_STATUS_MASK + ) + self._storage_status_mask.read( + local_buffer, + kmip_version=kmip_version + ) + + if self.is_tag_next(enums.Tags.OBJECT_GROUP_MEMBER, local_buffer): + self._object_group_member = primitives.Enumeration( + enums.ObjectGroupMember, + tag=enums.Tags.OBJECT_GROUP_MEMBER + ) + self._object_group_member.read( + local_buffer, + kmip_version=kmip_version + ) + + while self.is_tag_next(enums.Tags.ATTRIBUTE, local_buffer): + attribute = objects.Attribute() + attribute.read(local_buffer, kmip_version=kmip_version) + self._attributes.append(attribute) + + def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0): + """ + Write the data encoding the Locate request payload to a buffer. + + Args: + output_buffer (stream): A data buffer in which to encode object + data, supporting a write method. + kmip_version (KMIPVersion): An enumeration defining the KMIP + version with which the object will be encoded. Optional, + defaults to KMIP 1.0. + """ + local_buffer = utils.BytearrayStream() + + if self._maximum_items: + self._maximum_items.write(local_buffer, kmip_version=kmip_version) + + if self._offset_items: + self._offset_items.write(local_buffer, kmip_version=kmip_version) + + if self._storage_status_mask: + self._storage_status_mask.write( + local_buffer, + kmip_version=kmip_version + ) + + if self._object_group_member: + self._object_group_member.write( + local_buffer, + kmip_version=kmip_version + ) + + if self._attributes: + for attribute in self.attributes: + attribute.write(local_buffer, kmip_version=kmip_version) + + self.length = local_buffer.length() super(LocateRequestPayload, self).write( - ostream, + output_buffer, kmip_version=kmip_version ) - ostream.write(tstream.buffer) + output_buffer.write(local_buffer.buffer) - def validate(self): - self._validate() + def __eq__(self, other): + if isinstance(other, LocateRequestPayload): + if self.maximum_items != other.maximum_items: + return False + elif self.offset_items != other.offset_items: + return False + elif self.storage_status_mask != other.storage_status_mask: + return False + elif self.object_group_member != other.object_group_member: + return False + elif self.attributes != other.attributes: + return False + else: + return True + else: + return NotImplemented - def _validate(self): - # TODO Finish implementation. - pass + def __ne__(self, other): + if isinstance(other, LocateRequestPayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "maximum_items={}".format(self.maximum_items), + "offset_items={}".format(self.offset_items), + "storage_status_mask={}".format(self.storage_status_mask), + "object_group_member={}".format(self.object_group_member), + "attributes={}".format( + [repr(attribute) for attribute in self.attributes] + ) + ]) + return "LocateRequestPayload({})".format(args) + + def __str__(self): + value = ", ".join( + [ + '"maximum_items": {}'.format(self.maximum_items), + '"offset_items": {}'.format(self.offset_items), + '"storage_status_mask": {}'.format(self.storage_status_mask), + '"object_group_member": {}'.format(self.object_group_member), + '"attributes": {}'.format( + [str(attribute) for attribute in self.attributes] + ) + ] + ) + return '{' + value + '}' -class LocateResponsePayload(Struct): +class LocateResponsePayload(primitives.Struct): + """ + A response payload for the Locate operation. - def __init__(self, unique_identifiers=[]): + Attributes: + located_items: The number of matching objects found by the server. + unique_identifiers: The object identifiers for the matching objects. + """ + + def __init__(self, + located_items=None, + unique_identifiers=None): + """ + Construct a Locate response payload structure. + + Args: + located_items (int): An integer specifying the number of matching + objects found by the server. Note that this may not equal the + number of object identifiers returned in this payload. + Optional, defaults to None. + unique_identifiers (list): A list of strings specifying the object + identifiers for matching objects. Optional, defaults to None. + """ super(LocateResponsePayload, self).__init__( enums.Tags.RESPONSE_PAYLOAD) - self.unique_identifiers = unique_identifiers or [] - self.validate() - def read(self, istream, kmip_version=enums.KMIPVersion.KMIP_1_0): + self._located_items = None + self._unique_identifiers = None + + self.located_items = located_items + self.unique_identifiers = unique_identifiers + + @property + def located_items(self): + if self._located_items: + return self._located_items.value + return None + + @located_items.setter + def located_items(self, value): + if value is None: + self._located_items = None + elif isinstance(value, six.integer_types): + self._located_items = primitives.Integer( + value=value, + tag=enums.Tags.LOCATED_ITEMS + ) + else: + raise TypeError("Located items must be an integer.") + + @property + def unique_identifiers(self): + if self._unique_identifiers: + return [x.value for x in self._unique_identifiers] + return [] + + @unique_identifiers.setter + def unique_identifiers(self, value): + if value is None: + self._unique_identifiers = [] + elif isinstance(value, list): + self._unique_identifiers = [] + for v in value: + if not isinstance(v, six.string_types): + self._unique_identifiers = [] + raise TypeError( + "Unique identifiers must be a list of strings." + ) + self._unique_identifiers.append( + primitives.TextString( + value=v, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + ) + else: + raise TypeError("Unique identifiers must be a list of strings.") + + def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0): + """ + Read the data encoding the Locate response payload and decode it + into its constituent parts. + + Args: + input_buffer (stream): A data buffer containing encoded object + data, supporting a read method. + kmip_version (KMIPVersion): An enumeration defining the KMIP + version with which the object will be decoded. Optional, + defaults to KMIP 1.0. + """ super(LocateResponsePayload, self).read( - istream, + input_buffer, kmip_version=kmip_version ) - tstream = BytearrayStream(istream.read(self.length)) + local_buffer = utils.BytearrayStream(input_buffer.read(self.length)) - while self.is_tag_next(Tags.UNIQUE_IDENTIFIER, tstream): - ui = attributes.UniqueIdentifier() - ui.read(tstream, kmip_version=kmip_version) - self.unique_identifiers.append(ui) + if self.is_tag_next(enums.Tags.LOCATED_ITEMS, local_buffer): + self._located_items = primitives.Integer( + tag=enums.Tags.LOCATED_ITEMS + ) + self._located_items.read( + local_buffer, + kmip_version=kmip_version + ) - self.is_oversized(tstream) - self.validate() + self._unique_identifiers = [] + while self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_buffer): + unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + unique_identifier.read(local_buffer, kmip_version=kmip_version) + self._unique_identifiers.append(unique_identifier) - def write(self, ostream, kmip_version=enums.KMIPVersion.KMIP_1_0): - tstream = BytearrayStream() + self.is_oversized(local_buffer) - for ui in self.unique_identifiers: - ui.write(tstream, kmip_version=kmip_version) + def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0): + """ + Write the data encoding the Locate response payload to a buffer. - # Write the length and value of the request payload - self.length = tstream.length() + Args: + output_buffer (stream): A data buffer in which to encode object + data, supporting a write method. + kmip_version (KMIPVersion): An enumeration defining the KMIP + version with which the object will be encoded. Optional, + defaults to KMIP 1.0. + """ + local_buffer = utils.BytearrayStream() + + if self._located_items: + self._located_items.write(local_buffer, kmip_version=kmip_version) + + if self._unique_identifiers: + for unique_identifier in self._unique_identifiers: + unique_identifier.write( + local_buffer, + kmip_version=kmip_version + ) + + self.length = local_buffer.length() super(LocateResponsePayload, self).write( - ostream, + output_buffer, kmip_version=kmip_version ) - ostream.write(tstream.buffer) + output_buffer.write(local_buffer.buffer) - def validate(self): - self.__validate() + def __eq__(self, other): + if isinstance(other, LocateResponsePayload): + if self.located_items != other.located_items: + return False + elif self.unique_identifiers != other.unique_identifiers: + return False + else: + return True + else: + return NotImplemented - def __validate(self): - # TODO Finish implementation. - pass + def __ne__(self, other): + if isinstance(other, LocateResponsePayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "located_items={}".format(self.located_items), + "unique_identifiers={}".format(self.unique_identifiers) + ]) + return "LocateResponsePayload({})".format(args) + + def __str__(self): + value = ", ".join( + [ + '"located_items": {}'.format(self.located_items), + '"unique_identifiers": {}'.format(self.unique_identifiers), + ] + ) + return '{' + value + '}' diff --git a/kmip/pie/client.py b/kmip/pie/client.py index d126b7d..a82fd0e 100644 --- a/kmip/pie/client.py +++ b/kmip/pie/client.py @@ -707,8 +707,7 @@ class ProxyKmipClient(object): status = result.result_status.value if status == enums.ResultStatus.SUCCESS: - uids = [uuid.value for uuid in result.uuids] - return uids + return result.uuids else: reason = result.result_reason.value message = result.result_message.value diff --git a/kmip/services/kmip_client.py b/kmip/services/kmip_client.py index 594d2d3..0e6d3e3 100644 --- a/kmip/services/kmip_client.py +++ b/kmip/services/kmip_client.py @@ -1476,30 +1476,21 @@ class KMIPProxy(object): return result def _locate(self, maximum_items=None, storage_status_mask=None, - object_group_member=None, attributes=[], credential=None): + object_group_member=None, attributes=None, credential=None): operation = Operation(OperationEnum.LOCATE) - mxi = None - ssmask = None - objgrp = None + payload = payloads.LocateRequestPayload( + maximum_items=maximum_items, + storage_status_mask=storage_status_mask, + object_group_member=object_group_member, + attributes=attributes + ) - if maximum_items is not None: - mxi = payloads.LocateRequestPayload.MaximumItems(maximum_items) - if storage_status_mask is not None: - m = storage_status_mask - ssmask = payloads.LocateRequestPayload.StorageStatusMask(m) - if object_group_member is not None: - o = object_group_member - objgrp = payloads.LocateRequestPayload.ObjectGroupMember(o) - - payload = payloads.LocateRequestPayload(maximum_items=mxi, - storage_status_mask=ssmask, - object_group_member=objgrp, - attributes=attributes) - - batch_item = messages.RequestBatchItem(operation=operation, - request_payload=payload) + batch_item = messages.RequestBatchItem( + operation=operation, + request_payload=payload + ) message = self._build_request_message(credential, [batch_item]) self._send_message(message) diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index a0fe30e..d66574f 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -1563,9 +1563,9 @@ class KmipEngine(object): managed_objects = managed_objects_filtered - unique_identifiers = [attributes.UniqueIdentifier( - str(managed_object.unique_identifier)) - for managed_object in managed_objects] + unique_identifiers = [ + str(x.unique_identifier) for x in managed_objects + ] response_payload = payloads.LocateResponsePayload( unique_identifiers=unique_identifiers diff --git a/kmip/tests/unit/core/messages/payloads/test_locate.py b/kmip/tests/unit/core/messages/payloads/test_locate.py index 87b311e..307bad7 100644 --- a/kmip/tests/unit/core/messages/payloads/test_locate.py +++ b/kmip/tests/unit/core/messages/payloads/test_locate.py @@ -1,4 +1,4 @@ -# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory +# Copyright (c) 2019 The Johns Hopkins University/Applied Physics Laboratory # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -12,3 +12,1536 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + +import testtools + +from kmip.core import enums +from kmip.core import objects +from kmip.core import primitives +from kmip.core import utils + +from kmip.core.messages import payloads + + +class TestLocateRequestPayload(testtools.TestCase): + + def setUp(self): + super(TestLocateRequestPayload, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # Modified to include the Offset Items and Storage Status Mask fields. + # + # This encoding matches the following set of values: + # Request Payload + # Maximum Items - 1 + # Offset Items - 1 + # Storage Status Mask - Online Storage | Archival Storage + # Object Group Member - Group Member Default + # Attribute + # Attribute Name - Object Group + # Attribute Value - RoundRobinTestGroup + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x80' + b'\x42\x00\x4F\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\xD4\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x8E\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\xAC\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x08\x01\x00\x00\x00\x38' + b'\x42\x00\x0A\x07\x00\x00\x00\x0C' + b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00' + b'\x42\x00\x0B\x07\x00\x00\x00\x13' + b'\x52\x6F\x75\x6E\x64\x52\x6F\x62\x69\x6E\x54\x65\x73\x74\x47\x72' + b'\x6F\x75\x70\x00\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # Modified to include the Offset Items and Storage Status Mask fields. + # + # This encoding matches the following set of values: + # Request Payload + # Offset Items - 1 + # Storage Status Mask - Online Storage | Archival Storage + # Object Group Member - Group Member Default + # Attribute + # Attribute Name - Object Group + # Attribute Value - RoundRobinTestGroup + self.no_maximum_items_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x70' + b'\x42\x00\xD4\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x8E\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\xAC\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x08\x01\x00\x00\x00\x38' + b'\x42\x00\x0A\x07\x00\x00\x00\x0C' + b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00' + b'\x42\x00\x0B\x07\x00\x00\x00\x13' + b'\x52\x6F\x75\x6E\x64\x52\x6F\x62\x69\x6E\x54\x65\x73\x74\x47\x72' + b'\x6F\x75\x70\x00\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # Modified to include the Storage Status Mask field. + # + # This encoding matches the following set of values: + # Request Payload + # Maximum Items - 1 + # Storage Status Mask - Online Storage | Archival Storage + # Object Group Member - Group Member Default + # Attribute + # Attribute Name - Object Group + # Attribute Value - RoundRobinTestGroup + self.no_offset_items_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x70' + b'\x42\x00\x4F\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x8E\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\xAC\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x08\x01\x00\x00\x00\x38' + b'\x42\x00\x0A\x07\x00\x00\x00\x0C' + b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00' + b'\x42\x00\x0B\x07\x00\x00\x00\x13' + b'\x52\x6F\x75\x6E\x64\x52\x6F\x62\x69\x6E\x54\x65\x73\x74\x47\x72' + b'\x6F\x75\x70\x00\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # Modified to include the Offset Items field. + # + # This encoding matches the following set of values: + # Request Payload + # Maximum Items - 1 + # Offset Items - 1 + # Object Group Member - Group Member Default + # Attribute + # Attribute Name - Object Group + # Attribute Value - RoundRobinTestGroup + self.no_storage_status_mask_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x70' + b'\x42\x00\x4F\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\xD4\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\xAC\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x08\x01\x00\x00\x00\x38' + b'\x42\x00\x0A\x07\x00\x00\x00\x0C' + b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00' + b'\x42\x00\x0B\x07\x00\x00\x00\x13' + b'\x52\x6F\x75\x6E\x64\x52\x6F\x62\x69\x6E\x54\x65\x73\x74\x47\x72' + b'\x6F\x75\x70\x00\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # Modified to include the Offset Items and Storage Status Mask fields. + # + # This encoding matches the following set of values: + # Request Payload + # Maximum Items - 1 + # Offset Items - 1 + # Storage Status Mask - Online Storage | Archival Storage + # Attribute + # Attribute Name - Object Group + # Attribute Value - RoundRobinTestGroup + self.no_object_group_member_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x70' + b'\x42\x00\x4F\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\xD4\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x8E\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\x08\x01\x00\x00\x00\x38' + b'\x42\x00\x0A\x07\x00\x00\x00\x0C' + b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00' + b'\x42\x00\x0B\x07\x00\x00\x00\x13' + b'\x52\x6F\x75\x6E\x64\x52\x6F\x62\x69\x6E\x54\x65\x73\x74\x47\x72' + b'\x6F\x75\x70\x00\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # Modified to include the Offset Items and Storage Status Mask fields. + # + # This encoding matches the following set of values: + # Request Payload + # Maximum Items - 1 + # Offset Items - 1 + # Storage Status Mask - Online Storage | Archival Storage + # Object Group Member - Group Member Default + self.no_attributes_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x40' + b'\x42\x00\x4F\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\xD4\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x8E\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\xAC\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + ) + + # This encoding matches the following set of values: + # Request Payload + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestLocateRequestPayload, self).tearDown() + + def test_invalid_maximum_items(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the maximum items of a Locate request payload. + """ + kwargs = {"maximum_items": "invalid"} + self.assertRaisesRegex( + TypeError, + "Maximum items must be an integer.", + payloads.LocateRequestPayload, + **kwargs + ) + + args = ( + payloads.LocateRequestPayload(), + "maximum_items", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "Maximum items must be an integer.", + setattr, + *args + ) + + def test_invalid_offset_items(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the offset items of a Locate request payload. + """ + kwargs = {"offset_items": "invalid"} + self.assertRaisesRegex( + TypeError, + "Offset items must be an integer.", + payloads.LocateRequestPayload, + **kwargs + ) + + args = ( + payloads.LocateRequestPayload(), + "offset_items", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "Offset items must be an integer.", + setattr, + *args + ) + + def test_invalid_storage_status_mask(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the storage status mask of a Locate request payload. + """ + kwargs = {"storage_status_mask": "invalid"} + self.assertRaisesRegex( + TypeError, + "Storage status mask must be an integer representing a valid " + "StorageStatusMask bit mask.", + payloads.LocateRequestPayload, + **kwargs + ) + + kwargs = {"storage_status_mask": 55} + self.assertRaisesRegex( + TypeError, + "Storage status mask must be an integer representing a valid " + "StorageStatusMask bit mask.", + payloads.LocateRequestPayload, + **kwargs + ) + + args = ( + payloads.LocateRequestPayload(), + "storage_status_mask", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "Storage status mask must be an integer representing a valid " + "StorageStatusMask bit mask.", + setattr, + *args + ) + + def test_invalid_object_group_member(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the object group member of a Locate request payload. + """ + kwargs = {"object_group_member": "invalid"} + self.assertRaisesRegex( + TypeError, + "Object group member must be an ObjectGroupMember enumeration.", + payloads.LocateRequestPayload, + **kwargs + ) + + args = ( + payloads.LocateRequestPayload(), + "object_group_member", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "Object group member must be an ObjectGroupMember enumeration.", + setattr, + *args + ) + + def test_invalid_attributes(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the attributes of a Locate request payload. + """ + kwargs = {"attributes": "invalid"} + self.assertRaisesRegex( + TypeError, + "Attributes must be a list of Attribute structures.", + payloads.LocateRequestPayload, + **kwargs + ) + + kwargs = {"attributes": ["invalid"]} + self.assertRaisesRegex( + TypeError, + "Attributes must be a list of Attribute structures.", + payloads.LocateRequestPayload, + **kwargs + ) + + args = ( + payloads.LocateRequestPayload(), + "attributes", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "Attributes must be a list of Attribute structures.", + setattr, + *args + ) + + args = ( + payloads.LocateRequestPayload(), + "attributes", + ["invalid"] + ) + self.assertRaisesRegex( + TypeError, + "Attributes must be a list of Attribute structures.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a Locate request payload can be read from a data stream. + """ + payload = payloads.LocateRequestPayload() + + self.assertIsNone(payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertIsNone(payload.object_group_member) + self.assertEqual([], payload.attributes) + + payload.read(self.full_encoding) + + self.assertEqual(1, payload.maximum_items) + self.assertEqual(1, payload.offset_items) + self.assertEqual( + enums.get_bit_mask_from_enumerations( + [ + enums.StorageStatusMask.ONLINE_STORAGE, + enums.StorageStatusMask.ARCHIVAL_STORAGE + ] + ), + payload.storage_status_mask + ) + self.assertEqual( + enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + payload.object_group_member + ) + self.assertIsInstance(payload.attributes, list) + self.assertEqual(1, len(payload.attributes)) + self.assertEqual( + objects.Attribute( + attribute_name=objects.Attribute.AttributeName("Object Group"), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ), + payload.attributes[0] + ) + + def test_read_missing_maximum_items(self): + """ + Test that a Locate request payload can be read from a data stream + even when missing the maximum items. + """ + payload = payloads.LocateRequestPayload() + + self.assertIsNone(payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertIsNone(payload.object_group_member) + self.assertEqual([], payload.attributes) + + payload.read(self.no_maximum_items_encoding) + + self.assertIsNone(payload.maximum_items) + self.assertEqual(1, payload.offset_items) + self.assertEqual( + enums.get_bit_mask_from_enumerations( + [ + enums.StorageStatusMask.ONLINE_STORAGE, + enums.StorageStatusMask.ARCHIVAL_STORAGE + ] + ), + payload.storage_status_mask + ) + self.assertEqual( + enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + payload.object_group_member + ) + self.assertIsInstance(payload.attributes, list) + self.assertEqual(1, len(payload.attributes)) + self.assertEqual( + objects.Attribute( + attribute_name=objects.Attribute.AttributeName("Object Group"), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ), + payload.attributes[0] + ) + + def test_read_missing_offset_items(self): + """ + Test that a Locate request payload can be read from a data stream + even when missing the offset items. + """ + payload = payloads.LocateRequestPayload() + + self.assertIsNone(payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertIsNone(payload.object_group_member) + self.assertEqual([], payload.attributes) + + payload.read(self.no_offset_items_encoding) + + self.assertEqual(1, payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertEqual( + enums.get_bit_mask_from_enumerations( + [ + enums.StorageStatusMask.ONLINE_STORAGE, + enums.StorageStatusMask.ARCHIVAL_STORAGE + ] + ), + payload.storage_status_mask + ) + self.assertEqual( + enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + payload.object_group_member + ) + self.assertIsInstance(payload.attributes, list) + self.assertEqual(1, len(payload.attributes)) + self.assertEqual( + objects.Attribute( + attribute_name=objects.Attribute.AttributeName("Object Group"), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ), + payload.attributes[0] + ) + + def test_read_missing_storage_status_mask(self): + """ + Test that a Locate request payload can be read from a data stream + even when missing the storage status mask. + """ + payload = payloads.LocateRequestPayload() + + self.assertIsNone(payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertIsNone(payload.object_group_member) + self.assertEqual([], payload.attributes) + + payload.read(self.no_storage_status_mask_encoding) + + self.assertEqual(1, payload.maximum_items) + self.assertEqual(1, payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertEqual( + enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + payload.object_group_member + ) + self.assertIsInstance(payload.attributes, list) + self.assertEqual(1, len(payload.attributes)) + self.assertEqual( + objects.Attribute( + attribute_name=objects.Attribute.AttributeName("Object Group"), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ), + payload.attributes[0] + ) + + def test_read_missing_object_group_member(self): + """ + Test that a Locate request payload can be read from a data stream + even when missing the object group member. + """ + payload = payloads.LocateRequestPayload() + + self.assertIsNone(payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertIsNone(payload.object_group_member) + self.assertEqual([], payload.attributes) + + payload.read(self.no_object_group_member_encoding) + + self.assertEqual(1, payload.maximum_items) + self.assertEqual(1, payload.offset_items) + self.assertEqual( + enums.get_bit_mask_from_enumerations( + [ + enums.StorageStatusMask.ONLINE_STORAGE, + enums.StorageStatusMask.ARCHIVAL_STORAGE + ] + ), + payload.storage_status_mask + ) + self.assertIsNone(payload.object_group_member) + self.assertIsInstance(payload.attributes, list) + self.assertEqual(1, len(payload.attributes)) + self.assertEqual( + objects.Attribute( + attribute_name=objects.Attribute.AttributeName("Object Group"), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ), + payload.attributes[0] + ) + + def test_read_missing_attributes(self): + """ + Test that a Locate request payload can be read from a data stream + even when missing the attributes. + """ + payload = payloads.LocateRequestPayload() + + self.assertIsNone(payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertIsNone(payload.object_group_member) + self.assertEqual([], payload.attributes) + + payload.read(self.no_attributes_encoding) + + self.assertEqual(1, payload.maximum_items) + self.assertEqual(1, payload.offset_items) + self.assertEqual( + enums.get_bit_mask_from_enumerations( + [ + enums.StorageStatusMask.ONLINE_STORAGE, + enums.StorageStatusMask.ARCHIVAL_STORAGE + ] + ), + payload.storage_status_mask + ) + self.assertEqual( + enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + payload.object_group_member + ) + self.assertEqual([], payload.attributes) + + def test_read_missing_everything(self): + """ + Test that a Locate request payload can be read from a data stream + even when missing all fields. + """ + payload = payloads.LocateRequestPayload() + + self.assertIsNone(payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertIsNone(payload.object_group_member) + self.assertEqual([], payload.attributes) + + payload.read(self.empty_encoding) + + self.assertIsNone(payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertIsNone(payload.object_group_member) + self.assertEqual([], payload.attributes) + + def test_write(self): + """ + Test that a Locate request payload can be written to a data stream. + """ + payload = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_missing_maximum_items(self): + """ + Test that a Locate request payload can be written to a data stream + even when missing the maximum items. + """ + payload = payloads.LocateRequestPayload( + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.no_maximum_items_encoding), len(stream)) + self.assertEqual(str(self.no_maximum_items_encoding), str(stream)) + + def test_write_missing_offset_items(self): + """ + Test that a Locate request payload can be written to a data stream + even when missing the offset items. + """ + payload = payloads.LocateRequestPayload( + maximum_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.no_offset_items_encoding), len(stream)) + self.assertEqual(str(self.no_offset_items_encoding), str(stream)) + + def test_write_missing_storage_status_mask(self): + """ + Test that a Locate request payload can be written to a data stream + even when missing the storage status mask. + """ + payload = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual( + len(self.no_storage_status_mask_encoding), + len(stream) + ) + self.assertEqual( + str(self.no_storage_status_mask_encoding), + str(stream) + ) + + def test_write_missing_object_group_member(self): + """ + Test that a Locate request payload can be written to a data stream + even when missing the object group member. + """ + payload = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual( + len(self.no_object_group_member_encoding), + len(stream) + ) + self.assertEqual( + str(self.no_object_group_member_encoding), + str(stream) + ) + + def test_write_missing_attributes(self): + """ + Test that a Locate request payload can be written to a data stream + even when missing the attributes. + """ + payload = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.no_attributes_encoding), len(stream)) + self.assertEqual(str(self.no_attributes_encoding), str(stream)) + + def test_write_missing_everything(self): + """ + Test that a Locate request payload can be written to a data stream + even when missing all fields. + """ + payload = payloads.LocateRequestPayload() + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) + + def test_repr(self): + """ + Test that repr can be applied to a Locate request payload structure. + """ + payload = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + s = "LocateRequestPayload(" \ + "maximum_items=1, " \ + "offset_items=1, " \ + "storage_status_mask=3, " \ + "object_group_member=ObjectGroupMember.GROUP_MEMBER_DEFAULT, " \ + "attributes=[" +# "Attribute(" \ +# "attribute_name=AttributeName(value='Object Group'), " \ +# "attribute_index=None, " \ +# "attribute_value=TextString(value='RoundRobinTestGroup'))" \ +# "])" + + # TODO (ph) Uncomment above when Attribute repr fixed. Fix below too. + + self.assertTrue(repr(payload).startswith(s)) + + def str(self): + """ + Test that str can be applied to a Locate request payload structure. + """ + payload = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + s = '{' \ + '"maximum_items": 1, ' \ + '"offset_items": 1, ' \ + '"storage_status_mask": 3, ' \ + '"object_group_member": ObjectGroupMember.GROUP_MEMBER_DEFAULT, ' \ + '"attributes=[' \ + '{' \ + '"attribute_name": "Object Group", ' \ + '"attribute_index": None, ' \ + '"attribute_value": "RoundRobinTestGroup"' \ + '}]' \ + '}' + + self.assertEqual(s, str(payload)) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two Locate + request payloads with the same data. + """ + a = payloads.LocateRequestPayload() + b = payloads.LocateRequestPayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + b = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_maximum_items(self): + """ + Test that the equality operator returns False when comparing two + Locate request payloads with different maximum items. + """ + a = payloads.LocateRequestPayload(maximum_items=1) + b = payloads.LocateRequestPayload(maximum_items=2) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_offset_items(self): + """ + Test that the equality operator returns False when comparing two + Locate request payloads with different offset items. + """ + a = payloads.LocateRequestPayload(offset_items=1) + b = payloads.LocateRequestPayload(offset_items=2) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_storage_status_mask(self): + """ + Test that the equality operator returns False when comparing two + Locate request payloads with different storage status mask. + """ + a = payloads.LocateRequestPayload(storage_status_mask=1) + b = payloads.LocateRequestPayload(storage_status_mask=2) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_object_group_member(self): + """ + Test that the equality operator returns False when comparing two + Locate request payloads with different object group member. + """ + a = payloads.LocateRequestPayload( + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT + ) + b = payloads.LocateRequestPayload( + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_FRESH + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_attributes(self): + """ + Test that the equality operator returns False when comparing two + Locate request payloads with different attributes. + """ + a = payloads.LocateRequestPayload( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + b = payloads.LocateRequestPayload( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Cryptographic Algorithm" + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ) + ] + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + Locate request payloads with different types. + """ + a = payloads.LocateRequestPayload() + b = "invalid" + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Locate request payloads with the same data. + """ + a = payloads.LocateRequestPayload() + b = payloads.LocateRequestPayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + b = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_maximum_items(self): + """ + Test that the inequality operator returns True when comparing two + Locate request payloads with different maximum items. + """ + a = payloads.LocateRequestPayload(maximum_items=1) + b = payloads.LocateRequestPayload(maximum_items=2) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_offset_items(self): + """ + Test that the inequality operator returns True when comparing two + Locate request payloads with different offset items. + """ + a = payloads.LocateRequestPayload(offset_items=1) + b = payloads.LocateRequestPayload(offset_items=2) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_storage_status_mask(self): + """ + Test that the inequality operator returns True when comparing two + Locate request payloads with different storage status mask. + """ + a = payloads.LocateRequestPayload(storage_status_mask=1) + b = payloads.LocateRequestPayload(storage_status_mask=2) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_object_group_member(self): + """ + Test that the inequality operator returns True when comparing two + Locate request payloads with different object group member. + """ + a = payloads.LocateRequestPayload( + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT + ) + b = payloads.LocateRequestPayload( + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_FRESH + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_attributes(self): + """ + Test that the inequality operator returns True when comparing two + Locate request payloads with different attributes. + """ + a = payloads.LocateRequestPayload( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + b = payloads.LocateRequestPayload( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Cryptographic Algorithm" + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ) + ] + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + Locate request payloads with different types. + """ + a = payloads.LocateRequestPayload() + b = "invalid" + + self.assertTrue(a != b) + self.assertTrue(b != a) + + +class TestLocateResponsePayload(testtools.TestCase): + + def setUp(self): + super(TestLocateResponsePayload, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # Modified to include the Located Items field. + # + # This encoding matches the following set of values: + # Request Payload + # Located Items - 1 + # Unique Identifier - 8d945322-fd70-495d-bf7f-71481d1401f6 + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x40' + b'\x42\x00\xD5\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x38\x64\x39\x34\x35\x33\x32\x32\x2D\x66\x64\x37\x30\x2D\x34\x39' + b'\x35\x64\x2D\x62\x66\x37\x66\x2D\x37\x31\x34\x38\x31\x64\x31\x34' + b'\x30\x31\x66\x36\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # + # This encoding matches the following set of values: + # Request Payload + # Unique Identifier - 8d945322-fd70-495d-bf7f-71481d1401f6 + self.no_located_items_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x30' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x38\x64\x39\x34\x35\x33\x32\x32\x2D\x66\x64\x37\x30\x2D\x34\x39' + b'\x35\x64\x2D\x62\x66\x37\x66\x2D\x37\x31\x34\x38\x31\x64\x31\x34' + b'\x30\x31\x66\x36\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # Modified to include the Located Items field. + # + # This encoding matches the following set of values: + # Request Payload + # Located Items - 1 + self.no_unique_identifiers_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x10' + b'\x42\x00\xD5\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + ) + + # This encoding matches the following set of values: + # Request Payload + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestLocateResponsePayload, self).tearDown() + + def test_invalid_located_items(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the located items of a Locate response payload. + """ + kwargs = {"located_items": "invalid"} + self.assertRaisesRegex( + TypeError, + "Located items must be an integer.", + payloads.LocateResponsePayload, + **kwargs + ) + + args = ( + payloads.LocateResponsePayload(), + "located_items", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "Located items must be an integer.", + setattr, + *args + ) + + def test_invalid_unique_identifiers(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifiers of a Locate response payload. + """ + kwargs = {"unique_identifiers": "invalid"} + self.assertRaisesRegex( + TypeError, + "Unique identifiers must be a list of strings.", + payloads.LocateResponsePayload, + **kwargs + ) + + kwargs = {"unique_identifiers": [0]} + self.assertRaisesRegex( + TypeError, + "Unique identifiers must be a list of strings.", + payloads.LocateResponsePayload, + **kwargs + ) + + args = ( + payloads.LocateResponsePayload(), + "unique_identifiers", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "Unique identifiers must be a list of strings.", + setattr, + *args + ) + + args = ( + payloads.LocateResponsePayload(), + "unique_identifiers", + [0] + ) + self.assertRaisesRegex( + TypeError, + "Unique identifiers must be a list of strings.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a Locate response payload can be read from a data stream. + """ + payload = payloads.LocateResponsePayload() + + self.assertIsNone(payload.located_items) + self.assertEqual([], payload.unique_identifiers) + + payload.read(self.full_encoding) + + self.assertEqual(1, payload.located_items) + self.assertIsInstance(payload.unique_identifiers, list) + self.assertEqual(1, len(payload.unique_identifiers)) + self.assertEqual( + ["8d945322-fd70-495d-bf7f-71481d1401f6"], + payload.unique_identifiers + ) + + def test_read_missing_located_items(self): + """ + Test that a Locate response payload can be read from a data stream + even when missing the located items. + """ + payload = payloads.LocateResponsePayload() + + self.assertIsNone(payload.located_items) + self.assertEqual([], payload.unique_identifiers) + + payload.read(self.no_located_items_encoding) + + self.assertIsNone(payload.located_items) + self.assertIsInstance(payload.unique_identifiers, list) + self.assertEqual(1, len(payload.unique_identifiers)) + self.assertEqual( + ["8d945322-fd70-495d-bf7f-71481d1401f6"], + payload.unique_identifiers + ) + + def test_read_missing_unique_identifiers(self): + """ + Test that a Locate response payload can be read from a data stream + even when missing the unique identifiers. + """ + payload = payloads.LocateResponsePayload() + + self.assertIsNone(payload.located_items) + self.assertEqual([], payload.unique_identifiers) + + payload.read(self.no_unique_identifiers_encoding) + + self.assertEqual(1, payload.located_items) + self.assertEqual([], payload.unique_identifiers) + + def test_read_missing_everything(self): + """ + Test that a Locate response payload can be read from a data stream + even when missing all fields. + """ + payload = payloads.LocateResponsePayload() + + self.assertIsNone(payload.located_items) + self.assertEqual([], payload.unique_identifiers) + + payload.read(self.empty_encoding) + + self.assertIsNone(payload.located_items) + self.assertEqual([], payload.unique_identifiers) + + def test_write(self): + """ + Test that a Locate response payload can be written to a data stream. + """ + payload = payloads.LocateResponsePayload( + located_items=1, + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_missing_located_items(self): + """ + Test that a Locate response payload can be written to a data stream + even when missing the located items. + """ + payload = payloads.LocateResponsePayload( + located_items=1 + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.no_unique_identifiers_encoding), len(stream)) + self.assertEqual(str(self.no_unique_identifiers_encoding), str(stream)) + + def test_write_missing_unique_identifiers(self): + """ + Test that a Locate response payload can be written to a data stream + even when missing the unique identifiers. + """ + payload = payloads.LocateResponsePayload( + located_items=1, + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_missing_everything(self): + """ + Test that a Locate response payload can be written to a data stream + even when missing all fields. + """ + payload = payloads.LocateResponsePayload() + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) + + def test_repr(self): + """ + Test that repr can be applied to a Locate response payload structure. + """ + payload = payloads.LocateResponsePayload( + located_items=1, + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + + s = "LocateResponsePayload(" \ + "located_items=1, " \ + "unique_identifiers=['8d945322-fd70-495d-bf7f-71481d1401f6'])" + + self.assertEqual(s, repr(payload)) + + def str(self): + """ + Test that str can be applied to a Locate response payload structure. + """ + payload = payloads.LocateResponsePayload( + located_items=1, + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + + s = "LocateResponsePayload(" \ + "located_items=1, " \ + "unique_identifiers=['8d945322-fd70-495d-bf7f-71481d1401f6'])" + + self.assertEqual(s, repr(payload)) + + s = '{' \ + '"located_items": 1, ' \ + '"unique_identifiers": [' \ + '"8d945322-fd70-495d-bf7f-71481d1401f6"]' \ + ']' \ + '}' + + self.assertEqual(s, str(payload)) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two Locate + response payloads with the same data. + """ + a = payloads.LocateResponsePayload() + b = payloads.LocateResponsePayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = payloads.LocateResponsePayload( + located_items=1, + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + b = payloads.LocateResponsePayload( + located_items=1, + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_located_items(self): + """ + Test that the equality operator returns False when comparing two + Locate response payloads with different located items. + """ + a = payloads.LocateResponsePayload(located_items=1) + b = payloads.LocateResponsePayload(located_items=2) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_unique_identifiers(self): + """ + Test that the equality operator returns False when comparing two + Locate response payloads with different unique identifiers. + """ + a = payloads.LocateResponsePayload( + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + b = payloads.LocateResponsePayload( + unique_identifiers=["49a1ca88-6bea-4fb2-b450-7e58802c3038"] + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + Locate response payloads with different types. + """ + a = payloads.LocateResponsePayload() + b = "invalid" + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Locate response payloads with the same data. + """ + a = payloads.LocateResponsePayload() + b = payloads.LocateResponsePayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = payloads.LocateResponsePayload( + located_items=1, + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + b = payloads.LocateResponsePayload( + located_items=1, + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_located_items(self): + """ + Test that the inequality operator returns True when comparing two + Locate response payloads with different located items. + """ + a = payloads.LocateResponsePayload(located_items=1) + b = payloads.LocateResponsePayload(located_items=2) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_unique_identifiers(self): + """ + Test that the inequality operator returns True when comparing two + Locate response payloads with different unique identifiers. + """ + a = payloads.LocateResponsePayload( + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + b = payloads.LocateResponsePayload( + unique_identifiers=["49a1ca88-6bea-4fb2-b450-7e58802c3038"] + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + Locate response payloads with different types. + """ + a = payloads.LocateResponsePayload() + b = "invalid" + + self.assertTrue(a != b) + self.assertTrue(b != a) diff --git a/kmip/tests/unit/core/messages/test_messages.py b/kmip/tests/unit/core/messages/test_messages.py index cbd3c1c..1ab7212 100644 --- a/kmip/tests/unit/core/messages/test_messages.py +++ b/kmip/tests/unit/core/messages/test_messages.py @@ -1946,7 +1946,7 @@ class TestResponseMessage(TestCase): operation = contents.Operation(enums.Operation.LOCATE) result_status = contents.ResultStatus(enums.ResultStatus.SUCCESS) - uuid = attr.UniqueIdentifier('49a1ca88-6bea-4fb2-b450-7e58802c3038') + uuid = "49a1ca88-6bea-4fb2-b450-7e58802c3038" resp_pl = payloads.LocateResponsePayload(unique_identifiers=[uuid]) diff --git a/kmip/tests/unit/pie/test_client.py b/kmip/tests/unit/pie/test_client.py index ad44328..e5ca784 100644 --- a/kmip/tests/unit/pie/test_client.py +++ b/kmip/tests/unit/pie/test_client.py @@ -2700,8 +2700,7 @@ class TestProxyKmipClient(testtools.TestCase): uuid0 = 'aaaaaaaa-1111-2222-3333-ffffffffffff' uuid1 = 'bbbbbbbb-4444-5555-6666-gggggggggggg' - unique_identifiers = [attr.UniqueIdentifier(uuid0), - attr.UniqueIdentifier(uuid1)] + unique_identifiers = [uuid0, uuid1] result = results.LocateResult( contents.ResultStatus(enums.ResultStatus.SUCCESS), diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index e2f74d0..bd3092b 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -4289,7 +4289,7 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual( id_a, - response_payload.unique_identifiers[0].value + response_payload.unique_identifiers[0] ) # Add the second obj and test the locate @@ -4315,11 +4315,11 @@ class TestKmipEngine(testtools.TestCase): ) self.assertIn( id_a, - [uid.value for uid in response_payload.unique_identifiers] + response_payload.unique_identifiers ) self.assertIn( id_b, - [uid.value for uid in response_payload.unique_identifiers] + response_payload.unique_identifiers ) def test_locate_with_name(self): @@ -4381,11 +4381,11 @@ class TestKmipEngine(testtools.TestCase): ) self.assertIn( id_a, - [uid.value for uid in response_payload.unique_identifiers] + response_payload.unique_identifiers ) self.assertIn( id_b, - [uid.value for uid in response_payload.unique_identifiers] + response_payload.unique_identifiers ) # Locate the obj with name 'name1' @@ -4415,7 +4415,7 @@ class TestKmipEngine(testtools.TestCase): ) self.assertIn( id_c, - [uid.value for uid in response_payload.unique_identifiers] + response_payload.unique_identifiers ) def test_get(self):