diff --git a/kmip/core/messages/payloads/register.py b/kmip/core/messages/payloads/register.py index fceca96..525dc4c 100644 --- a/kmip/core/messages/payloads/register.py +++ b/kmip/core/messages/payloads/register.py @@ -174,17 +174,37 @@ class RegisterRequestPayload(primitives.Struct): "type." ) - if self.is_tag_next(enums.Tags.TEMPLATE_ATTRIBUTE, local_buffer): - self._template_attribute = objects.TemplateAttribute() - self._template_attribute.read( - local_buffer, - kmip_version=kmip_version - ) + if kmip_version < enums.KMIPVersion.KMIP_2_0: + if self.is_tag_next(enums.Tags.TEMPLATE_ATTRIBUTE, local_buffer): + self._template_attribute = objects.TemplateAttribute() + self._template_attribute.read( + local_buffer, + kmip_version=kmip_version + ) + else: + raise exceptions.InvalidKmipEncoding( + "The Register request payload encoding is missing the " + "template attribute." + ) else: - raise exceptions.InvalidKmipEncoding( - "The Register request payload encoding is missing the " - "template attribute." - ) + # NOTE (ph) For now, leave attributes natively in TemplateAttribute + # form and just convert to the KMIP 2.0 Attributes form as needed + # for encoding/decoding purposes. Changing the payload to require + # the new Attributes structure will trigger a bunch of second-order + # effects across the client and server codebases that is beyond + # the scope of updating the Register payloads to support KMIP 2.0. + if self.is_tag_next(enums.Tags.ATTRIBUTES, local_buffer): + attributes = objects.Attributes() + attributes.read(local_buffer, kmip_version=kmip_version) + value = objects.convert_attributes_to_template_attribute( + attributes + ) + self._template_attribute = value + else: + raise exceptions.InvalidKmipEncoding( + "The Register request payload encoding is missing the " + "attributes structure." + ) managed_object = self.secret_factory.create(self.object_type) @@ -224,16 +244,34 @@ class RegisterRequestPayload(primitives.Struct): "field." ) - if self._template_attribute: - self._template_attribute.write( - local_buffer, - kmip_version=kmip_version - ) + if kmip_version < enums.KMIPVersion.KMIP_2_0: + if self._template_attribute: + self._template_attribute.write( + local_buffer, + kmip_version=kmip_version + ) + else: + raise exceptions.InvalidField( + "The Register request payload is missing the template " + "attribute field." + ) else: - raise exceptions.InvalidField( - "The Register request payload is missing the template " - "attribute field." - ) + # NOTE (ph) For now, leave attributes natively in TemplateAttribute + # form and just convert to the KMIP 2.0 Attributes form as needed + # for encoding/decoding purposes. Changing the payload to require + # the new Attributes structure will trigger a bunch of second-order + # effects across the client and server codebases that is beyond + # the scope of updating the Register payloads to support KMIP 2.0. + if self._template_attribute: + attributes = objects.convert_template_attribute_to_attributes( + self._template_attribute + ) + attributes.write(local_buffer, kmip_version=kmip_version) + else: + raise exceptions.InvalidField( + "The Register request payload is missing the template " + "attribute field." + ) if self._managed_object: self._managed_object.write(local_buffer, kmip_version=kmip_version) @@ -391,12 +429,13 @@ class RegisterResponsePayload(primitives.Struct): "identifier." ) - if self.is_tag_next(enums.Tags.TEMPLATE_ATTRIBUTE, local_buffer): - self._template_attribute = objects.TemplateAttribute() - self._template_attribute.read( - local_buffer, - kmip_version=kmip_version - ) + if kmip_version < enums.KMIPVersion.KMIP_2_0: + if self.is_tag_next(enums.Tags.TEMPLATE_ATTRIBUTE, local_buffer): + self._template_attribute = objects.TemplateAttribute() + self._template_attribute.read( + local_buffer, + kmip_version=kmip_version + ) self.is_oversized(local_buffer) @@ -427,11 +466,12 @@ class RegisterResponsePayload(primitives.Struct): "identifier field." ) - if self._template_attribute: - self._template_attribute.write( - local_buffer, - kmip_version=kmip_version - ) + if kmip_version < enums.KMIPVersion.KMIP_2_0: + if self._template_attribute: + self._template_attribute.write( + local_buffer, + kmip_version=kmip_version + ) self.length = local_buffer.length() super(RegisterResponsePayload, self).write( diff --git a/kmip/tests/unit/core/messages/payloads/test_register.py b/kmip/tests/unit/core/messages/payloads/test_register.py index 69151e5..20aa48d 100644 --- a/kmip/tests/unit/core/messages/payloads/test_register.py +++ b/kmip/tests/unit/core/messages/payloads/test_register.py @@ -142,6 +142,31 @@ class TestRegisterRequestPayload(testtools.TestCase): b'\x00\x00' ) + # Encoding obtained from the KMIP 1.1 testing document, Section 13.2.2. + # Modified to exclude the Link attribute. Manually converted into the + # KMIP 2.0 format. + # + # TODO (ph) Add the Link attribute back in once Links are supported. + # + # This encoding matches the following set of values: + # Request Payload + # Object Type - Certificate + # Attributes + # Cryptographic Usage Mask - Sign | Verify + # Certificate + # Certificate Type - X.509 + # Certificate Value - See comment for the full encoding. + self.full_encoding_with_attributes = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x03\x60' + b'\x42\x00\x57\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x01\x25\x01\x00\x00\x00\x10' + b'\x42\x00\x2C\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\x13\x01\x00\x00\x03\x30' + b'\x42\x00\x1D\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x1E\x08\x00\x00\x03\x16' + self.certificate_value + + b'\x00\x00' + ) + # Encoding obtained from the KMIP 1.1 testing document, Section 13.2.2. # Modified to exclude the Link attribute. # @@ -326,6 +351,48 @@ class TestRegisterRequestPayload(testtools.TestCase): payload.managed_object ) + def test_read_kmip_2_0(self): + """ + Test that a Register request payload can be read from a data stream + encoded with the KMIP 2.0 format. + """ + payload = payloads.RegisterRequestPayload() + + self.assertIsNone(payload.object_type) + self.assertIsNone(payload.template_attribute) + self.assertIsNone(payload.managed_object) + + payload.read( + self.full_encoding_with_attributes, + kmip_version=enums.KMIPVersion.KMIP_2_0 + ) + + self.assertEqual(enums.ObjectType.CERTIFICATE, payload.object_type) + self.assertEqual( + objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Cryptographic Usage Mask" + ), + attribute_value=primitives.Integer( + enums.CryptographicUsageMask.SIGN.value | + enums.CryptographicUsageMask.VERIFY.value, + tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK + ) + ) + ] + ), + payload.template_attribute + ) + self.assertEqual( + secrets.Certificate( + certificate_type=enums.CertificateType.X_509, + certificate_value=self.certificate_value + ), + payload.managed_object + ) + def test_read_missing_object_type(self): """ Test that an InvalidKmipEncoding error is raised during the decoding @@ -360,6 +427,25 @@ class TestRegisterRequestPayload(testtools.TestCase): *args ) + def test_read_missing_attributes(self): + """ + Test that an InvalidKmipEncoding error is raised during the decoding + of a Register request payload when the attributes structure is missing + from the encoding. + """ + payload = payloads.RegisterRequestPayload() + + args = (self.no_template_attribute_encoding, ) + kwargs = {"kmip_version": enums.KMIPVersion.KMIP_2_0} + self.assertRaisesRegex( + exceptions.InvalidKmipEncoding, + "The Register request payload encoding is missing the attributes " + "structure.", + payload.read, + *args, + **kwargs + ) + def test_read_missing_managed_object(self): """ Test that an InvalidKmipEncoding error is raised during the decoding @@ -409,6 +495,39 @@ class TestRegisterRequestPayload(testtools.TestCase): self.assertEqual(len(self.full_encoding), len(stream)) self.assertEqual(str(self.full_encoding), str(stream)) + def test_write_kmip_2_0(self): + """ + Test that a Register request payload can be written to a data stream + encoded with the KMIP 2.0 format. + """ + payload = payloads.RegisterRequestPayload( + object_type=enums.ObjectType.CERTIFICATE, + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "Cryptographic Usage Mask" + ), + attribute_value=primitives.Integer( + enums.CryptographicUsageMask.SIGN.value | + enums.CryptographicUsageMask.VERIFY.value, + tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK + ) + ) + ] + ), + managed_object=secrets.Certificate( + certificate_type=enums.CertificateType.X_509, + certificate_value=self.certificate_value + ) + ) + + stream = utils.BytearrayStream() + payload.write(stream, kmip_version=enums.KMIPVersion.KMIP_2_0) + + self.assertEqual(len(self.full_encoding_with_attributes), len(stream)) + self.assertEqual(str(self.full_encoding_with_attributes), str(stream)) + def test_write_missing_object_type(self): """ Test that an InvalidField error is raised during the encoding of a @@ -466,6 +585,31 @@ class TestRegisterRequestPayload(testtools.TestCase): *args ) + def test_write_missing_attributes(self): + """ + Test that an InvalidField error is raised during the encoding of a + Register request payload when the payload is missing the attributes + structure. + """ + payload = payloads.RegisterRequestPayload( + object_type=enums.ObjectType.CERTIFICATE, + managed_object=secrets.Certificate( + certificate_type=enums.CertificateType.X_509, + certificate_value=self.certificate_value + ) + ) + + args = (utils.BytearrayStream(), ) + kwargs = {"kmip_version": enums.KMIPVersion.KMIP_2_0} + self.assertRaisesRegex( + exceptions.InvalidField, + "The Register request payload is missing the template attribute " + "field.", + payload.write, + *args, + **kwargs + ) + def test_write_missing_managed_object(self): """ Test that an InvalidField error is raised during the encoding of a @@ -1020,6 +1164,27 @@ class TestRegisterResponsePayload(testtools.TestCase): payload.template_attribute ) + def test_read_kmip_2_0(self): + """ + Test that a Register response payload can be read from a data stream + encoded with the KMIP 2.0 format. + """ + payload = payloads.RegisterResponsePayload() + + self.assertIsNone(payload.unique_identifier) + self.assertIsNone(payload.template_attribute) + + payload.read( + self.no_template_attribute_encoding, + kmip_version=enums.KMIPVersion.KMIP_2_0 + ) + + self.assertEqual( + "7091d0bf-548a-4d4a-93a6-6dd71cf75221", + payload.unique_identifier + ) + self.assertIsNone(payload.template_attribute) + def test_read_missing_unique_identifier(self): """ Test that an InvalidKmipEncoding error is raised during the decoding @@ -1086,6 +1251,35 @@ class TestRegisterResponsePayload(testtools.TestCase): self.assertEqual(len(self.full_encoding), len(stream)) self.assertEqual(str(self.full_encoding), str(stream)) + def test_write_kmip_2_0(self): + """ + Test that a Register response payload can be written to a data stream + encoded with the KMIP 2.0 format. + """ + payload = payloads.RegisterResponsePayload( + unique_identifier="7091d0bf-548a-4d4a-93a6-6dd71cf75221", + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + "State" + ), + attribute_value=primitives.Enumeration( + enums.State, + value=enums.State.PRE_ACTIVE, + tag=enums.Tags.STATE + ) + ) + ] + ) + ) + + stream = utils.BytearrayStream() + payload.write(stream, kmip_version=enums.KMIPVersion.KMIP_2_0) + + self.assertEqual(len(self.no_template_attribute_encoding), len(stream)) + self.assertEqual(str(self.no_template_attribute_encoding), str(stream)) + def test_write_missing_unique_identifier(self): """ Test that an InvalidField error is raised during the encoding of a