diff --git a/kmip/core/messages/payloads/locate.py b/kmip/core/messages/payloads/locate.py index 785878f..e6f7b16 100644 --- a/kmip/core/messages/payloads/locate.py +++ b/kmip/core/messages/payloads/locate.py @@ -13,139 +13,505 @@ # License for the specific language governing permissions and limitations # under the License. -from kmip.core import attributes +import six + from kmip.core import enums -from kmip.core.enums import Tags - -from kmip.core.objects import Attribute - -from kmip.core.primitives import Struct -from kmip.core.primitives import Enumeration -from kmip.core.primitives import Integer - -from kmip.core.utils import BytearrayStream +from kmip.core import objects +from kmip.core import primitives +from kmip.core import utils -class LocateRequestPayload(Struct): +class LocateRequestPayload(primitives.Struct): + """ + A request payload for the Locate operation. - # 9.1.3.2.33 - class ObjectGroupMember(Enumeration): + Attributes: + maximum_items: The maximum number of object identifiers to be returned. + offset_items: The number of object identifiers to skip when selecting + the object identifiers to return. + storage_status_mask: A bit mask specifying which types of stored + objects should be searched. + object_group_member: The object group member type for the searched + objects. + attributes: The attributes that should be used to filter and match + objects. + """ - def __init__(self, value=None): - super(LocateRequestPayload.ObjectGroupMember, self).__init__( - enums.ObjectGroupMember, value, Tags.OBJECT_GROUP_MEMBER) + def __init__(self, + maximum_items=None, + offset_items=None, + storage_status_mask=None, + object_group_member=None, + attributes=None): + """ + Construct a Locate request payload structure. - class MaximumItems(Integer): - def __init__(self, value=None): - super(LocateRequestPayload.MaximumItems, self).__init__( - value, Tags.MAXIMUM_ITEMS) - - # 9.1.3.3.2 - class StorageStatusMask(Enumeration): - - def __init__(self, value=None): - super(LocateRequestPayload.StorageStatusMask, self).__init__( - enums.StorageStatusMask, value, Tags.STORAGE_STATUS_MASK) - - def __init__(self, maximum_items=None, storage_status_mask=None, - object_group_member=None, attributes=None): + Args: + maximum_items (int): An integer specifying the maximum number of + object identifiers to be returned. Optional, defaults to None. + offset_items (int): An integer specifying the number of object + identifiers to skip when selecting the object identifiers to + return. Optional, defaults to None. + storage_status_mask (int, list): An integer bit mask or a + corresponding list of StorageStatusMask enumerations indicating + which types of stored objects should be searched. Optional, + defaults to None. + object_group_member (enum): An ObjectGroupMember enumeration + specifying the object group member type for the searched + objects. Optional, defaults to None. + attributes (list): A list of Attribute structures containing the + attribute values that should be used to filter and match + objects. Optional, defaults to None. + """ super(LocateRequestPayload, self).__init__(enums.Tags.REQUEST_PAYLOAD) + + self._maximum_items = None + self._offset_items = None + self._storage_status_mask = None + self._object_group_member = None + self._attributes = None + self.maximum_items = maximum_items + self.offset_items = offset_items self.storage_status_mask = storage_status_mask self.object_group_member = object_group_member - self.attributes = attributes or [] - self.validate() + self.attributes = attributes - def read(self, istream, kmip_version=enums.KMIPVersion.KMIP_1_0): + @property + def maximum_items(self): + if self._maximum_items: + return self._maximum_items.value + else: + return None + + @maximum_items.setter + def maximum_items(self, value): + if value is None: + self._maximum_items = None + elif isinstance(value, six.integer_types): + self._maximum_items = primitives.Integer( + value=value, + tag=enums.Tags.MAXIMUM_ITEMS + ) + else: + raise TypeError("Maximum items must be an integer.") + + @property + def offset_items(self): + if self._offset_items: + return self._offset_items.value + else: + return None + + @offset_items.setter + def offset_items(self, value): + if value is None: + self._offset_items = None + elif isinstance(value, six.integer_types): + self._offset_items = primitives.Integer( + value=value, + tag=enums.Tags.OFFSET_ITEMS + ) + else: + raise TypeError("Offset items must be an integer.") + + @property + def storage_status_mask(self): + if self._storage_status_mask: + return self._storage_status_mask.value + else: + return None + + @storage_status_mask.setter + def storage_status_mask(self, value): + if value is None: + self._storage_status_mask = None + elif isinstance(value, six.integer_types): + if enums.is_bit_mask(enums.StorageStatusMask, value): + self._storage_status_mask = primitives.Integer( + value=value, + tag=enums.Tags.STORAGE_STATUS_MASK + ) + else: + raise TypeError( + "Storage status mask must be an integer representing a " + "valid StorageStatusMask bit mask." + ) + else: + raise TypeError( + "Storage status mask must be an integer representing a valid " + "StorageStatusMask bit mask." + ) + + @property + def object_group_member(self): + if self._object_group_member: + return self._object_group_member.value + else: + return None + + @object_group_member.setter + def object_group_member(self, value): + if value is None: + self._object_group_member = None + elif isinstance(value, enums.ObjectGroupMember): + self._object_group_member = primitives.Enumeration( + enums.ObjectGroupMember, + value=value, + tag=enums.Tags.OBJECT_GROUP_MEMBER + ) + else: + raise TypeError( + "Object group member must be an ObjectGroupMember enumeration." + ) + + @property + def attributes(self): + if self._attributes: + return self._attributes + return [] + + @attributes.setter + def attributes(self, value): + if value is None: + self._attributes = [] + elif isinstance(value, list): + for v in value: + if not isinstance(v, objects.Attribute): + raise TypeError( + "Attributes must be a list of Attribute structures." + ) + self._attributes = value + else: + raise TypeError( + "Attributes must be a list of Attribute structures." + ) + + def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0): + """ + Read the data encoding the Locate request payload and decode it into + its constituent parts. + + Args: + input_buffer (stream): A data buffer containing encoded object + data, supporting a read method. + kmip_version (KMIPVersion): An enumeration defining the KMIP + version with which the object will be decoded. Optional, + defaults to KMIP 1.0. + """ super(LocateRequestPayload, self).read( - istream, + input_buffer, kmip_version=kmip_version ) - tstream = BytearrayStream(istream.read(self.length)) - if self.is_tag_next(Tags.MAXIMUM_ITEMS, tstream): - self.maximum_items = LocateRequestPayload.MaximumItems() - self.maximum_items.read(tstream, kmip_version=kmip_version) - if self.is_tag_next(Tags.STORAGE_STATUS_MASK, tstream): - self.storage_status_mask = LocateRequestPayload.StorageStatusMask() - self.storage_status_mask.read(tstream, kmip_version=kmip_version) - if self.is_tag_next(Tags.OBJECT_GROUP_MEMBER, tstream): - self.object_group_member = LocateRequestPayload.ObjectGroupMember() - self.object_group_member.read(tstream, kmip_version=kmip_version) - while self.is_tag_next(Tags.ATTRIBUTE, tstream): - attr = Attribute() - attr.read(tstream, kmip_version=kmip_version) - self.attributes.append(attr) + local_buffer = utils.BytearrayStream(input_buffer.read(self.length)) - self.validate() + if self.is_tag_next(enums.Tags.MAXIMUM_ITEMS, local_buffer): + self._maximum_items = primitives.Integer( + tag=enums.Tags.MAXIMUM_ITEMS + ) + self._maximum_items.read( + local_buffer, + kmip_version=kmip_version + ) - def write(self, ostream, kmip_version=enums.KMIPVersion.KMIP_1_0): - tstream = BytearrayStream() - if self.maximum_items is not None: - self.maximum_items.write(tstream, kmip_version=kmip_version) - if self.storage_status_mask is not None: - self.storage_status_mask.write(tstream, kmip_version=kmip_version) - if self.object_group_member is not None: - self.object_group_member.write(tstream, kmip_version=kmip_version) - if self.attributes is not None: - for a in self.attributes: - a.write(tstream, kmip_version=kmip_version) + if self.is_tag_next(enums.Tags.OFFSET_ITEMS, local_buffer): + self._offset_items = primitives.Integer( + tag=enums.Tags.OFFSET_ITEMS + ) + self._offset_items.read( + local_buffer, + kmip_version=kmip_version + ) - # Write the length and value of the request payload - self.length = tstream.length() + if self.is_tag_next(enums.Tags.STORAGE_STATUS_MASK, local_buffer): + self._storage_status_mask = primitives.Integer( + tag=enums.Tags.STORAGE_STATUS_MASK + ) + self._storage_status_mask.read( + local_buffer, + kmip_version=kmip_version + ) + + if self.is_tag_next(enums.Tags.OBJECT_GROUP_MEMBER, local_buffer): + self._object_group_member = primitives.Enumeration( + enums.ObjectGroupMember, + tag=enums.Tags.OBJECT_GROUP_MEMBER + ) + self._object_group_member.read( + local_buffer, + kmip_version=kmip_version + ) + + while self.is_tag_next(enums.Tags.ATTRIBUTE, local_buffer): + attribute = objects.Attribute() + attribute.read(local_buffer, kmip_version=kmip_version) + self._attributes.append(attribute) + + def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0): + """ + Write the data encoding the Locate request payload to a buffer. + + Args: + output_buffer (stream): A data buffer in which to encode object + data, supporting a write method. + kmip_version (KMIPVersion): An enumeration defining the KMIP + version with which the object will be encoded. Optional, + defaults to KMIP 1.0. + """ + local_buffer = utils.BytearrayStream() + + if self._maximum_items: + self._maximum_items.write(local_buffer, kmip_version=kmip_version) + + if self._offset_items: + self._offset_items.write(local_buffer, kmip_version=kmip_version) + + if self._storage_status_mask: + self._storage_status_mask.write( + local_buffer, + kmip_version=kmip_version + ) + + if self._object_group_member: + self._object_group_member.write( + local_buffer, + kmip_version=kmip_version + ) + + if self._attributes: + for attribute in self.attributes: + attribute.write(local_buffer, kmip_version=kmip_version) + + self.length = local_buffer.length() super(LocateRequestPayload, self).write( - ostream, + output_buffer, kmip_version=kmip_version ) - ostream.write(tstream.buffer) + output_buffer.write(local_buffer.buffer) - def validate(self): - self._validate() + def __eq__(self, other): + if isinstance(other, LocateRequestPayload): + if self.maximum_items != other.maximum_items: + return False + elif self.offset_items != other.offset_items: + return False + elif self.storage_status_mask != other.storage_status_mask: + return False + elif self.object_group_member != other.object_group_member: + return False + elif self.attributes != other.attributes: + return False + else: + return True + else: + return NotImplemented - def _validate(self): - # TODO Finish implementation. - pass + def __ne__(self, other): + if isinstance(other, LocateRequestPayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "maximum_items={}".format(self.maximum_items), + "offset_items={}".format(self.offset_items), + "storage_status_mask={}".format(self.storage_status_mask), + "object_group_member={}".format(self.object_group_member), + "attributes={}".format( + [repr(attribute) for attribute in self.attributes] + ) + ]) + return "LocateRequestPayload({})".format(args) + + def __str__(self): + value = ", ".join( + [ + '"maximum_items": {}'.format(self.maximum_items), + '"offset_items": {}'.format(self.offset_items), + '"storage_status_mask": {}'.format(self.storage_status_mask), + '"object_group_member": {}'.format(self.object_group_member), + '"attributes": {}'.format( + [str(attribute) for attribute in self.attributes] + ) + ] + ) + return '{' + value + '}' -class LocateResponsePayload(Struct): +class LocateResponsePayload(primitives.Struct): + """ + A response payload for the Locate operation. - def __init__(self, unique_identifiers=[]): + Attributes: + located_items: The number of matching objects found by the server. + unique_identifiers: The object identifiers for the matching objects. + """ + + def __init__(self, + located_items=None, + unique_identifiers=None): + """ + Construct a Locate response payload structure. + + Args: + located_items (int): An integer specifying the number of matching + objects found by the server. Note that this may not equal the + number of object identifiers returned in this payload. + Optional, defaults to None. + unique_identifiers (list): A list of strings specifying the object + identifiers for matching objects. Optional, defaults to None. + """ super(LocateResponsePayload, self).__init__( enums.Tags.RESPONSE_PAYLOAD) - self.unique_identifiers = unique_identifiers or [] - self.validate() - def read(self, istream, kmip_version=enums.KMIPVersion.KMIP_1_0): + self._located_items = None + self._unique_identifiers = None + + self.located_items = located_items + self.unique_identifiers = unique_identifiers + + @property + def located_items(self): + if self._located_items: + return self._located_items.value + return None + + @located_items.setter + def located_items(self, value): + if value is None: + self._located_items = None + elif isinstance(value, six.integer_types): + self._located_items = primitives.Integer( + value=value, + tag=enums.Tags.LOCATED_ITEMS + ) + else: + raise TypeError("Located items must be an integer.") + + @property + def unique_identifiers(self): + if self._unique_identifiers: + return [x.value for x in self._unique_identifiers] + return [] + + @unique_identifiers.setter + def unique_identifiers(self, value): + if value is None: + self._unique_identifiers = [] + elif isinstance(value, list): + self._unique_identifiers = [] + for v in value: + if not isinstance(v, six.string_types): + self._unique_identifiers = [] + raise TypeError( + "Unique identifiers must be a list of strings." + ) + self._unique_identifiers.append( + primitives.TextString( + value=v, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + ) + else: + raise TypeError("Unique identifiers must be a list of strings.") + + def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0): + """ + Read the data encoding the Locate response payload and decode it + into its constituent parts. + + Args: + input_buffer (stream): A data buffer containing encoded object + data, supporting a read method. + kmip_version (KMIPVersion): An enumeration defining the KMIP + version with which the object will be decoded. Optional, + defaults to KMIP 1.0. + """ super(LocateResponsePayload, self).read( - istream, + input_buffer, kmip_version=kmip_version ) - tstream = BytearrayStream(istream.read(self.length)) + local_buffer = utils.BytearrayStream(input_buffer.read(self.length)) - while self.is_tag_next(Tags.UNIQUE_IDENTIFIER, tstream): - ui = attributes.UniqueIdentifier() - ui.read(tstream, kmip_version=kmip_version) - self.unique_identifiers.append(ui) + if self.is_tag_next(enums.Tags.LOCATED_ITEMS, local_buffer): + self._located_items = primitives.Integer( + tag=enums.Tags.LOCATED_ITEMS + ) + self._located_items.read( + local_buffer, + kmip_version=kmip_version + ) - self.is_oversized(tstream) - self.validate() + self._unique_identifiers = [] + while self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_buffer): + unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + unique_identifier.read(local_buffer, kmip_version=kmip_version) + self._unique_identifiers.append(unique_identifier) - def write(self, ostream, kmip_version=enums.KMIPVersion.KMIP_1_0): - tstream = BytearrayStream() + self.is_oversized(local_buffer) - for ui in self.unique_identifiers: - ui.write(tstream, kmip_version=kmip_version) + def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0): + """ + Write the data encoding the Locate response payload to a buffer. - # Write the length and value of the request payload - self.length = tstream.length() + Args: + output_buffer (stream): A data buffer in which to encode object + data, supporting a write method. + kmip_version (KMIPVersion): An enumeration defining the KMIP + version with which the object will be encoded. Optional, + defaults to KMIP 1.0. + """ + local_buffer = utils.BytearrayStream() + + if self._located_items: + self._located_items.write(local_buffer, kmip_version=kmip_version) + + if self._unique_identifiers: + for unique_identifier in self._unique_identifiers: + unique_identifier.write( + local_buffer, + kmip_version=kmip_version + ) + + self.length = local_buffer.length() super(LocateResponsePayload, self).write( - ostream, + output_buffer, kmip_version=kmip_version ) - ostream.write(tstream.buffer) + output_buffer.write(local_buffer.buffer) - def validate(self): - self.__validate() + def __eq__(self, other): + if isinstance(other, LocateResponsePayload): + if self.located_items != other.located_items: + return False + elif self.unique_identifiers != other.unique_identifiers: + return False + else: + return True + else: + return NotImplemented - def __validate(self): - # TODO Finish implementation. - pass + def __ne__(self, other): + if isinstance(other, LocateResponsePayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "located_items={}".format(self.located_items), + "unique_identifiers={}".format(self.unique_identifiers) + ]) + return "LocateResponsePayload({})".format(args) + + def __str__(self): + value = ", ".join( + [ + '"located_items": {}'.format(self.located_items), + '"unique_identifiers": {}'.format(self.unique_identifiers), + ] + ) + return '{' + value + '}' diff --git a/kmip/pie/client.py b/kmip/pie/client.py index d126b7d..a82fd0e 100644 --- a/kmip/pie/client.py +++ b/kmip/pie/client.py @@ -707,8 +707,7 @@ class ProxyKmipClient(object): status = result.result_status.value if status == enums.ResultStatus.SUCCESS: - uids = [uuid.value for uuid in result.uuids] - return uids + return result.uuids else: reason = result.result_reason.value message = result.result_message.value diff --git a/kmip/services/kmip_client.py b/kmip/services/kmip_client.py index 594d2d3..0e6d3e3 100644 --- a/kmip/services/kmip_client.py +++ b/kmip/services/kmip_client.py @@ -1476,30 +1476,21 @@ class KMIPProxy(object): return result def _locate(self, maximum_items=None, storage_status_mask=None, - object_group_member=None, attributes=[], credential=None): + object_group_member=None, attributes=None, credential=None): operation = Operation(OperationEnum.LOCATE) - mxi = None - ssmask = None - objgrp = None + payload = payloads.LocateRequestPayload( + maximum_items=maximum_items, + storage_status_mask=storage_status_mask, + object_group_member=object_group_member, + attributes=attributes + ) - if maximum_items is not None: - mxi = payloads.LocateRequestPayload.MaximumItems(maximum_items) - if storage_status_mask is not None: - m = storage_status_mask - ssmask = payloads.LocateRequestPayload.StorageStatusMask(m) - if object_group_member is not None: - o = object_group_member - objgrp = payloads.LocateRequestPayload.ObjectGroupMember(o) - - payload = payloads.LocateRequestPayload(maximum_items=mxi, - storage_status_mask=ssmask, - object_group_member=objgrp, - attributes=attributes) - - batch_item = messages.RequestBatchItem(operation=operation, - request_payload=payload) + batch_item = messages.RequestBatchItem( + operation=operation, + request_payload=payload + ) message = self._build_request_message(credential, [batch_item]) self._send_message(message) diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index a0fe30e..d66574f 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -1563,9 +1563,9 @@ class KmipEngine(object): managed_objects = managed_objects_filtered - unique_identifiers = [attributes.UniqueIdentifier( - str(managed_object.unique_identifier)) - for managed_object in managed_objects] + unique_identifiers = [ + str(x.unique_identifier) for x in managed_objects + ] response_payload = payloads.LocateResponsePayload( unique_identifiers=unique_identifiers diff --git a/kmip/tests/unit/core/messages/payloads/test_locate.py b/kmip/tests/unit/core/messages/payloads/test_locate.py index 87b311e..307bad7 100644 --- a/kmip/tests/unit/core/messages/payloads/test_locate.py +++ b/kmip/tests/unit/core/messages/payloads/test_locate.py @@ -1,4 +1,4 @@ -# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory +# Copyright (c) 2019 The Johns Hopkins University/Applied Physics Laboratory # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -12,3 +12,1536 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + +import testtools + +from kmip.core import enums +from kmip.core import objects +from kmip.core import primitives +from kmip.core import utils + +from kmip.core.messages import payloads + + +class TestLocateRequestPayload(testtools.TestCase): + + def setUp(self): + super(TestLocateRequestPayload, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # Modified to include the Offset Items and Storage Status Mask fields. + # + # This encoding matches the following set of values: + # Request Payload + # Maximum Items - 1 + # Offset Items - 1 + # Storage Status Mask - Online Storage | Archival Storage + # Object Group Member - Group Member Default + # Attribute + # Attribute Name - Object Group + # Attribute Value - RoundRobinTestGroup + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x80' + b'\x42\x00\x4F\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\xD4\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x8E\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\xAC\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x08\x01\x00\x00\x00\x38' + b'\x42\x00\x0A\x07\x00\x00\x00\x0C' + b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00' + b'\x42\x00\x0B\x07\x00\x00\x00\x13' + b'\x52\x6F\x75\x6E\x64\x52\x6F\x62\x69\x6E\x54\x65\x73\x74\x47\x72' + b'\x6F\x75\x70\x00\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # Modified to include the Offset Items and Storage Status Mask fields. + # + # This encoding matches the following set of values: + # Request Payload + # Offset Items - 1 + # Storage Status Mask - Online Storage | Archival Storage + # Object Group Member - Group Member Default + # Attribute + # Attribute Name - Object Group + # Attribute Value - RoundRobinTestGroup + self.no_maximum_items_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x70' + b'\x42\x00\xD4\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x8E\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\xAC\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x08\x01\x00\x00\x00\x38' + b'\x42\x00\x0A\x07\x00\x00\x00\x0C' + b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00' + b'\x42\x00\x0B\x07\x00\x00\x00\x13' + b'\x52\x6F\x75\x6E\x64\x52\x6F\x62\x69\x6E\x54\x65\x73\x74\x47\x72' + b'\x6F\x75\x70\x00\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # Modified to include the Storage Status Mask field. + # + # This encoding matches the following set of values: + # Request Payload + # Maximum Items - 1 + # Storage Status Mask - Online Storage | Archival Storage + # Object Group Member - Group Member Default + # Attribute + # Attribute Name - Object Group + # Attribute Value - RoundRobinTestGroup + self.no_offset_items_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x70' + b'\x42\x00\x4F\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x8E\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\xAC\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x08\x01\x00\x00\x00\x38' + b'\x42\x00\x0A\x07\x00\x00\x00\x0C' + b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00' + b'\x42\x00\x0B\x07\x00\x00\x00\x13' + b'\x52\x6F\x75\x6E\x64\x52\x6F\x62\x69\x6E\x54\x65\x73\x74\x47\x72' + b'\x6F\x75\x70\x00\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # Modified to include the Offset Items field. + # + # This encoding matches the following set of values: + # Request Payload + # Maximum Items - 1 + # Offset Items - 1 + # Object Group Member - Group Member Default + # Attribute + # Attribute Name - Object Group + # Attribute Value - RoundRobinTestGroup + self.no_storage_status_mask_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x70' + b'\x42\x00\x4F\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\xD4\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\xAC\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x08\x01\x00\x00\x00\x38' + b'\x42\x00\x0A\x07\x00\x00\x00\x0C' + b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00' + b'\x42\x00\x0B\x07\x00\x00\x00\x13' + b'\x52\x6F\x75\x6E\x64\x52\x6F\x62\x69\x6E\x54\x65\x73\x74\x47\x72' + b'\x6F\x75\x70\x00\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # Modified to include the Offset Items and Storage Status Mask fields. + # + # This encoding matches the following set of values: + # Request Payload + # Maximum Items - 1 + # Offset Items - 1 + # Storage Status Mask - Online Storage | Archival Storage + # Attribute + # Attribute Name - Object Group + # Attribute Value - RoundRobinTestGroup + self.no_object_group_member_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x70' + b'\x42\x00\x4F\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\xD4\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x8E\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\x08\x01\x00\x00\x00\x38' + b'\x42\x00\x0A\x07\x00\x00\x00\x0C' + b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00' + b'\x42\x00\x0B\x07\x00\x00\x00\x13' + b'\x52\x6F\x75\x6E\x64\x52\x6F\x62\x69\x6E\x54\x65\x73\x74\x47\x72' + b'\x6F\x75\x70\x00\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # Modified to include the Offset Items and Storage Status Mask fields. + # + # This encoding matches the following set of values: + # Request Payload + # Maximum Items - 1 + # Offset Items - 1 + # Storage Status Mask - Online Storage | Archival Storage + # Object Group Member - Group Member Default + self.no_attributes_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x40' + b'\x42\x00\x4F\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\xD4\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x8E\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\xAC\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + ) + + # This encoding matches the following set of values: + # Request Payload + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestLocateRequestPayload, self).tearDown() + + def test_invalid_maximum_items(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the maximum items of a Locate request payload. + """ + kwargs = {"maximum_items": "invalid"} + self.assertRaisesRegex( + TypeError, + "Maximum items must be an integer.", + payloads.LocateRequestPayload, + **kwargs + ) + + args = ( + payloads.LocateRequestPayload(), + "maximum_items", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "Maximum items must be an integer.", + setattr, + *args + ) + + def test_invalid_offset_items(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the offset items of a Locate request payload. + """ + kwargs = {"offset_items": "invalid"} + self.assertRaisesRegex( + TypeError, + "Offset items must be an integer.", + payloads.LocateRequestPayload, + **kwargs + ) + + args = ( + payloads.LocateRequestPayload(), + "offset_items", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "Offset items must be an integer.", + setattr, + *args + ) + + def test_invalid_storage_status_mask(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the storage status mask of a Locate request payload. + """ + kwargs = {"storage_status_mask": "invalid"} + self.assertRaisesRegex( + TypeError, + "Storage status mask must be an integer representing a valid " + "StorageStatusMask bit mask.", + payloads.LocateRequestPayload, + **kwargs + ) + + kwargs = {"storage_status_mask": 55} + self.assertRaisesRegex( + TypeError, + "Storage status mask must be an integer representing a valid " + "StorageStatusMask bit mask.", + payloads.LocateRequestPayload, + **kwargs + ) + + args = ( + payloads.LocateRequestPayload(), + "storage_status_mask", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "Storage status mask must be an integer representing a valid " + "StorageStatusMask bit mask.", + setattr, + *args + ) + + def test_invalid_object_group_member(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the object group member of a Locate request payload. + """ + kwargs = {"object_group_member": "invalid"} + self.assertRaisesRegex( + TypeError, + "Object group member must be an ObjectGroupMember enumeration.", + payloads.LocateRequestPayload, + **kwargs + ) + + args = ( + payloads.LocateRequestPayload(), + "object_group_member", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "Object group member must be an ObjectGroupMember enumeration.", + setattr, + *args + ) + + def test_invalid_attributes(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the attributes of a Locate request payload. + """ + kwargs = {"attributes": "invalid"} + self.assertRaisesRegex( + TypeError, + "Attributes must be a list of Attribute structures.", + payloads.LocateRequestPayload, + **kwargs + ) + + kwargs = {"attributes": ["invalid"]} + self.assertRaisesRegex( + TypeError, + "Attributes must be a list of Attribute structures.", + payloads.LocateRequestPayload, + **kwargs + ) + + args = ( + payloads.LocateRequestPayload(), + "attributes", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "Attributes must be a list of Attribute structures.", + setattr, + *args + ) + + args = ( + payloads.LocateRequestPayload(), + "attributes", + ["invalid"] + ) + self.assertRaisesRegex( + TypeError, + "Attributes must be a list of Attribute structures.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a Locate request payload can be read from a data stream. + """ + payload = payloads.LocateRequestPayload() + + self.assertIsNone(payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertIsNone(payload.object_group_member) + self.assertEqual([], payload.attributes) + + payload.read(self.full_encoding) + + self.assertEqual(1, payload.maximum_items) + self.assertEqual(1, payload.offset_items) + self.assertEqual( + enums.get_bit_mask_from_enumerations( + [ + enums.StorageStatusMask.ONLINE_STORAGE, + enums.StorageStatusMask.ARCHIVAL_STORAGE + ] + ), + payload.storage_status_mask + ) + self.assertEqual( + enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + payload.object_group_member + ) + self.assertIsInstance(payload.attributes, list) + self.assertEqual(1, len(payload.attributes)) + self.assertEqual( + objects.Attribute( + attribute_name=objects.Attribute.AttributeName("Object Group"), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ), + payload.attributes[0] + ) + + def test_read_missing_maximum_items(self): + """ + Test that a Locate request payload can be read from a data stream + even when missing the maximum items. + """ + payload = payloads.LocateRequestPayload() + + self.assertIsNone(payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertIsNone(payload.object_group_member) + self.assertEqual([], payload.attributes) + + payload.read(self.no_maximum_items_encoding) + + self.assertIsNone(payload.maximum_items) + self.assertEqual(1, payload.offset_items) + self.assertEqual( + enums.get_bit_mask_from_enumerations( + [ + enums.StorageStatusMask.ONLINE_STORAGE, + enums.StorageStatusMask.ARCHIVAL_STORAGE + ] + ), + payload.storage_status_mask + ) + self.assertEqual( + enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + payload.object_group_member + ) + self.assertIsInstance(payload.attributes, list) + self.assertEqual(1, len(payload.attributes)) + self.assertEqual( + objects.Attribute( + attribute_name=objects.Attribute.AttributeName("Object Group"), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ), + payload.attributes[0] + ) + + def test_read_missing_offset_items(self): + """ + Test that a Locate request payload can be read from a data stream + even when missing the offset items. + """ + payload = payloads.LocateRequestPayload() + + self.assertIsNone(payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertIsNone(payload.object_group_member) + self.assertEqual([], payload.attributes) + + payload.read(self.no_offset_items_encoding) + + self.assertEqual(1, payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertEqual( + enums.get_bit_mask_from_enumerations( + [ + enums.StorageStatusMask.ONLINE_STORAGE, + enums.StorageStatusMask.ARCHIVAL_STORAGE + ] + ), + payload.storage_status_mask + ) + self.assertEqual( + enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + payload.object_group_member + ) + self.assertIsInstance(payload.attributes, list) + self.assertEqual(1, len(payload.attributes)) + self.assertEqual( + objects.Attribute( + attribute_name=objects.Attribute.AttributeName("Object Group"), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ), + payload.attributes[0] + ) + + def test_read_missing_storage_status_mask(self): + """ + Test that a Locate request payload can be read from a data stream + even when missing the storage status mask. + """ + payload = payloads.LocateRequestPayload() + + self.assertIsNone(payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertIsNone(payload.object_group_member) + self.assertEqual([], payload.attributes) + + payload.read(self.no_storage_status_mask_encoding) + + self.assertEqual(1, payload.maximum_items) + self.assertEqual(1, payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertEqual( + enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + payload.object_group_member + ) + self.assertIsInstance(payload.attributes, list) + self.assertEqual(1, len(payload.attributes)) + self.assertEqual( + objects.Attribute( + attribute_name=objects.Attribute.AttributeName("Object Group"), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ), + payload.attributes[0] + ) + + def test_read_missing_object_group_member(self): + """ + Test that a Locate request payload can be read from a data stream + even when missing the object group member. + """ + payload = payloads.LocateRequestPayload() + + self.assertIsNone(payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertIsNone(payload.object_group_member) + self.assertEqual([], payload.attributes) + + payload.read(self.no_object_group_member_encoding) + + self.assertEqual(1, payload.maximum_items) + self.assertEqual(1, payload.offset_items) + self.assertEqual( + enums.get_bit_mask_from_enumerations( + [ + enums.StorageStatusMask.ONLINE_STORAGE, + enums.StorageStatusMask.ARCHIVAL_STORAGE + ] + ), + payload.storage_status_mask + ) + self.assertIsNone(payload.object_group_member) + self.assertIsInstance(payload.attributes, list) + self.assertEqual(1, len(payload.attributes)) + self.assertEqual( + objects.Attribute( + attribute_name=objects.Attribute.AttributeName("Object Group"), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ), + payload.attributes[0] + ) + + def test_read_missing_attributes(self): + """ + Test that a Locate request payload can be read from a data stream + even when missing the attributes. + """ + payload = payloads.LocateRequestPayload() + + self.assertIsNone(payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertIsNone(payload.object_group_member) + self.assertEqual([], payload.attributes) + + payload.read(self.no_attributes_encoding) + + self.assertEqual(1, payload.maximum_items) + self.assertEqual(1, payload.offset_items) + self.assertEqual( + enums.get_bit_mask_from_enumerations( + [ + enums.StorageStatusMask.ONLINE_STORAGE, + enums.StorageStatusMask.ARCHIVAL_STORAGE + ] + ), + payload.storage_status_mask + ) + self.assertEqual( + enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + payload.object_group_member + ) + self.assertEqual([], payload.attributes) + + def test_read_missing_everything(self): + """ + Test that a Locate request payload can be read from a data stream + even when missing all fields. + """ + payload = payloads.LocateRequestPayload() + + self.assertIsNone(payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertIsNone(payload.object_group_member) + self.assertEqual([], payload.attributes) + + payload.read(self.empty_encoding) + + self.assertIsNone(payload.maximum_items) + self.assertIsNone(payload.offset_items) + self.assertIsNone(payload.storage_status_mask) + self.assertIsNone(payload.object_group_member) + self.assertEqual([], payload.attributes) + + def test_write(self): + """ + Test that a Locate request payload can be written to a data stream. + """ + payload = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_missing_maximum_items(self): + """ + Test that a Locate request payload can be written to a data stream + even when missing the maximum items. + """ + payload = payloads.LocateRequestPayload( + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.no_maximum_items_encoding), len(stream)) + self.assertEqual(str(self.no_maximum_items_encoding), str(stream)) + + def test_write_missing_offset_items(self): + """ + Test that a Locate request payload can be written to a data stream + even when missing the offset items. + """ + payload = payloads.LocateRequestPayload( + maximum_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.no_offset_items_encoding), len(stream)) + self.assertEqual(str(self.no_offset_items_encoding), str(stream)) + + def test_write_missing_storage_status_mask(self): + """ + Test that a Locate request payload can be written to a data stream + even when missing the storage status mask. + """ + payload = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual( + len(self.no_storage_status_mask_encoding), + len(stream) + ) + self.assertEqual( + str(self.no_storage_status_mask_encoding), + str(stream) + ) + + def test_write_missing_object_group_member(self): + """ + Test that a Locate request payload can be written to a data stream + even when missing the object group member. + """ + payload = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual( + len(self.no_object_group_member_encoding), + len(stream) + ) + self.assertEqual( + str(self.no_object_group_member_encoding), + str(stream) + ) + + def test_write_missing_attributes(self): + """ + Test that a Locate request payload can be written to a data stream + even when missing the attributes. + """ + payload = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.no_attributes_encoding), len(stream)) + self.assertEqual(str(self.no_attributes_encoding), str(stream)) + + def test_write_missing_everything(self): + """ + Test that a Locate request payload can be written to a data stream + even when missing all fields. + """ + payload = payloads.LocateRequestPayload() + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) + + def test_repr(self): + """ + Test that repr can be applied to a Locate request payload structure. + """ + payload = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + s = "LocateRequestPayload(" \ + "maximum_items=1, " \ + "offset_items=1, " \ + "storage_status_mask=3, " \ + "object_group_member=ObjectGroupMember.GROUP_MEMBER_DEFAULT, " \ + "attributes=[" +# "Attribute(" \ +# "attribute_name=AttributeName(value='Object Group'), " \ +# "attribute_index=None, " \ +# "attribute_value=TextString(value='RoundRobinTestGroup'))" \ +# "])" + + # TODO (ph) Uncomment above when Attribute repr fixed. Fix below too. + + self.assertTrue(repr(payload).startswith(s)) + + def str(self): + """ + Test that str can be applied to a Locate request payload structure. + """ + payload = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + s = '{' \ + '"maximum_items": 1, ' \ + '"offset_items": 1, ' \ + '"storage_status_mask": 3, ' \ + '"object_group_member": ObjectGroupMember.GROUP_MEMBER_DEFAULT, ' \ + '"attributes=[' \ + '{' \ + '"attribute_name": "Object Group", ' \ + '"attribute_index": None, ' \ + '"attribute_value": "RoundRobinTestGroup"' \ + '}]' \ + '}' + + self.assertEqual(s, str(payload)) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two Locate + request payloads with the same data. + """ + a = payloads.LocateRequestPayload() + b = payloads.LocateRequestPayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + b = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_maximum_items(self): + """ + Test that the equality operator returns False when comparing two + Locate request payloads with different maximum items. + """ + a = payloads.LocateRequestPayload(maximum_items=1) + b = payloads.LocateRequestPayload(maximum_items=2) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_offset_items(self): + """ + Test that the equality operator returns False when comparing two + Locate request payloads with different offset items. + """ + a = payloads.LocateRequestPayload(offset_items=1) + b = payloads.LocateRequestPayload(offset_items=2) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_storage_status_mask(self): + """ + Test that the equality operator returns False when comparing two + Locate request payloads with different storage status mask. + """ + a = payloads.LocateRequestPayload(storage_status_mask=1) + b = payloads.LocateRequestPayload(storage_status_mask=2) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_object_group_member(self): + """ + Test that the equality operator returns False when comparing two + Locate request payloads with different object group member. + """ + a = payloads.LocateRequestPayload( + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT + ) + b = payloads.LocateRequestPayload( + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_FRESH + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_attributes(self): + """ + Test that the equality operator returns False when comparing two + Locate request payloads with different attributes. + """ + a = payloads.LocateRequestPayload( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + b = payloads.LocateRequestPayload( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Cryptographic Algorithm" + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ) + ] + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + Locate request payloads with different types. + """ + a = payloads.LocateRequestPayload() + b = "invalid" + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Locate request payloads with the same data. + """ + a = payloads.LocateRequestPayload() + b = payloads.LocateRequestPayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + b = payloads.LocateRequestPayload( + maximum_items=1, + offset_items=1, + storage_status_mask=3, + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT, + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_maximum_items(self): + """ + Test that the inequality operator returns True when comparing two + Locate request payloads with different maximum items. + """ + a = payloads.LocateRequestPayload(maximum_items=1) + b = payloads.LocateRequestPayload(maximum_items=2) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_offset_items(self): + """ + Test that the inequality operator returns True when comparing two + Locate request payloads with different offset items. + """ + a = payloads.LocateRequestPayload(offset_items=1) + b = payloads.LocateRequestPayload(offset_items=2) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_storage_status_mask(self): + """ + Test that the inequality operator returns True when comparing two + Locate request payloads with different storage status mask. + """ + a = payloads.LocateRequestPayload(storage_status_mask=1) + b = payloads.LocateRequestPayload(storage_status_mask=2) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_object_group_member(self): + """ + Test that the inequality operator returns True when comparing two + Locate request payloads with different object group member. + """ + a = payloads.LocateRequestPayload( + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT + ) + b = payloads.LocateRequestPayload( + object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_FRESH + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_attributes(self): + """ + Test that the inequality operator returns True when comparing two + Locate request payloads with different attributes. + """ + a = payloads.LocateRequestPayload( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Object Group" + ), + attribute_value=primitives.TextString( + value="RoundRobinTestGroup", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ] + ) + b = payloads.LocateRequestPayload( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Cryptographic Algorithm" + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ) + ] + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + Locate request payloads with different types. + """ + a = payloads.LocateRequestPayload() + b = "invalid" + + self.assertTrue(a != b) + self.assertTrue(b != a) + + +class TestLocateResponsePayload(testtools.TestCase): + + def setUp(self): + super(TestLocateResponsePayload, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # Modified to include the Located Items field. + # + # This encoding matches the following set of values: + # Request Payload + # Located Items - 1 + # Unique Identifier - 8d945322-fd70-495d-bf7f-71481d1401f6 + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x40' + b'\x42\x00\xD5\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x38\x64\x39\x34\x35\x33\x32\x32\x2D\x66\x64\x37\x30\x2D\x34\x39' + b'\x35\x64\x2D\x62\x66\x37\x66\x2D\x37\x31\x34\x38\x31\x64\x31\x34' + b'\x30\x31\x66\x36\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # + # This encoding matches the following set of values: + # Request Payload + # Unique Identifier - 8d945322-fd70-495d-bf7f-71481d1401f6 + self.no_located_items_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x30' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x38\x64\x39\x34\x35\x33\x32\x32\x2D\x66\x64\x37\x30\x2D\x34\x39' + b'\x35\x64\x2D\x62\x66\x37\x66\x2D\x37\x31\x34\x38\x31\x64\x31\x34' + b'\x30\x31\x66\x36\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4. + # Modified to include the Located Items field. + # + # This encoding matches the following set of values: + # Request Payload + # Located Items - 1 + self.no_unique_identifiers_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x10' + b'\x42\x00\xD5\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + ) + + # This encoding matches the following set of values: + # Request Payload + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestLocateResponsePayload, self).tearDown() + + def test_invalid_located_items(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the located items of a Locate response payload. + """ + kwargs = {"located_items": "invalid"} + self.assertRaisesRegex( + TypeError, + "Located items must be an integer.", + payloads.LocateResponsePayload, + **kwargs + ) + + args = ( + payloads.LocateResponsePayload(), + "located_items", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "Located items must be an integer.", + setattr, + *args + ) + + def test_invalid_unique_identifiers(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifiers of a Locate response payload. + """ + kwargs = {"unique_identifiers": "invalid"} + self.assertRaisesRegex( + TypeError, + "Unique identifiers must be a list of strings.", + payloads.LocateResponsePayload, + **kwargs + ) + + kwargs = {"unique_identifiers": [0]} + self.assertRaisesRegex( + TypeError, + "Unique identifiers must be a list of strings.", + payloads.LocateResponsePayload, + **kwargs + ) + + args = ( + payloads.LocateResponsePayload(), + "unique_identifiers", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "Unique identifiers must be a list of strings.", + setattr, + *args + ) + + args = ( + payloads.LocateResponsePayload(), + "unique_identifiers", + [0] + ) + self.assertRaisesRegex( + TypeError, + "Unique identifiers must be a list of strings.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a Locate response payload can be read from a data stream. + """ + payload = payloads.LocateResponsePayload() + + self.assertIsNone(payload.located_items) + self.assertEqual([], payload.unique_identifiers) + + payload.read(self.full_encoding) + + self.assertEqual(1, payload.located_items) + self.assertIsInstance(payload.unique_identifiers, list) + self.assertEqual(1, len(payload.unique_identifiers)) + self.assertEqual( + ["8d945322-fd70-495d-bf7f-71481d1401f6"], + payload.unique_identifiers + ) + + def test_read_missing_located_items(self): + """ + Test that a Locate response payload can be read from a data stream + even when missing the located items. + """ + payload = payloads.LocateResponsePayload() + + self.assertIsNone(payload.located_items) + self.assertEqual([], payload.unique_identifiers) + + payload.read(self.no_located_items_encoding) + + self.assertIsNone(payload.located_items) + self.assertIsInstance(payload.unique_identifiers, list) + self.assertEqual(1, len(payload.unique_identifiers)) + self.assertEqual( + ["8d945322-fd70-495d-bf7f-71481d1401f6"], + payload.unique_identifiers + ) + + def test_read_missing_unique_identifiers(self): + """ + Test that a Locate response payload can be read from a data stream + even when missing the unique identifiers. + """ + payload = payloads.LocateResponsePayload() + + self.assertIsNone(payload.located_items) + self.assertEqual([], payload.unique_identifiers) + + payload.read(self.no_unique_identifiers_encoding) + + self.assertEqual(1, payload.located_items) + self.assertEqual([], payload.unique_identifiers) + + def test_read_missing_everything(self): + """ + Test that a Locate response payload can be read from a data stream + even when missing all fields. + """ + payload = payloads.LocateResponsePayload() + + self.assertIsNone(payload.located_items) + self.assertEqual([], payload.unique_identifiers) + + payload.read(self.empty_encoding) + + self.assertIsNone(payload.located_items) + self.assertEqual([], payload.unique_identifiers) + + def test_write(self): + """ + Test that a Locate response payload can be written to a data stream. + """ + payload = payloads.LocateResponsePayload( + located_items=1, + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_missing_located_items(self): + """ + Test that a Locate response payload can be written to a data stream + even when missing the located items. + """ + payload = payloads.LocateResponsePayload( + located_items=1 + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.no_unique_identifiers_encoding), len(stream)) + self.assertEqual(str(self.no_unique_identifiers_encoding), str(stream)) + + def test_write_missing_unique_identifiers(self): + """ + Test that a Locate response payload can be written to a data stream + even when missing the unique identifiers. + """ + payload = payloads.LocateResponsePayload( + located_items=1, + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_missing_everything(self): + """ + Test that a Locate response payload can be written to a data stream + even when missing all fields. + """ + payload = payloads.LocateResponsePayload() + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) + + def test_repr(self): + """ + Test that repr can be applied to a Locate response payload structure. + """ + payload = payloads.LocateResponsePayload( + located_items=1, + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + + s = "LocateResponsePayload(" \ + "located_items=1, " \ + "unique_identifiers=['8d945322-fd70-495d-bf7f-71481d1401f6'])" + + self.assertEqual(s, repr(payload)) + + def str(self): + """ + Test that str can be applied to a Locate response payload structure. + """ + payload = payloads.LocateResponsePayload( + located_items=1, + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + + s = "LocateResponsePayload(" \ + "located_items=1, " \ + "unique_identifiers=['8d945322-fd70-495d-bf7f-71481d1401f6'])" + + self.assertEqual(s, repr(payload)) + + s = '{' \ + '"located_items": 1, ' \ + '"unique_identifiers": [' \ + '"8d945322-fd70-495d-bf7f-71481d1401f6"]' \ + ']' \ + '}' + + self.assertEqual(s, str(payload)) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two Locate + response payloads with the same data. + """ + a = payloads.LocateResponsePayload() + b = payloads.LocateResponsePayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = payloads.LocateResponsePayload( + located_items=1, + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + b = payloads.LocateResponsePayload( + located_items=1, + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_located_items(self): + """ + Test that the equality operator returns False when comparing two + Locate response payloads with different located items. + """ + a = payloads.LocateResponsePayload(located_items=1) + b = payloads.LocateResponsePayload(located_items=2) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_unique_identifiers(self): + """ + Test that the equality operator returns False when comparing two + Locate response payloads with different unique identifiers. + """ + a = payloads.LocateResponsePayload( + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + b = payloads.LocateResponsePayload( + unique_identifiers=["49a1ca88-6bea-4fb2-b450-7e58802c3038"] + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + Locate response payloads with different types. + """ + a = payloads.LocateResponsePayload() + b = "invalid" + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Locate response payloads with the same data. + """ + a = payloads.LocateResponsePayload() + b = payloads.LocateResponsePayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = payloads.LocateResponsePayload( + located_items=1, + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + b = payloads.LocateResponsePayload( + located_items=1, + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_located_items(self): + """ + Test that the inequality operator returns True when comparing two + Locate response payloads with different located items. + """ + a = payloads.LocateResponsePayload(located_items=1) + b = payloads.LocateResponsePayload(located_items=2) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_unique_identifiers(self): + """ + Test that the inequality operator returns True when comparing two + Locate response payloads with different unique identifiers. + """ + a = payloads.LocateResponsePayload( + unique_identifiers=["8d945322-fd70-495d-bf7f-71481d1401f6"] + ) + b = payloads.LocateResponsePayload( + unique_identifiers=["49a1ca88-6bea-4fb2-b450-7e58802c3038"] + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + Locate response payloads with different types. + """ + a = payloads.LocateResponsePayload() + b = "invalid" + + self.assertTrue(a != b) + self.assertTrue(b != a) diff --git a/kmip/tests/unit/core/messages/test_messages.py b/kmip/tests/unit/core/messages/test_messages.py index cbd3c1c..1ab7212 100644 --- a/kmip/tests/unit/core/messages/test_messages.py +++ b/kmip/tests/unit/core/messages/test_messages.py @@ -1946,7 +1946,7 @@ class TestResponseMessage(TestCase): operation = contents.Operation(enums.Operation.LOCATE) result_status = contents.ResultStatus(enums.ResultStatus.SUCCESS) - uuid = attr.UniqueIdentifier('49a1ca88-6bea-4fb2-b450-7e58802c3038') + uuid = "49a1ca88-6bea-4fb2-b450-7e58802c3038" resp_pl = payloads.LocateResponsePayload(unique_identifiers=[uuid]) diff --git a/kmip/tests/unit/pie/test_client.py b/kmip/tests/unit/pie/test_client.py index ad44328..e5ca784 100644 --- a/kmip/tests/unit/pie/test_client.py +++ b/kmip/tests/unit/pie/test_client.py @@ -2700,8 +2700,7 @@ class TestProxyKmipClient(testtools.TestCase): uuid0 = 'aaaaaaaa-1111-2222-3333-ffffffffffff' uuid1 = 'bbbbbbbb-4444-5555-6666-gggggggggggg' - unique_identifiers = [attr.UniqueIdentifier(uuid0), - attr.UniqueIdentifier(uuid1)] + unique_identifiers = [uuid0, uuid1] result = results.LocateResult( contents.ResultStatus(enums.ResultStatus.SUCCESS), diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index e2f74d0..bd3092b 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -4289,7 +4289,7 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual( id_a, - response_payload.unique_identifiers[0].value + response_payload.unique_identifiers[0] ) # Add the second obj and test the locate @@ -4315,11 +4315,11 @@ class TestKmipEngine(testtools.TestCase): ) self.assertIn( id_a, - [uid.value for uid in response_payload.unique_identifiers] + response_payload.unique_identifiers ) self.assertIn( id_b, - [uid.value for uid in response_payload.unique_identifiers] + response_payload.unique_identifiers ) def test_locate_with_name(self): @@ -4381,11 +4381,11 @@ class TestKmipEngine(testtools.TestCase): ) self.assertIn( id_a, - [uid.value for uid in response_payload.unique_identifiers] + response_payload.unique_identifiers ) self.assertIn( id_b, - [uid.value for uid in response_payload.unique_identifiers] + response_payload.unique_identifiers ) # Locate the obj with name 'name1' @@ -4415,7 +4415,7 @@ class TestKmipEngine(testtools.TestCase): ) self.assertIn( id_c, - [uid.value for uid in response_payload.unique_identifiers] + response_payload.unique_identifiers ) def test_get(self):