From 4a9690165af9e8b472373d870cc6d9c774ad9364 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Thu, 9 May 2019 10:28:36 -0400 Subject: [PATCH] Update the RequestBatchItem to support the ephemeral field This change updates the RequestBatchItem to support the new ephemeral field added in KMIP 2.0. Unit tests have been added to cover the change. --- kmip/core/messages/messages.py | 34 +++++- .../tests/unit/core/messages/test_messages.py | 103 ++++++++++++++++++ 2 files changed, 136 insertions(+), 1 deletion(-) diff --git a/kmip/core/messages/messages.py b/kmip/core/messages/messages.py index a7cf9f2..fd4e811 100644 --- a/kmip/core/messages/messages.py +++ b/kmip/core/messages/messages.py @@ -23,6 +23,7 @@ from kmip.core.messages.contents import BatchErrorContinuationOption from kmip.core.factories.payloads.request import RequestPayloadFactory from kmip.core.factories.payloads.response import ResponsePayloadFactory +from kmip.core import primitives from kmip.core.primitives import Struct from kmip.core.utils import BytearrayStream @@ -199,7 +200,8 @@ class RequestBatchItem(Struct): operation=None, unique_batch_item_id=None, request_payload=None, - message_extension=None): + message_extension=None, + ephemeral=None): super(RequestBatchItem, self).__init__(tag=Tags.REQUEST_BATCH_ITEM) self.payload_factory = RequestPayloadFactory() @@ -208,6 +210,26 @@ class RequestBatchItem(Struct): self.unique_batch_item_id = unique_batch_item_id self.request_payload = request_payload self.message_extension = message_extension + self.ephemeral = ephemeral + + @property + def ephemeral(self): + if self._ephemeral: + return self._ephemeral.value + return None + + @ephemeral.setter + def ephemeral(self, value): + if value is None: + self._ephemeral = None + elif isinstance(value, bool): + ephemeral = primitives.Boolean( + value=value, + tag=enums.Tags.EPHEMERAL + ) + self._ephemeral = ephemeral + else: + raise TypeError("The ephemeral value must be a boolean.") def read(self, istream, kmip_version=enums.KMIPVersion.KMIP_1_0): super(RequestBatchItem, self).read( @@ -220,6 +242,12 @@ class RequestBatchItem(Struct): self.operation = contents.Operation() self.operation.read(tstream, kmip_version=kmip_version) + if kmip_version >= enums.KMIPVersion.KMIP_2_0: + if self.is_tag_next(enums.Tags.EPHEMERAL, tstream): + ephemeral = primitives.Boolean(tag=enums.Tags.EPHEMERAL) + ephemeral.read(tstream, kmip_version=kmip_version) + self._ephemeral = ephemeral + # Read the unique batch item ID if it is present if self.is_tag_next(Tags.UNIQUE_BATCH_ITEM_ID, tstream): self.unique_batch_item_id = contents.UniqueBatchItemID() @@ -244,6 +272,10 @@ class RequestBatchItem(Struct): # Write the contents of the batch item to the stream self.operation.write(tstream, kmip_version=kmip_version) + if kmip_version >= enums.KMIPVersion.KMIP_2_0: + if self._ephemeral: + self._ephemeral.write(tstream, kmip_version=kmip_version) + if self.unique_batch_item_id is not None: self.unique_batch_item_id.write(tstream, kmip_version=kmip_version) diff --git a/kmip/tests/unit/core/messages/test_messages.py b/kmip/tests/unit/core/messages/test_messages.py index 1ab7212..ad25de0 100644 --- a/kmip/tests/unit/core/messages/test_messages.py +++ b/kmip/tests/unit/core/messages/test_messages.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import testtools from testtools import TestCase import binascii import six @@ -53,6 +54,108 @@ from kmip.core import utils from kmip.core.utils import BytearrayStream +class TestRequestBatchItem(testtools.TestCase): + + def setUp(self): + super(TestRequestBatchItem, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, + # Section 3.1.1. + # + # This encoding matches the following set of values: + # Request Batch Item + # Operation - Destroy + # Request Payload + # Unique Identifier - fb4b5b9c-6188-4c63-8142-fe9c328129fc + self.encoding_kmip_2_0 = utils.BytearrayStream( + b'\x42\x00\x0F\x01\x00\x00\x00\x58' + b'\x42\x00\x5C\x05\x00\x00\x00\x04\x00\x00\x00\x14\x00\x00\x00\x00' + b'\x42\x01\x54\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01' + b'\x42\x00\x79\x01\x00\x00\x00\x30' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x66\x62\x34\x62\x35\x62\x39\x63\x2D\x36\x31\x38\x38\x2D\x34\x63' + b'\x36\x33\x2D\x38\x31\x34\x32\x2D\x66\x65\x39\x63\x33\x32\x38\x31' + b'\x32\x39\x66\x63\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestRequestBatchItem, self).tearDown() + + def test_invalid_ephemeral(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the ephemeral value of a RequestBatchItem. + """ + kwargs = {"ephemeral": "invalid"} + self.assertRaisesRegex( + TypeError, + "The ephemeral value must be a boolean.", + messages.RequestBatchItem, + **kwargs + ) + + args = ( + messages.RequestBatchItem(), + "ephemeral", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "The ephemeral value must be a boolean.", + setattr, + *args + ) + + def test_read_kmip_2_0(self): + """ + Test that a RequestBatchItem structure can be correctly read in + from a data stream when including KMIP 2.0 fields. + """ + request_batch_item = messages.RequestBatchItem() + + self.assertIsNone(request_batch_item.operation) + self.assertIsNone(request_batch_item.ephemeral) + + request_batch_item.read( + self.encoding_kmip_2_0, + kmip_version=enums.KMIPVersion.KMIP_2_0 + ) + + self.assertEqual( + enums.Operation.DESTROY, + request_batch_item.operation.value + ) + self.assertTrue(request_batch_item.ephemeral) + # self.assertEqual( + # payloads.DestroyRequestPayload( + # unique_identifier=attr.UniqueIdentifier( + # value="fb4b5b9c-6188-4c63-8142-fe9c328129fc" + # ) + # ), + # request_batch_item.request_payload + # ) + + def test_write_kmip_2_0(self): + request_batch_item = messages.RequestBatchItem( + operation=contents.Operation(enums.Operation.DESTROY), + ephemeral=True, + request_payload=payloads.DestroyRequestPayload( + unique_identifier=attr.UniqueIdentifier( + "fb4b5b9c-6188-4c63-8142-fe9c328129fc" + ) + ) + ) + + buffer = utils.BytearrayStream() + request_batch_item.write( + buffer, + kmip_version=enums.KMIPVersion.KMIP_2_0 + ) + + self.assertEqual(len(self.encoding_kmip_2_0), len(buffer)) + self.assertEqual(str(self.encoding_kmip_2_0), str(buffer)) + + class TestRequestMessage(TestCase): def setUp(self):