diff --git a/kmip/core/messages/messages.py b/kmip/core/messages/messages.py index a7cf9f2..fd4e811 100644 --- a/kmip/core/messages/messages.py +++ b/kmip/core/messages/messages.py @@ -23,6 +23,7 @@ from kmip.core.messages.contents import BatchErrorContinuationOption from kmip.core.factories.payloads.request import RequestPayloadFactory from kmip.core.factories.payloads.response import ResponsePayloadFactory +from kmip.core import primitives from kmip.core.primitives import Struct from kmip.core.utils import BytearrayStream @@ -199,7 +200,8 @@ class RequestBatchItem(Struct): operation=None, unique_batch_item_id=None, request_payload=None, - message_extension=None): + message_extension=None, + ephemeral=None): super(RequestBatchItem, self).__init__(tag=Tags.REQUEST_BATCH_ITEM) self.payload_factory = RequestPayloadFactory() @@ -208,6 +210,26 @@ class RequestBatchItem(Struct): self.unique_batch_item_id = unique_batch_item_id self.request_payload = request_payload self.message_extension = message_extension + self.ephemeral = ephemeral + + @property + def ephemeral(self): + if self._ephemeral: + return self._ephemeral.value + return None + + @ephemeral.setter + def ephemeral(self, value): + if value is None: + self._ephemeral = None + elif isinstance(value, bool): + ephemeral = primitives.Boolean( + value=value, + tag=enums.Tags.EPHEMERAL + ) + self._ephemeral = ephemeral + else: + raise TypeError("The ephemeral value must be a boolean.") def read(self, istream, kmip_version=enums.KMIPVersion.KMIP_1_0): super(RequestBatchItem, self).read( @@ -220,6 +242,12 @@ class RequestBatchItem(Struct): self.operation = contents.Operation() self.operation.read(tstream, kmip_version=kmip_version) + if kmip_version >= enums.KMIPVersion.KMIP_2_0: + if self.is_tag_next(enums.Tags.EPHEMERAL, tstream): + ephemeral = primitives.Boolean(tag=enums.Tags.EPHEMERAL) + ephemeral.read(tstream, kmip_version=kmip_version) + self._ephemeral = ephemeral + # Read the unique batch item ID if it is present if self.is_tag_next(Tags.UNIQUE_BATCH_ITEM_ID, tstream): self.unique_batch_item_id = contents.UniqueBatchItemID() @@ -244,6 +272,10 @@ class RequestBatchItem(Struct): # Write the contents of the batch item to the stream self.operation.write(tstream, kmip_version=kmip_version) + if kmip_version >= enums.KMIPVersion.KMIP_2_0: + if self._ephemeral: + self._ephemeral.write(tstream, kmip_version=kmip_version) + if self.unique_batch_item_id is not None: self.unique_batch_item_id.write(tstream, kmip_version=kmip_version) diff --git a/kmip/tests/unit/core/messages/test_messages.py b/kmip/tests/unit/core/messages/test_messages.py index 1ab7212..ad25de0 100644 --- a/kmip/tests/unit/core/messages/test_messages.py +++ b/kmip/tests/unit/core/messages/test_messages.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import testtools from testtools import TestCase import binascii import six @@ -53,6 +54,108 @@ from kmip.core import utils from kmip.core.utils import BytearrayStream +class TestRequestBatchItem(testtools.TestCase): + + def setUp(self): + super(TestRequestBatchItem, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, + # Section 3.1.1. + # + # This encoding matches the following set of values: + # Request Batch Item + # Operation - Destroy + # Request Payload + # Unique Identifier - fb4b5b9c-6188-4c63-8142-fe9c328129fc + self.encoding_kmip_2_0 = utils.BytearrayStream( + b'\x42\x00\x0F\x01\x00\x00\x00\x58' + b'\x42\x00\x5C\x05\x00\x00\x00\x04\x00\x00\x00\x14\x00\x00\x00\x00' + b'\x42\x01\x54\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01' + b'\x42\x00\x79\x01\x00\x00\x00\x30' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x66\x62\x34\x62\x35\x62\x39\x63\x2D\x36\x31\x38\x38\x2D\x34\x63' + b'\x36\x33\x2D\x38\x31\x34\x32\x2D\x66\x65\x39\x63\x33\x32\x38\x31' + b'\x32\x39\x66\x63\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestRequestBatchItem, self).tearDown() + + def test_invalid_ephemeral(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the ephemeral value of a RequestBatchItem. + """ + kwargs = {"ephemeral": "invalid"} + self.assertRaisesRegex( + TypeError, + "The ephemeral value must be a boolean.", + messages.RequestBatchItem, + **kwargs + ) + + args = ( + messages.RequestBatchItem(), + "ephemeral", + "invalid" + ) + self.assertRaisesRegex( + TypeError, + "The ephemeral value must be a boolean.", + setattr, + *args + ) + + def test_read_kmip_2_0(self): + """ + Test that a RequestBatchItem structure can be correctly read in + from a data stream when including KMIP 2.0 fields. + """ + request_batch_item = messages.RequestBatchItem() + + self.assertIsNone(request_batch_item.operation) + self.assertIsNone(request_batch_item.ephemeral) + + request_batch_item.read( + self.encoding_kmip_2_0, + kmip_version=enums.KMIPVersion.KMIP_2_0 + ) + + self.assertEqual( + enums.Operation.DESTROY, + request_batch_item.operation.value + ) + self.assertTrue(request_batch_item.ephemeral) + # self.assertEqual( + # payloads.DestroyRequestPayload( + # unique_identifier=attr.UniqueIdentifier( + # value="fb4b5b9c-6188-4c63-8142-fe9c328129fc" + # ) + # ), + # request_batch_item.request_payload + # ) + + def test_write_kmip_2_0(self): + request_batch_item = messages.RequestBatchItem( + operation=contents.Operation(enums.Operation.DESTROY), + ephemeral=True, + request_payload=payloads.DestroyRequestPayload( + unique_identifier=attr.UniqueIdentifier( + "fb4b5b9c-6188-4c63-8142-fe9c328129fc" + ) + ) + ) + + buffer = utils.BytearrayStream() + request_batch_item.write( + buffer, + kmip_version=enums.KMIPVersion.KMIP_2_0 + ) + + self.assertEqual(len(self.encoding_kmip_2_0), len(buffer)) + self.assertEqual(str(self.encoding_kmip_2_0), str(buffer)) + + class TestRequestMessage(TestCase): def setUp(self):