diff --git a/kmip/core/messages/payloads/create.py b/kmip/core/messages/payloads/create.py index 55cc4c8..383303f 100644 --- a/kmip/core/messages/payloads/create.py +++ b/kmip/core/messages/payloads/create.py @@ -13,115 +13,443 @@ # License for the specific language governing permissions and limitations # under the License. -from kmip.core import attributes +import six + from kmip.core import enums -from kmip.core.enums import Tags - -from kmip.core.objects import TemplateAttribute - -from kmip.core.primitives import Struct - -from kmip.core.utils import BytearrayStream +from kmip.core import exceptions +from kmip.core import objects +from kmip.core import primitives +from kmip.core import utils -class CreateRequestPayload(Struct): +class CreateRequestPayload(primitives.Struct): + """ + A request payload for the Create operation. + + Attributes: + object_type: The type of the object to create. + template_attribute: A group of attributes to set on the new object. + """ def __init__(self, object_type=None, template_attribute=None): + """ + Construct a Create request payload structure. + + Args: + object_type (enum): An ObjectType enumeration specifying the type + of object to create. Optional, defaults to None. Required for + read/write. + template_attribute (TemplateAttribute): A TemplateAttribute + structure containing a set of attributes to set on the new + object. Optional, defaults to None. Required for read/write. + """ super(CreateRequestPayload, self).__init__( - tag=enums.Tags.REQUEST_PAYLOAD) + tag=enums.Tags.REQUEST_PAYLOAD + ) + + self._object_type = None + self._template_attribute = None + self.object_type = object_type self.template_attribute = template_attribute - self.validate() - def read(self, istream, kmip_version=enums.KMIPVersion.KMIP_1_0): + @property + def object_type(self): + if self._object_type: + return self._object_type.value + else: + return None + + @object_type.setter + def object_type(self, value): + if value is None: + self._object_type = None + elif isinstance(value, enums.ObjectType): + self._object_type = primitives.Enumeration( + enums.ObjectType, + value=value, + tag=enums.Tags.OBJECT_TYPE + ) + else: + raise TypeError( + "Object type must be an ObjectType enumeration." + ) + + @property + def template_attribute(self): + return self._template_attribute + + @template_attribute.setter + def template_attribute(self, value): + if value is None: + self._template_attribute = None + elif isinstance(value, objects.TemplateAttribute): + self._template_attribute = value + else: + raise TypeError( + "Template attribute must be a TemplateAttribute structure." + ) + + def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0): + """ + Read the data encoding the Create request payload and decode it into + its constituent parts. + + Args: + input_buffer (stream): A data buffer containing encoded object + data, supporting a read method. + kmip_version (KMIPVersion): An enumeration defining the KMIP + version with which the object will be decoded. Optional, + defaults to KMIP 1.0. + + Raises: + InvalidKmipEncoding: Raised if the object type or template + attribute is missing from the encoded payload. + """ super(CreateRequestPayload, self).read( - istream, + input_buffer, kmip_version=kmip_version ) - tstream = BytearrayStream(istream.read(self.length)) + local_buffer = utils.BytearrayStream(input_buffer.read(self.length)) - self.object_type = attributes.ObjectType() - self.template_attribute = TemplateAttribute() + if self.is_tag_next(enums.Tags.OBJECT_TYPE, local_buffer): + self._object_type = primitives.Enumeration( + enums.ObjectType, + tag=enums.Tags.OBJECT_TYPE + ) + self._object_type.read(local_buffer, kmip_version=kmip_version) + else: + raise exceptions.InvalidKmipEncoding( + "The Create request payload encoding is missing the object " + "type." + ) - self.object_type.read(tstream, kmip_version=kmip_version) - self.template_attribute.read(tstream, kmip_version=kmip_version) + if self.is_tag_next(enums.Tags.TEMPLATE_ATTRIBUTE, local_buffer): + self._template_attribute = objects.TemplateAttribute() + self._template_attribute.read( + local_buffer, + kmip_version=kmip_version + ) + else: + raise exceptions.InvalidKmipEncoding( + "The Create request payload encoding is missing the template " + "attribute." + ) - self.is_oversized(tstream) - self.validate() + self.is_oversized(local_buffer) - def write(self, ostream, kmip_version=enums.KMIPVersion.KMIP_1_0): - tstream = BytearrayStream() + def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0): + """ + Write the data encoding the Create request payload to a buffer. - # Write the object type and template attribute of the request payload - self.object_type.write(tstream, kmip_version=kmip_version) - self.template_attribute.write(tstream, kmip_version=kmip_version) + Args: + output_buffer (stream): A data buffer in which to encode object + data, supporting a write method. + kmip_version (KMIPVersion): An enumeration defining the KMIP + version with which the object will be encoded. Optional, + defaults to KMIP 1.0. - # Write the length and value of the request payload - self.length = tstream.length() + Raises: + InvalidField: Raised if the object type attribute or template + attribute is not defined. + """ + local_buffer = utils.BytearrayStream() + + if self._object_type: + self._object_type.write(local_buffer, kmip_version=kmip_version) + else: + raise exceptions.InvalidField( + "The Create request payload is missing the object type field." + ) + + if self._template_attribute: + self._template_attribute.write( + local_buffer, + kmip_version=kmip_version + ) + else: + raise exceptions.InvalidField( + "The Create request payload is missing the template attribute " + "field." + ) + + self.length = local_buffer.length() super(CreateRequestPayload, self).write( - ostream, + output_buffer, kmip_version=kmip_version ) - ostream.write(tstream.buffer) + output_buffer.write(local_buffer.buffer) - def validate(self): - # TODO (peter-hamilton) Finish implementation. - pass + def __eq__(self, other): + if isinstance(other, CreateRequestPayload): + if self.object_type != other.object_type: + return False + elif self.template_attribute != other.template_attribute: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, CreateRequestPayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "object_type={}".format(self.object_type), + "template_attribute={}".format(repr(self.template_attribute)) + ]) + return "CreateRequestPayload({})".format(args) + + def __str__(self): + value = ", ".join( + [ + '"object_type": {}'.format(self.object_type), + '"template_attribute": {}'.format(self.template_attribute) + ] + ) + return '{' + value + '}' -class CreateResponsePayload(Struct): +class CreateResponsePayload(primitives.Struct): + """ + A response payload for the Create operation. + + Attributes: + object_type: The type of the object created. + unique_identifier: The unique ID of the new object. + template_attribute: A group of attributes that were set on the new + object. + """ def __init__(self, object_type=None, unique_identifier=None, template_attribute=None): + """ + Construct a Create response payload structure. + + Args: + object_type (enum): An ObjectType enumeration specifying the type + of object created. Optional, defaults to None. Required for + read/write. + unique_identifier (string): The ID of the new object. Optional, + defaults to None. Required for read/write. + template_attribute (TemplateAttribute): A TemplateAttribute + structure containing a set of attributes that were set on the + new object. Optional, defaults to None. + """ super(CreateResponsePayload, self).__init__( - tag=enums.Tags.RESPONSE_PAYLOAD) + tag=enums.Tags.RESPONSE_PAYLOAD + ) + + self._object_type = None + self._unique_identifier = None + self._template_attribute = None + self.object_type = object_type self.unique_identifier = unique_identifier self.template_attribute = template_attribute - self.validate() - def read(self, istream, kmip_version=enums.KMIPVersion.KMIP_1_0): + @property + def object_type(self): + if self._object_type: + return self._object_type.value + else: + return None + + @object_type.setter + def object_type(self, value): + if value is None: + self._object_type = None + elif isinstance(value, enums.ObjectType): + self._object_type = primitives.Enumeration( + enums.ObjectType, + value=value, + tag=enums.Tags.OBJECT_TYPE + ) + else: + raise TypeError( + "Object type must be an ObjectType enumeration." + ) + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return None + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("Unique identifier must be a string.") + + @property + def template_attribute(self): + return self._template_attribute + + @template_attribute.setter + def template_attribute(self, value): + if value is None: + self._template_attribute = None + elif isinstance(value, objects.TemplateAttribute): + self._template_attribute = value + else: + raise TypeError( + "Template attribute must be a TemplateAttribute structure." + ) + + def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0): + """ + Read the data encoding the Create response payload and decode it into + its constituent parts. + + Args: + input_buffer (stream): A data buffer containing encoded object + data, supporting a read method. + kmip_version (KMIPVersion): An enumeration defining the KMIP + version with which the object will be decoded. Optional, + defaults to KMIP 1.0. + + Raises: + InvalidKmipEncoding: Raised if the object type or unique + identifier is missing from the encoded payload. + """ super(CreateResponsePayload, self).read( - istream, + input_buffer, kmip_version=kmip_version ) - tstream = BytearrayStream(istream.read(self.length)) + local_buffer = utils.BytearrayStream(input_buffer.read(self.length)) - self.object_type = attributes.ObjectType() - self.unique_identifier = attributes.UniqueIdentifier() + if self.is_tag_next(enums.Tags.OBJECT_TYPE, local_buffer): + self._object_type = primitives.Enumeration( + enums.ObjectType, + tag=enums.Tags.OBJECT_TYPE + ) + self._object_type.read(local_buffer, kmip_version=kmip_version) + else: + raise exceptions.InvalidKmipEncoding( + "The Create response payload encoding is missing the object " + "type." + ) - self.object_type.read(tstream, kmip_version=kmip_version) - self.unique_identifier.read(tstream, kmip_version=kmip_version) + if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_buffer): + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read( + local_buffer, + kmip_version=kmip_version + ) + else: + raise exceptions.InvalidKmipEncoding( + "The Create response payload encoding is missing the unique " + "identifier." + ) - if self.is_tag_next(Tags.TEMPLATE_ATTRIBUTE, tstream): - self.template_attribute = TemplateAttribute() - self.template_attribute.read(tstream, kmip_version=kmip_version) + if self.is_tag_next(enums.Tags.TEMPLATE_ATTRIBUTE, local_buffer): + self._template_attribute = objects.TemplateAttribute() + self._template_attribute.read( + local_buffer, + kmip_version=kmip_version + ) - self.is_oversized(tstream) - self.validate() + self.is_oversized(local_buffer) - def write(self, ostream, kmip_version=enums.KMIPVersion.KMIP_1_0): - tstream = BytearrayStream() + def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0): + """ + Write the data encoding the Create response payload to a buffer. - # Write the contents of the request payload - self.object_type.write(tstream, kmip_version=kmip_version) - self.unique_identifier.write(tstream, kmip_version=kmip_version) + Args: + output_buffer (stream): A data buffer in which to encode object + data, supporting a write method. + kmip_version (KMIPVersion): An enumeration defining the KMIP + version with which the object will be encoded. Optional, + defaults to KMIP 1.0. - if self.template_attribute is not None: - self.template_attribute.write(tstream, kmip_version=kmip_version) + Raises: + InvalidField: Raised if the object type attribute or unique + identifier is not defined. + """ + local_buffer = utils.BytearrayStream() - # Write the length and value of the request payload - self.length = tstream.length() + if self._object_type: + self._object_type.write(local_buffer, kmip_version=kmip_version) + else: + raise exceptions.InvalidField( + "The Create response payload is missing the object type field." + ) + + if self._unique_identifier: + self._unique_identifier.write( + local_buffer, + kmip_version=kmip_version + ) + else: + raise exceptions.InvalidField( + "The Create response payload is missing the unique identifier " + "field." + ) + + if self._template_attribute: + self._template_attribute.write( + local_buffer, + kmip_version=kmip_version + ) + + self.length = local_buffer.length() super(CreateResponsePayload, self).write( - ostream, + output_buffer, kmip_version=kmip_version ) - ostream.write(tstream.buffer) + output_buffer.write(local_buffer.buffer) - def validate(self): - # TODO (peter-hamilton) Finish implementation. - pass + def __eq__(self, other): + if isinstance(other, CreateResponsePayload): + if self.object_type != other.object_type: + return False + elif self.unique_identifier != other.unique_identifier: + return False + elif self.template_attribute != other.template_attribute: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, CreateResponsePayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "object_type={}".format(self.object_type), + "unique_identifier='{}'".format(self.unique_identifier), + "template_attribute={}".format(repr(self.template_attribute)) + ]) + return "CreateResponsePayload({})".format(args) + + def __str__(self): + value = ", ".join( + [ + '"object_type": {}'.format(self.object_type), + '"unique_identifier": "{}"'.format(self.unique_identifier), + '"template_attribute": {}'.format(self.template_attribute) + ] + ) + return '{' + value + '}' diff --git a/kmip/demos/units/create.py b/kmip/demos/units/create.py index 60179ef..bfd2109 100644 --- a/kmip/demos/units/create.py +++ b/kmip/demos/units/create.py @@ -130,8 +130,8 @@ if __name__ == '__main__': if result.result_status.value == ResultStatus.SUCCESS: logger.info('created object type: {0}'.format( - result.object_type.value)) - logger.info('created UUID: {0}'.format(result.uuid.value)) + result.object_type)) + logger.info('created UUID: {0}'.format(result.uuid)) logger.info('created template attribute: {0}'. format(result.template_attribute)) else: diff --git a/kmip/pie/client.py b/kmip/pie/client.py index 0506700..c16ee62 100644 --- a/kmip/pie/client.py +++ b/kmip/pie/client.py @@ -247,8 +247,7 @@ class ProxyKmipClient(object): status = result.result_status.value if status == enums.ResultStatus.SUCCESS: - uid = result.uuid.value - return uid + return result.uuid else: reason = result.result_reason.value message = result.result_message.value diff --git a/kmip/services/kmip_client.py b/kmip/services/kmip_client.py index 0eb6fab..e005db1 100644 --- a/kmip/services/kmip_client.py +++ b/kmip/services/kmip_client.py @@ -310,7 +310,6 @@ class KMIPProxy(object): self.socket = None def create(self, object_type, template_attribute, credential=None): - object_type = attr.ObjectType(object_type) return self._create(object_type=object_type, template_attribute=template_attribute, credential=credential) diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index e24b35a..b297b3d 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -1017,7 +1017,7 @@ class KmipEngine(object): def _process_create(self, payload): self._logger.info("Processing operation: Create") - object_type = payload.object_type.value + object_type = payload.object_type template_attribute = payload.template_attribute if object_type != enums.ObjectType.SYMMETRIC_KEY: @@ -1097,9 +1097,7 @@ class KmipEngine(object): response_payload = payloads.CreateResponsePayload( object_type=payload.object_type, - unique_identifier=attributes.UniqueIdentifier( - str(managed_object.unique_identifier) - ), + unique_identifier=str(managed_object.unique_identifier), template_attribute=None ) diff --git a/kmip/tests/integration/services/test_integration.py b/kmip/tests/integration/services/test_integration.py index 06465fd..09dea3a 100644 --- a/kmip/tests/integration/services/test_integration.py +++ b/kmip/tests/integration/services/test_integration.py @@ -338,11 +338,11 @@ class TestIntegration(TestCase): result = self._create_symmetric_key(key_name=key_name) self._check_result_status(result, ResultStatus, ResultStatus.SUCCESS) - self._check_object_type(result.object_type.value, ObjectType, + self._check_object_type(result.object_type, ObjectType, ObjectType.SYMMETRIC_KEY) - self._check_uuid(result.uuid.value, str) + self._check_uuid(result.uuid, str) - result = self.client.get(uuid=result.uuid.value, credential=None) + result = self.client.get(uuid=result.uuid, credential=None) self._check_result_status(result, ResultStatus, ResultStatus.SUCCESS) self._check_object_type(result.object_type, ObjectType, @@ -1160,10 +1160,10 @@ class TestIntegration(TestCase): """ key_name = 'Integration Test - Create-GetAttributeList-Destroy Key' result = self._create_symmetric_key(key_name=key_name) - uid = result.uuid.value + uid = result.uuid self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) - self.assertEqual(ObjectType.SYMMETRIC_KEY, result.object_type.value) + self.assertEqual(ObjectType.SYMMETRIC_KEY, result.object_type) self.assertIsInstance(uid, str) try: diff --git a/kmip/tests/integration/services/test_kmip_client.py b/kmip/tests/integration/services/test_kmip_client.py index 47de636..43164d2 100644 --- a/kmip/tests/integration/services/test_kmip_client.py +++ b/kmip/tests/integration/services/test_kmip_client.py @@ -104,9 +104,9 @@ class TestKMIPClientIntegration(TestCase): self._check_result_status(result.result_status.value, ResultStatus, ResultStatus.SUCCESS) - self._check_object_type(result.object_type.value, ObjectType, + self._check_object_type(result.object_type, ObjectType, ObjectType.SYMMETRIC_KEY) - self._check_uuid(result.uuid.value, str) + self._check_uuid(result.uuid, str) # Check the template attribute type self._check_template_attribute(result.template_attribute, @@ -122,7 +122,7 @@ class TestKMIPClientIntegration(TestCase): credential = self.cred_factory.create_credential(credential_type, credential_value) result = self._create_symmetric_key() - uuid = result.uuid.value + uuid = result.uuid result = self.client.get(uuid=uuid, credential=credential) @@ -146,7 +146,7 @@ class TestKMIPClientIntegration(TestCase): credential = self.cred_factory.create_credential(credential_type, credential_value) result = self._create_symmetric_key() - uuid = result.uuid.value + uuid = result.uuid # Verify the secret was created result = self.client.get(uuid=uuid, credential=credential) diff --git a/kmip/tests/unit/core/messages/payloads/test_create.py b/kmip/tests/unit/core/messages/payloads/test_create.py index 87b311e..c78b0cf 100644 --- a/kmip/tests/unit/core/messages/payloads/test_create.py +++ b/kmip/tests/unit/core/messages/payloads/test_create.py @@ -12,3 +12,1445 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + +import testtools + +from kmip.core import enums +from kmip.core import exceptions +from kmip.core import objects +from kmip.core import primitives +from kmip.core import utils + +from kmip.core.messages import payloads + + +class TestCreateRequestPayload(testtools.TestCase): + + def setUp(self): + super(TestCreateRequestPayload, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, + # Section 3.1.1. + # + # This encoding matches the following set of values: + # Request Payload + # Object Type - Symmetric Key + # Template Attribute + # Attribute + # Attribute Name - Cryptographic Algorithm + # Attribute Value - AES + # Attribute + # Attribute Name - Cryptographic Length + # Attribute Value - 128 + # Attribute + # Attribute Name - Cryptographic Usage Mask + # Attribute Value - Encrypt | Decrypt + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\xC0' + b'\x42\x00\x57\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x91\x01\x00\x00\x00\xA8' + b'\x42\x00\x08\x01\x00\x00\x00\x30' + b'\x42\x00\x0A\x07\x00\x00\x00\x17' + b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x41\x6C' + b'\x67\x6F\x72\x69\x74\x68\x6D\x00' + b'\x42\x00\x0B\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\x08\x01\x00\x00\x00\x30' + b'\x42\x00\x0A\x07\x00\x00\x00\x14' + b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x4C\x65' + b'\x6E\x67\x74\x68\x00\x00\x00\x00' + b'\x42\x00\x0B\x02\x00\x00\x00\x04\x00\x00\x00\x80\x00\x00\x00\x00' + b'\x42\x00\x08\x01\x00\x00\x00\x30' + b'\x42\x00\x0A\x07\x00\x00\x00\x18' + b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x55\x73' + b'\x61\x67\x65\x20\x4D\x61\x73\x6B' + b'\x42\x00\x0B\x02\x00\x00\x00\x04\x00\x00\x00\x0C\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, + # Section 3.1.1. + # + # This encoding matches the following set of values: + # Request Payload + # Template Attribute + # Attribute + # Attribute Name - Cryptographic Algorithm + # Attribute Value - AES + # Attribute + # Attribute Name - Cryptographic Length + # Attribute Value - 128 + # Attribute + # Attribute Name - Cryptographic Usage Mask + # Attribute Value - Encrypt | Decrypt + self.no_object_type_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\xB0' + b'\x42\x00\x91\x01\x00\x00\x00\xA8' + b'\x42\x00\x08\x01\x00\x00\x00\x30' + b'\x42\x00\x0A\x07\x00\x00\x00\x17' + b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x41\x6C' + b'\x67\x6F\x72\x69\x74\x68\x6D\x00' + b'\x42\x00\x0B\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\x08\x01\x00\x00\x00\x30' + b'\x42\x00\x0A\x07\x00\x00\x00\x14' + b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x4C\x65' + b'\x6E\x67\x74\x68\x00\x00\x00\x00' + b'\x42\x00\x0B\x02\x00\x00\x00\x04\x00\x00\x00\x80\x00\x00\x00\x00' + b'\x42\x00\x08\x01\x00\x00\x00\x30' + b'\x42\x00\x0A\x07\x00\x00\x00\x18' + b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x55\x73' + b'\x61\x67\x65\x20\x4D\x61\x73\x6B' + b'\x42\x00\x0B\x02\x00\x00\x00\x04\x00\x00\x00\x0C\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, + # Section 3.1.1. + # + # This encoding matches the following set of values: + # Request Payload + # Object Type - Symmetric Key + self.no_template_attribute_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x10' + b'\x42\x00\x57\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestCreateRequestPayload, self).tearDown() + + def test_invalid_object_type(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the object type of a Create request payload. + """ + kwargs = {'object_type': 'invalid'} + self.assertRaisesRegex( + TypeError, + "Object type must be an ObjectType enumeration.", + payloads.CreateRequestPayload, + **kwargs + ) + + args = ( + payloads.CreateRequestPayload(), + 'object_type', + 'invalid' + ) + self.assertRaisesRegex( + TypeError, + "Object type must be an ObjectType enumeration.", + setattr, + *args + ) + + def test_invalid_template_attribute(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the template attribute of a Create request payload. + """ + kwargs = {'template_attribute': 'invalid'} + self.assertRaisesRegex( + TypeError, + "Template attribute must be a TemplateAttribute structure.", + payloads.CreateRequestPayload, + **kwargs + ) + + args = ( + payloads.CreateRequestPayload(), + 'template_attribute', + 'invalid' + ) + self.assertRaisesRegex( + TypeError, + "Template attribute must be a TemplateAttribute structure.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a Create request payload can be read from a data stream. + """ + payload = payloads.CreateRequestPayload() + + self.assertEqual(None, payload.object_type) + self.assertEqual(None, payload.template_attribute) + + payload.read(self.full_encoding) + + self.assertEqual( + enums.ObjectType.SYMMETRIC_KEY, + payload.object_type + ) + self.assertEqual( + objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Usage Mask' + ), + attribute_value=primitives.Integer( + value=( + enums.CryptographicUsageMask.ENCRYPT.value | + enums.CryptographicUsageMask.DECRYPT.value + ), + tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK + ) + ) + ] + ), + payload.template_attribute + ) + + def test_read_missing_object_type(self): + """ + Test that an InvalidKmipEncoding error is raised during the decoding + of a Create request payload when the object type is missing from the + encoding. + """ + payload = payloads.CreateRequestPayload() + + self.assertIsNone(payload.object_type) + self.assertIsNone(payload.template_attribute) + + args = (self.no_object_type_encoding, ) + self.assertRaisesRegex( + exceptions.InvalidKmipEncoding, + "The Create request payload encoding is missing the object type.", + payload.read, + *args + ) + + def test_read_missing_template_attribute(self): + """ + Test that an InvalidKmipEncoding error is raised during the decoding + of a Create request payload when the template attribute is missing + from the encoding. + """ + payload = payloads.CreateRequestPayload() + + self.assertIsNone(payload.object_type) + self.assertIsNone(payload.template_attribute) + + args = (self.no_template_attribute_encoding, ) + self.assertRaisesRegex( + exceptions.InvalidKmipEncoding, + "The Create request payload encoding is missing the template " + "attribute.", + payload.read, + *args + ) + + def test_write(self): + """ + Test that a Create request payload can be written to a data stream. + """ + payload = payloads.CreateRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Usage Mask' + ), + attribute_value=primitives.Integer( + value=( + enums.CryptographicUsageMask.ENCRYPT.value | + enums.CryptographicUsageMask.DECRYPT.value + ), + tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK + ) + ) + ] + ) + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_missing_object_type(self): + """ + Test that an InvalidField error is raised during the encoding of a + Create request payload when the payload is missing the object type. + """ + payload = payloads.CreateRequestPayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + ] + ) + ) + + stream = utils.BytearrayStream() + args = (stream, ) + self.assertRaisesRegex( + exceptions.InvalidField, + "The Create request payload is missing the object type field.", + payload.write, + *args + ) + + def test_write_missing_template_attribute(self): + """ + Test that an InvalidField error is raised during the encoding of a + Create request payload when the payload is missing the template + attribute. + """ + payload = payloads.CreateRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY + ) + + stream = utils.BytearrayStream() + args = (stream, ) + self.assertRaisesRegex( + exceptions.InvalidField, + "The Create request payload is missing the template attribute " + "field.", + payload.write, + *args + ) + + def test_repr(self): + """ + Test that repr can be applied to a Create request payload structure. + """ + payload = payloads.CreateRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + ] + ) + ) + self.assertEqual( + "CreateRequestPayload(" + "object_type=ObjectType.SYMMETRIC_KEY, " + "template_attribute=Struct())", + repr(payload) + ) + + def test_str(self): + """ + Test that str can be applied to a Create request payload structure. + """ + payload = payloads.CreateRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + ] + ) + ) + self.assertEqual( + '{' + '"object_type": ObjectType.SYMMETRIC_KEY, ' + '"template_attribute": Struct()' + '}', + str(payload) + ) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two Create + request payloads with the same data. + """ + a = payloads.CreateRequestPayload() + b = payloads.CreateRequestPayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = payloads.CreateRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Usage Mask' + ), + attribute_value=primitives.Integer( + value=( + enums.CryptographicUsageMask.ENCRYPT.value | + enums.CryptographicUsageMask.DECRYPT.value + ), + tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK + ) + ) + ] + ) + ) + b = payloads.CreateRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Usage Mask' + ), + attribute_value=primitives.Integer( + value=( + enums.CryptographicUsageMask.ENCRYPT.value | + enums.CryptographicUsageMask.DECRYPT.value + ), + tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK + ) + ) + ] + ) + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_object_type(self): + """ + Test that the equality operator returns False when comparing two Create + request payloads with different object types. + """ + a = payloads.CreateRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY + ) + b = payloads.CreateRequestPayload( + object_type=enums.ObjectType.SECRET_DATA + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_template_attribute(self): + """ + Test that the equality operator returns False when comparing two Create + request payloads with different template attributes. + """ + a = payloads.CreateRequestPayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ) + ] + ) + ) + b = payloads.CreateRequestPayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + ] + ) + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two Create + request payloads with different types. + """ + a = payloads.CreateRequestPayload() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Create request payloads with the same data. + """ + a = payloads.CreateRequestPayload() + b = payloads.CreateRequestPayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = payloads.CreateRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Usage Mask' + ), + attribute_value=primitives.Integer( + value=( + enums.CryptographicUsageMask.ENCRYPT.value | + enums.CryptographicUsageMask.DECRYPT.value + ), + tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK + ) + ) + ] + ) + ) + b = payloads.CreateRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Usage Mask' + ), + attribute_value=primitives.Integer( + value=( + enums.CryptographicUsageMask.ENCRYPT.value | + enums.CryptographicUsageMask.DECRYPT.value + ), + tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK + ) + ) + ] + ) + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_object_type(self): + """ + Test that the inequality operator returns True when comparing two + Create request payloads with different object types. + """ + a = payloads.CreateRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY + ) + b = payloads.CreateRequestPayload( + object_type=enums.ObjectType.SECRET_DATA + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_template_attribute(self): + """ + Test that the inequality operator returns True when comparing two + Create request payloads with different template attributes. + """ + a = payloads.CreateRequestPayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ) + ] + ) + ) + b = payloads.CreateRequestPayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + ] + ) + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + Create request payloads with different types. + """ + a = payloads.CreateRequestPayload() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + +class TestCreateResponsePayload(testtools.TestCase): + + def setUp(self): + super(TestCreateResponsePayload, self).setUp() + + # Encoding obtained in part from the KMIP 1.1 testing document, + # Section 3.1.1. The TemplateAttribute was added manually from the + # Create request payload encoding. + # + # This encoding matches the following set of values: + # Response Payload + # Object Type - Symmetric Key + # Unique Identifier - fb4b5b9c-6188-4c63-8142-fe9c328129fc + # Template Attribute + # Attribute + # Attribute Name - State + # Attribute Value - PRE_ACTIVE + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x70' + b'\x42\x00\x57\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x66\x62\x34\x62\x35\x62\x39\x63\x2D\x36\x31\x38\x38\x2D\x34\x63' + b'\x36\x33\x2D\x38\x31\x34\x32\x2D\x66\x65\x39\x63\x33\x32\x38\x31' + b'\x32\x39\x66\x63\x00\x00\x00\x00' + b'\x42\x00\x91\x01\x00\x00\x00\x28' + b'\x42\x00\x08\x01\x00\x00\x00\x20' + b'\x42\x00\x0A\x07\x00\x00\x00\x05' + b'\x53\x74\x61\x74\x65\x00\x00\x00' + b'\x42\x00\x0B\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + ) + + # Encoding obtained in part from the KMIP 1.1 testing document, + # Section 3.1.1. The TemplateAttribute was added manually from the + # Create request payload encoding. + # + # This encoding matches the following set of values: + # Response Payload + # Unique Identifier - fb4b5b9c-6188-4c63-8142-fe9c328129fc + # Template Attribute + # Attribute + # Attribute Name - State + # Attribute Value - PRE_ACTIVE + self.no_object_type_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x60' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x66\x62\x34\x62\x35\x62\x39\x63\x2D\x36\x31\x38\x38\x2D\x34\x63' + b'\x36\x33\x2D\x38\x31\x34\x32\x2D\x66\x65\x39\x63\x33\x32\x38\x31' + b'\x32\x39\x66\x63\x00\x00\x00\x00' + b'\x42\x00\x91\x01\x00\x00\x00\x28' + b'\x42\x00\x08\x01\x00\x00\x00\x20' + b'\x42\x00\x0A\x07\x00\x00\x00\x05' + b'\x53\x74\x61\x74\x65\x00\x00\x00' + b'\x42\x00\x0B\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + ) + + # Encoding obtained in part from the KMIP 1.1 testing document, + # Section 3.1.1. The TemplateAttribute was added manually from the + # Create request payload encoding. + # + # This encoding matches the following set of values: + # Response Payload + # Object Type - Symmetric Key + # Template Attribute + # Attribute + # Attribute Name - State + # Attribute Value - PRE_ACTIVE + self.no_unique_identifier_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x50' + b'\x42\x00\x57\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x91\x01\x00\x00\x00\x28' + b'\x42\x00\x08\x01\x00\x00\x00\x20' + b'\x42\x00\x0A\x07\x00\x00\x00\x05' + b'\x53\x74\x61\x74\x65\x00\x00\x00' + b'\x42\x00\x0B\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + ) + + # Encoding obtained in part from the KMIP 1.1 testing document, + # Section 3.1.1. + # + # This encoding matches the following set of values: + # Response Payload + # Object Type - Symmetric Key + # Unique Identifier - fb4b5b9c-6188-4c63-8142-fe9c328129fc + self.no_template_attribute_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x40' + b'\x42\x00\x57\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x66\x62\x34\x62\x35\x62\x39\x63\x2D\x36\x31\x38\x38\x2D\x34\x63' + b'\x36\x33\x2D\x38\x31\x34\x32\x2D\x66\x65\x39\x63\x33\x32\x38\x31' + b'\x32\x39\x66\x63\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestCreateResponsePayload, self).tearDown() + + def test_invalid_object_type(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the object type of a Create response payload. + """ + kwargs = {'object_type': 'invalid'} + self.assertRaisesRegex( + TypeError, + "Object type must be an ObjectType enumeration.", + payloads.CreateResponsePayload, + **kwargs + ) + + args = ( + payloads.CreateResponsePayload(), + 'object_type', + 'invalid' + ) + self.assertRaisesRegex( + TypeError, + "Object type must be an ObjectType enumeration.", + setattr, + *args + ) + + def test_invalid_unique_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifier of a Create response payload. + """ + kwargs = {'unique_identifier': 0} + self.assertRaisesRegex( + TypeError, + "Unique identifier must be a string.", + payloads.CreateResponsePayload, + **kwargs + ) + + args = (payloads.CreateResponsePayload(), 'unique_identifier', 0) + self.assertRaisesRegex( + TypeError, + "Unique identifier must be a string.", + setattr, + *args + ) + + def test_invalid_template_attribute(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the template attribute of a Create response payload. + """ + kwargs = {'template_attribute': 'invalid'} + self.assertRaisesRegex( + TypeError, + "Template attribute must be a TemplateAttribute structure.", + payloads.CreateResponsePayload, + **kwargs + ) + + args = ( + payloads.CreateResponsePayload(), + 'template_attribute', + 'invalid' + ) + self.assertRaisesRegex( + TypeError, + "Template attribute must be a TemplateAttribute structure.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a Create response payload can be read from a data stream. + """ + payload = payloads.CreateResponsePayload() + + self.assertEqual(None, payload.object_type) + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.template_attribute) + + payload.read(self.full_encoding) + + self.assertEqual( + enums.ObjectType.SYMMETRIC_KEY, + payload.object_type + ) + self.assertEqual( + 'fb4b5b9c-6188-4c63-8142-fe9c328129fc', + payload.unique_identifier + ) + self.assertEqual( + objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'State' + ), + attribute_value=primitives.Enumeration( + enums.State, + value=enums.State.PRE_ACTIVE, + tag=enums.Tags.STATE + ) + ) + ] + ), + payload.template_attribute + ) + + def test_read_missing_object_type(self): + """ + Test that an InvalidKmipEncoding error is raised during the decoding + of a Create response payload when the object type is missing from the + encoding. + """ + payload = payloads.CreateResponsePayload() + + self.assertIsNone(payload.object_type) + self.assertIsNone(payload.unique_identifier) + self.assertIsNone(payload.template_attribute) + + args = (self.no_object_type_encoding, ) + self.assertRaisesRegex( + exceptions.InvalidKmipEncoding, + "The Create response payload encoding is missing the object type.", + payload.read, + *args + ) + + def test_read_missing_unique_identifier(self): + """ + Test that an InvalidKmipEncoding error is raised during the decoding + of a Create response payload when the unique identifier is missing + from the encoding. + """ + payload = payloads.CreateResponsePayload() + + self.assertIsNone(payload.object_type) + self.assertIsNone(payload.unique_identifier) + self.assertIsNone(payload.template_attribute) + + args = (self.no_unique_identifier_encoding, ) + self.assertRaisesRegex( + exceptions.InvalidKmipEncoding, + "The Create response payload encoding is missing the unique " + "identifier.", + payload.read, + *args + ) + + def test_read_missing_template_attribute(self): + """ + Test that a Create response payload can be read from a data stream + event when missing the template attribute. + """ + payload = payloads.CreateResponsePayload() + + self.assertEqual(None, payload.object_type) + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.template_attribute) + + payload.read(self.no_template_attribute_encoding) + + self.assertEqual( + enums.ObjectType.SYMMETRIC_KEY, + payload.object_type + ) + self.assertEqual( + 'fb4b5b9c-6188-4c63-8142-fe9c328129fc', + payload.unique_identifier + ) + self.assertIsNone(payload.template_attribute) + + def test_write(self): + """ + Test that a Create response payload can be written to a data stream. + """ + payload = payloads.CreateResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier="fb4b5b9c-6188-4c63-8142-fe9c328129fc", + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "State" + ), + attribute_value=primitives.Enumeration( + enums.State, + value=enums.State.PRE_ACTIVE, + tag=enums.Tags.STATE + ) + ) + ] + ) + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_missing_object_type(self): + """ + Test that an InvalidField error is raised during the encoding of a + Create response payload when the payload is missing the object type. + """ + payload = payloads.CreateResponsePayload( + unique_identifier="fb4b5b9c-6188-4c63-8142-fe9c328129fc", + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "State" + ), + attribute_value=primitives.Enumeration( + enums.State, + value=enums.State.PRE_ACTIVE, + tag=enums.Tags.STATE + ) + ) + ] + ) + ) + + stream = utils.BytearrayStream() + args = (stream, ) + self.assertRaisesRegex( + exceptions.InvalidField, + "The Create response payload is missing the object type field.", + payload.write, + *args + ) + + def test_write_missing_unique_identifier(self): + """ + Test that an InvalidField error is raised during the encoding of a + Create response payload when the payload is missing the unique + identifier. + """ + payload = payloads.CreateResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "State" + ), + attribute_value=primitives.Enumeration( + enums.State, + value=enums.State.PRE_ACTIVE, + tag=enums.Tags.STATE + ) + ) + ] + ) + ) + + stream = utils.BytearrayStream() + args = (stream, ) + self.assertRaisesRegex( + exceptions.InvalidField, + "The Create response payload is missing the unique identifier " + "field.", + payload.write, + *args + ) + + def test_write_missing_template_attribute(self): + """ + Test that a Create response payload can be written to a data stream + even when missing the template attribute. + """ + payload = payloads.CreateResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier="fb4b5b9c-6188-4c63-8142-fe9c328129fc" + ) + + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.no_template_attribute_encoding), len(stream)) + self.assertEqual(str(self.no_template_attribute_encoding), str(stream)) + + def test_repr(self): + """ + Test that repr can be applied to a Create response payload structure. + """ + payload = payloads.CreateResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier="fb4b5b9c-6188-4c63-8142-fe9c328129fc", + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "State" + ), + attribute_value=primitives.Enumeration( + enums.State, + value=enums.State.PRE_ACTIVE, + tag=enums.Tags.STATE + ) + ) + ] + ) + ) + self.assertEqual( + "CreateResponsePayload(" + "object_type=ObjectType.SYMMETRIC_KEY, " + "unique_identifier='fb4b5b9c-6188-4c63-8142-fe9c328129fc', " + "template_attribute=Struct())", + repr(payload) + ) + + def test_str(self): + """ + Test that str can be applied to a Create response payload structure. + """ + payload = payloads.CreateResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier="fb4b5b9c-6188-4c63-8142-fe9c328129fc", + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "State" + ), + attribute_value=primitives.Enumeration( + enums.State, + value=enums.State.PRE_ACTIVE, + tag=enums.Tags.STATE + ) + ) + ] + ) + ) + self.assertEqual( + '{' + '"object_type": ObjectType.SYMMETRIC_KEY, ' + '"unique_identifier": "fb4b5b9c-6188-4c63-8142-fe9c328129fc", ' + '"template_attribute": Struct()' + '}', + str(payload) + ) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two Create + response payloads with the same data. + """ + a = payloads.CreateResponsePayload() + b = payloads.CreateResponsePayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = payloads.CreateResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier="fb4b5b9c-6188-4c63-8142-fe9c328129fc", + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "State" + ), + attribute_value=primitives.Enumeration( + enums.State, + value=enums.State.PRE_ACTIVE, + tag=enums.Tags.STATE + ) + ) + ] + ) + ) + b = payloads.CreateResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier="fb4b5b9c-6188-4c63-8142-fe9c328129fc", + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "State" + ), + attribute_value=primitives.Enumeration( + enums.State, + value=enums.State.PRE_ACTIVE, + tag=enums.Tags.STATE + ) + ) + ] + ) + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_object_type(self): + """ + Test that the equality operator returns False when comparing two Create + response payloads with different object types. + """ + a = payloads.CreateResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY + ) + b = payloads.CreateResponsePayload( + object_type=enums.ObjectType.SECRET_DATA + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two Create + response payloads with different unique identifiers. + """ + a = payloads.CreateResponsePayload(unique_identifier="a") + b = payloads.CreateResponsePayload(unique_identifier="b") + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_template_attribute(self): + """ + Test that the equality operator returns False when comparing two Create + response payloads with different template attributes. + """ + a = payloads.CreateResponsePayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "State" + ), + attribute_value=primitives.Enumeration( + enums.State, + value=enums.State.PRE_ACTIVE, + tag=enums.Tags.STATE + ) + ) + ] + ) + ) + b = payloads.CreateResponsePayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "State" + ), + attribute_value=primitives.Enumeration( + enums.State, + value=enums.State.ACTIVE, + tag=enums.Tags.STATE + ) + ) + ] + ) + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two Create + response payloads with different types. + """ + a = payloads.CreateResponsePayload() + b = "invalid" + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Create response payloads with the same data. + """ + a = payloads.CreateResponsePayload() + b = payloads.CreateResponsePayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = payloads.CreateResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier="fb4b5b9c-6188-4c63-8142-fe9c328129fc", + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "State" + ), + attribute_value=primitives.Enumeration( + enums.State, + value=enums.State.PRE_ACTIVE, + tag=enums.Tags.STATE + ) + ) + ] + ) + ) + b = payloads.CreateResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier="fb4b5b9c-6188-4c63-8142-fe9c328129fc", + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "State" + ), + attribute_value=primitives.Enumeration( + enums.State, + value=enums.State.PRE_ACTIVE, + tag=enums.Tags.STATE + ) + ) + ] + ) + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_object_type(self): + """ + Test that the inequality operator returns True when comparing two + Create response payloads with different object types. + """ + a = payloads.CreateResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY + ) + b = payloads.CreateResponsePayload( + object_type=enums.ObjectType.SECRET_DATA + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_unique_identifier(self): + """ + Test that the inequality operator returns True when comparing two + Create response payloads with different unique identifiers. + """ + a = payloads.CreateResponsePayload(unique_identifier="a") + b = payloads.CreateResponsePayload(unique_identifier="b") + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_template_attribute(self): + """ + Test that the inequality operator returns True when comparing two + Create response payloads with different template attributes. + """ + a = payloads.CreateResponsePayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "State" + ), + attribute_value=primitives.Enumeration( + enums.State, + value=enums.State.PRE_ACTIVE, + tag=enums.Tags.STATE + ) + ) + ] + ) + ) + b = payloads.CreateResponsePayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "State" + ), + attribute_value=primitives.Enumeration( + enums.State, + value=enums.State.ACTIVE, + tag=enums.Tags.STATE + ) + ) + ] + ) + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + Create response payloads with different types. + """ + a = payloads.CreateResponsePayload() + b = "invalid" + + self.assertTrue(a != b) + self.assertTrue(b != a) diff --git a/kmip/tests/unit/core/messages/test_messages.py b/kmip/tests/unit/core/messages/test_messages.py index 7520590..e460286 100644 --- a/kmip/tests/unit/core/messages/test_messages.py +++ b/kmip/tests/unit/core/messages/test_messages.py @@ -15,6 +15,7 @@ from testtools import TestCase import binascii +import six from kmip.core.factories.secrets import SecretFactory from kmip.core.factories.attributes import AttributeFactory @@ -245,13 +246,13 @@ class TestRequestMessage(TestCase): object_type = request_payload.object_type msg = "Bad object type type: expected {0}, received {1}" - self.assertIsInstance(object_type, attr.ObjectType, - msg.format(attr.ObjectType, + self.assertIsInstance(object_type, enums.ObjectType, + msg.format(enums.ObjectType, type(object_type))) msg = "Bad object type value: expected {0}, received {1}" - self.assertEqual(enums.ObjectType.SYMMETRIC_KEY, object_type.value, + self.assertEqual(enums.ObjectType.SYMMETRIC_KEY, object_type, msg.format(enums.ObjectType.SYMMETRIC_KEY, - object_type.value)) + object_type)) template_attribute = request_payload.template_attribute msg = "Bad template attribute type: expected {0}, received {1}" @@ -359,7 +360,7 @@ class TestRequestMessage(TestCase): batch_count=batch_count) operation = contents.Operation(enums.Operation.CREATE) - object_type = attr.ObjectType(enums.ObjectType.SYMMETRIC_KEY) + object_type = enums.ObjectType.SYMMETRIC_KEY name = AttributeType.CRYPTOGRAPHIC_ALGORITHM value = CryptoAlgorithmEnum.AES @@ -1363,24 +1364,24 @@ class TestResponseMessage(TestCase): exp_type, rcv_type)) object_type = response_payload.object_type - self.assertIsInstance(object_type, attr.ObjectType, + self.assertIsInstance(object_type, enums.ObjectType, self.msg.format('object type', 'type', - attr.ObjectType, + enums.ObjectType, type(object_type))) - self.assertEqual(enums.ObjectType.SYMMETRIC_KEY, object_type.value, + self.assertEqual(enums.ObjectType.SYMMETRIC_KEY, object_type, self.msg.format('object type', 'value', enums.ObjectType.SYMMETRIC_KEY, - object_type.value)) + object_type)) unique_identifier = response_payload.unique_identifier value = 'fb4b5b9c-6188-4c63-8142-fe9c328129fc' - self.assertIsInstance(unique_identifier, attr.UniqueIdentifier, + self.assertIsInstance(unique_identifier, six.string_types, self.msg.format('unique identifier', 'type', - attr.UniqueIdentifier, + six.string_types, type(unique_identifier))) - self.assertEqual(value, unique_identifier.value, + self.assertEqual(value, unique_identifier, self.msg.format('unique identifier', 'value', - unique_identifier.value, value)) + unique_identifier, value)) def test_create_response_write(self): prot_ver = contents.ProtocolVersion(1, 1) @@ -1394,13 +1395,12 @@ class TestResponseMessage(TestCase): batch_count=batch_count) operation = contents.Operation(enums.Operation.CREATE) result_status = contents.ResultStatus(enums.ResultStatus.SUCCESS) - object_type = attr.ObjectType(enums.ObjectType.SYMMETRIC_KEY) + object_type = enums.ObjectType.SYMMETRIC_KEY uuid = 'fb4b5b9c-6188-4c63-8142-fe9c328129fc' - uniq_id = attr.UniqueIdentifier(uuid) resp_pl = payloads.CreateResponsePayload( object_type=object_type, - unique_identifier=uniq_id + unique_identifier=uuid ) batch_item = messages.ResponseBatchItem(operation=operation, result_status=result_status, diff --git a/kmip/tests/unit/pie/test_client.py b/kmip/tests/unit/pie/test_client.py index 41e1c20..efcf60b 100644 --- a/kmip/tests/unit/pie/test_client.py +++ b/kmip/tests/unit/pie/test_client.py @@ -229,7 +229,8 @@ class TestProxyKmipClient(testtools.TestCase): status = enums.ResultStatus.SUCCESS result = results.CreateResult( contents.ResultStatus(status), - uuid=attr.UniqueIdentifier(key_id)) + uuid=key_id + ) with ProxyKmipClient() as client: client.proxy.create.return_value = result diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 551bfab..7bd3359 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -2395,7 +2395,7 @@ class TestKmipEngine(testtools.TestCase): attribute_factory = factory.AttributeFactory() # Build Create request - object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY) + object_type = enums.ObjectType.SYMMETRIC_KEY template_attribute = objects.TemplateAttribute( attributes=[ attribute_factory.create_attribute( @@ -2439,7 +2439,7 @@ class TestKmipEngine(testtools.TestCase): "Processing operation: Create" ) - uid = response_payload.unique_identifier.value + uid = response_payload.unique_identifier self.assertEqual('1', uid) # Retrieve the stored object and verify all attributes were set @@ -2487,7 +2487,7 @@ class TestKmipEngine(testtools.TestCase): e._data_session = e._data_store_session_factory() e._logger = mock.MagicMock() - object_type = attributes.ObjectType(enums.ObjectType.PUBLIC_KEY) + object_type = enums.ObjectType.PUBLIC_KEY payload = payloads.CreateRequestPayload( object_type ) @@ -2520,7 +2520,7 @@ class TestKmipEngine(testtools.TestCase): attribute_factory = factory.AttributeFactory() # Test the error for omitting the Cryptographic Algorithm - object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY) + object_type = enums.ObjectType.SYMMETRIC_KEY template_attribute = objects.TemplateAttribute( attributes=[ attribute_factory.create_attribute( @@ -2566,7 +2566,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.reset_mock() # Test the error for omitting the Cryptographic Length - object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY) + object_type = enums.ObjectType.SYMMETRIC_KEY template_attribute = objects.TemplateAttribute( attributes=[ attribute_factory.create_attribute( @@ -2612,7 +2612,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.reset_mock() # Test the error for omitting the Cryptographic Usage Mask - object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY) + object_type = enums.ObjectType.SYMMETRIC_KEY template_attribute = objects.TemplateAttribute( attributes=[ attribute_factory.create_attribute( @@ -8000,7 +8000,7 @@ class TestKmipEngine(testtools.TestCase): attribute_factory = factory.AttributeFactory() # Build a SymmetricKey for registration. - object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY) + object_type = enums.ObjectType.SYMMETRIC_KEY template_attribute = objects.TemplateAttribute( attributes=[ attribute_factory.create_attribute( @@ -8042,7 +8042,7 @@ class TestKmipEngine(testtools.TestCase): "Processing operation: Create" ) - uid = response_payload.unique_identifier.value + uid = response_payload.unique_identifier self.assertEqual('1', uid) e._logger.reset_mock()