diff --git a/kmip/core/enums.py b/kmip/core/enums.py index 16b2c5b..deffd6e 100644 --- a/kmip/core/enums.py +++ b/kmip/core/enums.py @@ -16,6 +16,7 @@ # In case of new content, remove the following line to enable flake8 tests # flake8: noqa +import copy import enum @@ -473,11 +474,12 @@ class KeyWrapType(enum.Enum): class KMIPVersion(enum.Enum): - KMIP_1_0 = "KMIP 1.0" - KMIP_1_1 = "KMIP 1.1" - KMIP_1_2 = "KMIP 1.2" - KMIP_1_3 = "KMIP 1.3" - KMIP_1_4 = "KMIP 1.4" + KMIP_1_0 = 1.0 + KMIP_1_1 = 1.1 + KMIP_1_2 = 1.2 + KMIP_1_3 = 1.3 + KMIP_1_4 = 1.4 + KMIP_2_0 = 2.0 class LinkType(enum.Enum): @@ -1304,7 +1306,7 @@ class Tags(enum.Enum): ISSUER_DISTINGUISHED_NAME = 0x4200B2 SUBJECT_ALTERNATIVE_NAME = 0x4200B3 SUBJECT_DISTINGUISHED_NAME = 0x4200B4 - X_509_CERTIFICATE_IDENTIFER = 0x4200B5 + X_509_CERTIFICATE_IDENTIFIER = 0x4200B5 X_509_CERTIFICATE_ISSUER = 0x4200B6 X_509_CERTIFICATE_SUBJECT = 0x4200B7 KEY_VALUE_LOCATION = 0x4200B8 @@ -1558,3 +1560,168 @@ class WrappingMethod(enum.Enum): ENCRYPT_THEN_MAC_SIGN = 0x00000003 MAC_SIGN_THEN_ENCRYPT = 0x00000004 TR_31 = 0x00000005 + + +def is_enum_value(enum_class, potential_value): + """ + A utility function that checks if the enumeration class contains the + provided value. + + Args: + enum_class (class): One of the enumeration classes found in this file. + potential_value (int, string): A potential value of the enumeration + class. + + Returns: + True: if the potential value is a valid value of the enumeration class + False: otherwise + """ + try: + enum_class(potential_value) + except ValueError: + return False + + return True + + +def is_attribute(tag, kmip_version=None): + """ + A utility function that checks if the tag is a valid attribute tag. + + Args: + tag (enum): A Tags enumeration that may or may not correspond to a + KMIP attribute type. + kmip_version (enum): The KMIPVersion enumeration that should be used + when checking if the tag is a valid attribute tag. Optional, + defaults to None. If None, the tag is compared with all possible + attribute tags across all KMIP versions. Otherwise, only the + attribute tags for a specific KMIP version are checked. + + Returns: + True: if the tag is a valid attribute tag + False: otherwise + """ + kmip_1_0_attribute_tags = [ + Tags.UNIQUE_IDENTIFIER, + Tags.NAME, + Tags.OBJECT_TYPE, + Tags.CRYPTOGRAPHIC_ALGORITHM, + Tags.CRYPTOGRAPHIC_LENGTH, + Tags.CRYPTOGRAPHIC_PARAMETERS, + Tags.CRYPTOGRAPHIC_DOMAIN_PARAMETERS, + Tags.CERTIFICATE_TYPE, + Tags.CERTIFICATE_IDENTIFIER, + Tags.CERTIFICATE_SUBJECT, + Tags.CERTIFICATE_ISSUER, + Tags.DIGEST, + Tags.OPERATION_POLICY_NAME, + Tags.CRYPTOGRAPHIC_USAGE_MASK, + Tags.LEASE_TIME, + Tags.USAGE_LIMITS, + Tags.STATE, + Tags.INITIAL_DATE, + Tags.ACTIVATION_DATE, + Tags.PROCESS_START_DATE, + Tags.PROTECT_STOP_DATE, + Tags.DEACTIVATION_DATE, + Tags.DESTROY_DATE, + Tags.COMPROMISE_OCCURRENCE_DATE, + Tags.COMPROMISE_DATE, + Tags.REVOCATION_REASON, + Tags.ARCHIVE_DATE, + Tags.OBJECT_GROUP, + Tags.LINK, + Tags.APPLICATION_SPECIFIC_INFORMATION, + Tags.CONTACT_INFORMATION, + Tags.LAST_CHANGE_DATE, + Tags.CUSTOM_ATTRIBUTE + ] + kmip_1_1_attribute_tags = copy.deepcopy(kmip_1_0_attribute_tags) + [ + Tags.CERTIFICATE_LENGTH, + Tags.X_509_CERTIFICATE_IDENTIFIER, + Tags.X_509_CERTIFICATE_SUBJECT, + Tags.X_509_CERTIFICATE_ISSUER, + Tags.DIGITAL_SIGNATURE_ALGORITHM, + Tags.FRESH + ] + kmip_1_2_attribute_tags = copy.deepcopy(kmip_1_1_attribute_tags) + [ + Tags.ALTERNATIVE_NAME, + Tags.KEY_VALUE_PRESENT, + Tags.KEY_VALUE_LOCATION, + Tags.ORIGINAL_CREATION_DATE + ] + kmip_1_3_attribute_tags = copy.deepcopy(kmip_1_2_attribute_tags) + [ + Tags.RANDOM_NUMBER_GENERATOR + ] + kmip_1_4_attribute_tags = copy.deepcopy(kmip_1_3_attribute_tags) + [ + Tags.PKCS12_FRIENDLY_NAME, + Tags.DESCRIPTION, + Tags.COMMENT, + Tags.SENSITIVE, + Tags.ALWAYS_SENSITIVE, + Tags.EXTRACTABLE, + Tags.NEVER_EXTRACTABLE + ] + kmip_2_0_attribute_tags = copy.deepcopy(kmip_1_4_attribute_tags) + [ + Tags.CERTIFICATE_SUBJECT_CN, + Tags.CERTIFICATE_SUBJECT_O, + Tags.CERTIFICATE_SUBJECT_OU, + Tags.CERTIFICATE_SUBJECT_EMAIL, + Tags.CERTIFICATE_SUBJECT_C, + Tags.CERTIFICATE_SUBJECT_ST, + Tags.CERTIFICATE_SUBJECT_L, + Tags.CERTIFICATE_SUBJECT_UID, + Tags.CERTIFICATE_SUBJECT_SERIAL_NUMBER, + Tags.CERTIFICATE_SUBJECT_TITLE, + Tags.CERTIFICATE_SUBJECT_DC, + Tags.CERTIFICATE_SUBJECT_DN_QUALIFIER, + Tags.CERTIFICATE_ISSUER_CN, + Tags.CERTIFICATE_ISSUER_O, + Tags.CERTIFICATE_ISSUER_OU, + Tags.CERTIFICATE_ISSUER_EMAIL, + Tags.CERTIFICATE_ISSUER_C, + Tags.CERTIFICATE_ISSUER_ST, + Tags.CERTIFICATE_ISSUER_L, + Tags.CERTIFICATE_ISSUER_UID, + Tags.CERTIFICATE_ISSUER_SERIAL_NUMBER, + Tags.CERTIFICATE_ISSUER_TITLE, + Tags.CERTIFICATE_ISSUER_DC, + Tags.CERTIFICATE_ISSUER_DN_QUALIFIER, + Tags.KEY_FORMAT_TYPE, + Tags.NIST_KEY_TYPE, + Tags.OPAQUE_DATA_TYPE, + Tags.PROTECTION_LEVEL, + Tags.PROTECTION_PERIOD, + Tags.PROTECTION_STORAGE_MASK, + Tags.QUANTUM_SAFE, + Tags.SHORT_UNIQUE_IDENTIFIER, + Tags.ATTRIBUTE + ] + kmip_2_0_attribute_tags.remove(Tags.CERTIFICATE_IDENTIFIER) + kmip_2_0_attribute_tags.remove(Tags.CERTIFICATE_SUBJECT) + kmip_2_0_attribute_tags.remove(Tags.CERTIFICATE_ISSUER) + kmip_2_0_attribute_tags.remove(Tags.OPERATION_POLICY_NAME) + kmip_2_0_attribute_tags.remove(Tags.CUSTOM_ATTRIBUTE) + + if kmip_version == KMIPVersion.KMIP_1_0: + return tag in kmip_1_0_attribute_tags + elif kmip_version == KMIPVersion.KMIP_1_1: + return tag in kmip_1_1_attribute_tags + elif kmip_version == KMIPVersion.KMIP_1_2: + return tag in kmip_1_2_attribute_tags + elif kmip_version == KMIPVersion.KMIP_1_3: + return tag in kmip_1_3_attribute_tags + elif kmip_version == KMIPVersion.KMIP_1_4: + return tag in kmip_1_4_attribute_tags + elif kmip_version == KMIPVersion.KMIP_2_0: + return tag in kmip_2_0_attribute_tags + else: + all_attribute_tags = set( + kmip_1_0_attribute_tags + + kmip_1_1_attribute_tags + + kmip_1_2_attribute_tags + + kmip_1_3_attribute_tags + + kmip_1_4_attribute_tags + + kmip_2_0_attribute_tags + ) + return tag in all_attribute_tags diff --git a/kmip/core/exceptions.py b/kmip/core/exceptions.py index dbc1d8b..9c1e952 100644 --- a/kmip/core/exceptions.py +++ b/kmip/core/exceptions.py @@ -264,6 +264,13 @@ class PermissionDenied(KmipError): ) +class AttributeNotSupported(Exception): + """ + An error generated when an unsupported attribute is processed. + """ + pass + + class ConfigurationError(Exception): """ An error generated when a problem occurs with a client or server diff --git a/kmip/core/factories/attribute_values.py b/kmip/core/factories/attribute_values.py index eb149b4..59bb278 100644 --- a/kmip/core/factories/attribute_values.py +++ b/kmip/core/factories/attribute_values.py @@ -110,6 +110,90 @@ class AttributeValueFactory(object): # Custom attribute indicated return attributes.CustomAttribute(value) + def create_attribute_value_by_enum(self, enum, value): + # Switch on the name of the attribute + if enum is enums.Tags.UNIQUE_IDENTIFIER: + return attributes.UniqueIdentifier(value) + elif enum is enums.Tags.NAME: + return self._create_name(value) + elif enum is enums.Tags.OBJECT_TYPE: + return attributes.ObjectType(value) + elif enum is enums.Tags.CRYPTOGRAPHIC_ALGORITHM: + return attributes.CryptographicAlgorithm(value) + elif enum is enums.Tags.CRYPTOGRAPHIC_LENGTH: + return self._create_cryptographic_length(value) + elif enum is enums.Tags.CRYPTOGRAPHIC_PARAMETERS: + return self._create_cryptographic_parameters(value) + elif enum is enums.Tags.CRYPTOGRAPHIC_DOMAIN_PARAMETERS: + raise NotImplementedError() + elif enum is enums.Tags.CERTIFICATE_TYPE: + raise NotImplementedError() + elif enum is enums.Tags.CERTIFICATE_LENGTH: + return primitives.Integer(value, enums.Tags.CERTIFICATE_LENGTH) + elif enum is enums.Tags.X_509_CERTIFICATE_IDENTIFIER: + raise NotImplementedError() + elif enum is enums.Tags.X_509_CERTIFICATE_SUBJECT: + raise NotImplementedError() + elif enum is enums.Tags.X_509_CERTIFICATE_ISSUER: + raise NotImplementedError() + elif enum is enums.Tags.CERTIFICATE_IDENTIFIER: + raise NotImplementedError() + elif enum is enums.Tags.CERTIFICATE_SUBJECT: + raise NotImplementedError() + elif enum is enums.Tags.CERTIFICATE_ISSUER: + raise NotImplementedError() + elif enum is enums.Tags.DIGITAL_SIGNATURE_ALGORITHM: + raise NotImplementedError() + elif enum is enums.Tags.DIGEST: + return attributes.Digest() + elif enum is enums.Tags.OPERATION_POLICY_NAME: + return attributes.OperationPolicyName(value) + elif enum is enums.Tags.CRYPTOGRAPHIC_USAGE_MASK: + return self._create_cryptographic_usage_mask(value) + elif enum is enums.Tags.LEASE_TIME: + return primitives.Interval(value, enums.Tags.LEASE_TIME) + elif enum is enums.Tags.USAGE_LIMITS: + raise NotImplementedError() + elif enum is enums.Tags.STATE: + return attributes.State(value) + elif enum is enums.Tags.INITIAL_DATE: + return primitives.DateTime(value, enums.Tags.INITIAL_DATE) + elif enum is enums.Tags.ACTIVATION_DATE: + return primitives.DateTime(value, enums.Tags.ACTIVATION_DATE) + elif enum is enums.Tags.PROCESS_START_DATE: + return primitives.DateTime(value, enums.Tags.PROCESS_START_DATE) + elif enum is enums.Tags.PROTECT_STOP_DATE: + return primitives.DateTime(value, enums.Tags.PROTECT_STOP_DATE) + elif enum is enums.Tags.DEACTIVATION_DATE: + return primitives.DateTime(value, enums.Tags.DEACTIVATION_DATE) + elif enum is enums.Tags.DESTROY_DATE: + return primitives.DateTime(value, enums.Tags.DESTROY_DATE) + elif enum is enums.Tags.COMPROMISE_OCCURRENCE_DATE: + return primitives.DateTime( + value, enums.Tags.COMPROMISE_OCCURRENCE_DATE) + elif enum is enums.Tags.COMPROMISE_DATE: + return primitives.DateTime(value, enums.Tags.COMPROMISE_DATE) + elif enum is enums.Tags.REVOCATION_REASON: + raise NotImplementedError() + elif enum is enums.Tags.ARCHIVE_DATE: + return primitives.DateTime(value, enums.Tags.ARCHIVE_DATE) + elif enum is enums.Tags.OBJECT_GROUP: + return self._create_object_group(value) + elif enum is enums.Tags.FRESH: + return primitives.Boolean(value, enums.Tags.FRESH) + elif enum is enums.Tags.LINK: + raise NotImplementedError() + elif enum is enums.Tags.APPLICATION_SPECIFIC_INFORMATION: + return self._create_application_specific_information(value) + elif enum is enums.Tags.CONTACT_INFORMATION: + return self._create_contact_information(value) + elif enum is enums.Tags.LAST_CHANGE_DATE: + return primitives.DateTime(value, enums.Tags.LAST_CHANGE_DATE) + elif enum is enums.Tags.CUSTOM_ATTRIBUTE: + return attributes.CustomAttribute(value) + else: + raise ValueError("Unrecognized attribute type: {}".format(enum)) + def _create_name(self, name): if name is not None: if isinstance(name, attributes.Name): diff --git a/kmip/core/objects.py b/kmip/core/objects.py index e33848c..718499d 100644 --- a/kmip/core/objects.py +++ b/kmip/core/objects.py @@ -16,6 +16,7 @@ import abc import six from six.moves import xrange +import struct from kmip.core import attributes from kmip.core.attributes import CryptographicParameters @@ -174,6 +175,183 @@ class Attribute(Struct): return NotImplemented +class Attributes(primitives.Struct): + """ + A collection of KMIP attributes. + + This is intended for use with KMIP 2.0+ and replaces the old + TemplateAttribute-style used for older KMIP versions. + + Attributes: + attributes: A list of attribute objects. + tag: A Tags enumeration specifying what type of Attributes structure + is in use. Valid values include: + * Tags.ATTRIBUTES + * Tags.COMMON_ATTRIBUTES + * Tags.PRIVATE_KEY_ATTRIBUTES + * Tags.PUBLIC_KEY_ATTRIBUTES + """ + + def __init__(self, attributes=None, tag=enums.Tags.ATTRIBUTES): + """ + Construct an Attributes structure. + + Args: + attributes (list): A list of attribute objects. Each object must + be some form of primitive, derived from Base. Optional, + defaults to None which is interpreted as an empty list. + tag (enum): A Tags enumeration specifying what type of Attributes + structure is in use. Valid values include: + * Tags.ATTRIBUTES + * Tags.COMMON_ATTRIBUTES + * Tags.PRIVATE_KEY_ATTRIBUTES + * Tags.PUBLIC_KEY_ATTRIBUTES + Optional, defaults to Tags.ATTRIBUTES. + """ + super(Attributes, self).__init__(tag=tag) + + self._factory = AttributeValueFactory() + + self._attributes = [] + self.attributes = attributes + + @property + def attributes(self): + return self._attributes + + @attributes.setter + def attributes(self, value): + if (value is None) or (value == []): + self._attributes = [] + elif isinstance(value, list): + for i, attribute in enumerate(value): + if isinstance(attribute, primitives.Base): + if not enums.is_attribute(attribute.tag): + raise TypeError( + "Item {} must be a supported attribute.".format( + i + 1 + ) + ) + else: + raise TypeError( + "Item {} must be a Base object, not a {}.".format( + i + 1, + type(attribute) + ) + ) + self._attributes = value + else: + raise TypeError("Attributes must be a list of Base objects.") + + def read(self, input_stream, kmip_version=enums.KMIPVersion.KMIP_2_0): + """ + Read the data stream and decode the Attributes structure into its + parts. + + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method. + kmip_version (enum): A KMIPVersion enumeration defining the KMIP + version with which the object will be decoded. Optional, + defaults to KMIP 2.0. + + Raises: + AttributeNotSupported: Raised if an unsupported attribute is + encountered while decoding. + """ + super(Attributes, self).read(input_stream, kmip_version=kmip_version) + local_stream = BytearrayStream(input_stream.read(self.length)) + + while True: + if len(local_stream) < 3: + break + tag = struct.unpack('!I', b'\x00' + local_stream.peek(3))[0] + if enums.is_enum_value(enums.Tags, tag): + tag = enums.Tags(tag) + if not enums.is_attribute(tag, kmip_version=kmip_version): + raise exceptions.AttributeNotSupported( + "Attribute {} is not supported by KMIP {}.".format( + tag.name, + kmip_version.value + ) + ) + value = self._factory.create_attribute_value_by_enum(tag, None) + value.read(local_stream, kmip_version=kmip_version) + self._attributes.append(value) + else: + break + + self.is_oversized(local_stream) + + def write(self, output_stream, kmip_version=enums.KMIPVersion.KMIP_2_0): + """ + Write the Attributes structure encoding to the data stream. + + Args: + output_stream (stream): A data stream in which to encode + Attributes structure data, supporting a write method. + kmip_version (enum): A KMIPVersion enumeration defining the KMIP + version with which the object will be encoded. Optional, + defaults to KMIP 2.0. + + Raises: + AttributeNotSupported: Raised if an unsupported attribute is + found in the attribute list while encoding. + """ + local_stream = BytearrayStream() + + for attribute in self._attributes: + tag = attribute.tag + if not enums.is_attribute(tag, kmip_version=kmip_version): + raise exceptions.AttributeNotSupported( + "Attribute {} is not supported by KMIP {}.".format( + tag.name, + kmip_version.value + ) + ) + attribute.write(local_stream, kmip_version=kmip_version) + + self.length = local_stream.length() + super(Attributes, self).write(output_stream, kmip_version=kmip_version) + output_stream.write(local_stream.buffer) + + def __repr__(self): + values = ", ".join([repr(x) for x in self.attributes]) + return "Attributes(attributes=[{}], tag={})".format( + values, + self.tag + ) + + def __str__(self): + values = ", ".join([str(x) for x in self.attributes]) + value = '"attributes": [{}]'.format(values) + return '{' + value + '}' + + def __eq__(self, other): + if not isinstance(other, Attributes): + return NotImplemented + + if len(self.attributes) != len(other.attributes): + return False + + # TODO (ph) Allow order independence? + + for i in six.moves.range(len(self.attributes)): + a = self.attributes[i] + b = other.attributes[i] + + if a != b: + return False + + return True + + def __ne__(self, other): + if isinstance(other, Attributes): + return not (self == other) + else: + return NotImplemented + + class Nonce(primitives.Struct): """ A struct representing a Nonce object. diff --git a/kmip/tests/unit/core/objects/test_objects.py b/kmip/tests/unit/core/objects/test_objects.py index 127597c..e705b34 100644 --- a/kmip/tests/unit/core/objects/test_objects.py +++ b/kmip/tests/unit/core/objects/test_objects.py @@ -26,6 +26,8 @@ from kmip.core.enums import KeyRoleType from kmip.core.enums import PaddingMethod from kmip.core.enums import Tags +from kmip.core import exceptions + from kmip.core.factories.attributes import AttributeValueFactory from kmip.core import objects @@ -35,6 +37,8 @@ from kmip.core.objects import ExtensionTag from kmip.core.objects import ExtensionType from kmip.core.objects import KeyMaterialStruct +from kmip.core import primitives + from kmip.core import utils from kmip.core.utils import BytearrayStream @@ -132,6 +136,609 @@ class TestAttributeClass(TestCase): self.assertTrue(self.attributeObj_a != self.attributeObj_b) +class TestAttributes(TestCase): + + def setUp(self): + super(TestAttributes, self).setUp() + + # This encoding matches the following set of values: + # Attributes + # Cryptographic Algorithm - AES + # Cryptographic Length - 128 + self.full_encoding = utils.BytearrayStream( + b'\x42\x01\x25\x01\x00\x00\x00\x20' + b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\x2A\x02\x00\x00\x00\x04\x00\x00\x00\x80\x00\x00\x00\x00' + ) + + self.empty_encoding = utils.BytearrayStream( + b'\x42\x01\x25\x01\x00\x00\x00\x00' + ) + + # This encoding matches the following set of values: + # Attributes + # Cryptographic Algorithm - AES + # Non-existent Tag + self.invalid_encoding = utils.BytearrayStream( + b'\x42\x01\x25\x01\x00\x00\x00\x20' + b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\xFF\xFF\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + ) + + # This encoding matches the following set of values: + # Attributes + # Operation Policy Name - b4faee10-aa2a-4446-8ad4-0881f3422959 + self.unsupported_encoding = utils.BytearrayStream( + b'\x42\x01\x25\x01\x00\x00\x00\x30' + b'\x42\x00\x5D\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30' + b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D' + b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00' + ) + + # This encoding matches the following set of values: + # Private Key Attributes + # Cryptographic Algorithm - AES + # Cryptographic Length - 128 + self.alt_encoding = utils.BytearrayStream( + b'\x42\x01\x27\x01\x00\x00\x00\x20' + b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\x2A\x02\x00\x00\x00\x04\x00\x00\x00\x80\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestAttributes, self).tearDown() + + def test_unrecognized_attributes(self): + """ + Test that a TypeError is raised when an unrecognized attribute is + included in the attribute list. Note that this unrecognized attribute + is a valid PyKMIP object derived from Base, it just isn't an attribute. + """ + kwargs = { + 'attributes': [ + primitives.Enumeration( + enums.WrappingMethod, + enums.WrappingMethod.ENCRYPT, + enums.Tags.WRAPPING_METHOD + ) + ] + } + self.assertRaisesRegex( + TypeError, + "Item 1 must be a supported attribute.", + objects.Attributes, + **kwargs + ) + + attrs = objects.Attributes() + args = ( + attrs, + 'attributes', + [ + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ), + primitives.Enumeration( + enums.WrappingMethod, + enums.WrappingMethod.ENCRYPT, + enums.Tags.WRAPPING_METHOD + ) + ] + ) + self.assertRaisesRegex( + TypeError, + "Item 2 must be a supported attribute.", + setattr, + *args + ) + + def test_invalid_attributes(self): + """ + Test that a TypeError is raised when an invalid value is included + in the attribute list. Note that the value is not a valid PyKMIP + object derived from Base and therefore cannot be an attribute. + """ + kwargs = { + 'attributes': [0] + } + self.assertRaisesRegex( + TypeError, + "Item 1 must be a Base object, not a {}.".format(type(0)), + objects.Attributes, + **kwargs + ) + + attrs = objects.Attributes() + args = ( + attrs, + 'attributes', + [ + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ), + primitives.Enumeration( + enums.KeyFormatType, + enums.KeyFormatType.RAW, + enums.Tags.KEY_FORMAT_TYPE + ), + 1 + ] + ) + self.assertRaisesRegex( + TypeError, + "Item 3 must be a Base object, not a {}.".format(type(0)), + setattr, + *args + ) + + def test_invalid_attributes_list(self): + """ + Test that a TypeError is raised when an invalid attribute list is + used with the Attributes structure. + """ + kwargs = { + 'attributes': 'invalid' + } + self.assertRaisesRegex( + TypeError, + "Attributes must be a list of Base objects.", + objects.Attributes, + **kwargs + ) + + attrs = objects.Attributes() + args = ( + attrs, + 'attributes', + 'invalid' + ) + self.assertRaisesRegex( + TypeError, + "Attributes must be a list of Base objects.", + setattr, + *args + ) + + def test_read(self): + """ + Test that an Attributes structure can be correctly read in from a data + stream. + """ + attrs = objects.Attributes() + + self.assertEqual([], attrs.attributes) + + attrs.read(self.full_encoding) + + self.assertEqual(2, len(attrs.attributes)) + + attr_1 = attrs.attributes[0] + self.assertIsInstance(attr_1, primitives.Enumeration) + self.assertEqual(enums.CryptographicAlgorithm.AES, attr_1.value) + + attr_2 = attrs.attributes[1] + self.assertIsInstance(attr_2, primitives.Integer) + self.assertEqual(128, attr_2.value) + + def test_read_no_attributes(self): + """ + Test that an empty Attributes structure can be correctly read in from + a data stream. + """ + attrs = objects.Attributes() + + self.assertEqual([], attrs.attributes) + + attrs.read(self.empty_encoding) + + self.assertEqual([], attrs.attributes) + + def test_read_invalid_attribute(self): + """ + Test that an unrecognized tag is correctly handled when reading in an + Attributes structure from a data stream. Specifically, structure + parsing should stop and an error should be raised indicating that more + encoding data is available but could not be parsed. + """ + attrs = objects.Attributes() + + self.assertEqual([], attrs.attributes) + + args = (self.invalid_encoding, ) + self.assertRaisesRegex( + exceptions.StreamNotEmptyError, + "Invalid length used to read Base, bytes remaining: 16", + attrs.read, + *args + ) + + def test_read_unsupported_attribute(self): + """ + Test that an AttributeNotSupported error is raised when an unsupported + attribute is parsed while reading in an Attributes structure from a + data stream. This can occur when an older attribute is no longer + supported by a newer version of KMIP, or vice versa. + """ + attrs = objects.Attributes() + + self.assertEqual([], attrs.attributes) + + args = (self.unsupported_encoding, ) + kwargs = {"kmip_version": enums.KMIPVersion.KMIP_2_0} + self.assertRaisesRegex( + exceptions.AttributeNotSupported, + "Attribute OPERATION_POLICY_NAME is not supported by KMIP 2.0.", + attrs.read, + *args, + **kwargs + ) + + def test_read_alternative_tag(self): + """ + Test that an Attributes structure can be correctly read in from a data + stream with an alternative tag. This can occur if a variant of the + Attributes structure is being used, like the Common Attributes, Public + Key Attributes, or Private Key Attributes structures. + """ + attrs = objects.Attributes(tag=enums.Tags.PRIVATE_KEY_ATTRIBUTES) + + self.assertEqual([], attrs.attributes) + + attrs.read(self.alt_encoding) + + self.assertEqual(2, len(attrs.attributes)) + + attr_1 = attrs.attributes[0] + self.assertIsInstance(attr_1, primitives.Enumeration) + self.assertEqual(enums.CryptographicAlgorithm.AES, attr_1.value) + + attr_2 = attrs.attributes[1] + self.assertIsInstance(attr_2, primitives.Integer) + self.assertEqual(128, attr_2.value) + + def test_write(self): + """ + Test that an Attributes structure can be correctly written to a data + stream. + """ + attrs = objects.Attributes(attributes=[ + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ), + primitives.Integer( + 128, + enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ]) + + stream = utils.BytearrayStream() + attrs.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_no_attributes(self): + """ + Test that an empty Attributes structure can be correctly written to + a data stream. + """ + attrs = objects.Attributes() + + stream = utils.BytearrayStream() + attrs.write(stream) + + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) + + def test_write_unsupported_attribute(self): + """ + Test that an AttributeNotSupported error is raised when an unsupported + attribute is found while writing an Attributes structure to a data + stream. This can occur when an older attribute is no longer supported + by a newer version of KMIP, or vice versa. + """ + attrs = objects.Attributes(attributes=[ + primitives.TextString( + "default", + tag=enums.Tags.OPERATION_POLICY_NAME + ) + ]) + + stream = utils.BytearrayStream() + args = (stream, ) + kwargs = {"kmip_version": enums.KMIPVersion.KMIP_2_0} + self.assertRaisesRegex( + exceptions.AttributeNotSupported, + "Attribute OPERATION_POLICY_NAME is not supported by KMIP 2.0.", + attrs.write, + *args, + **kwargs + ) + + def test_write_alternative_tag(self): + """ + Test that an Attributes structure can be correctly written to a data + stream with an alternative tag. This can occur if a variant of the + Attributes structure is being used, like the Common Attributes, Public + Key Attributes, or Private Key Attributes structures. + """ + attrs = objects.Attributes( + attributes=[ + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ), + primitives.Integer( + 128, + enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ], + tag=enums.Tags.PRIVATE_KEY_ATTRIBUTES + ) + + stream = utils.BytearrayStream() + attrs.write(stream) + + self.assertEqual(len(self.alt_encoding), len(stream)) + self.assertEqual(str(self.alt_encoding), str(stream)) + + def test_repr(self): + """ + Test that repr can be applied to an Attributes structure. + """ + attrs = objects.Attributes(attributes=[ + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ), + primitives.Integer( + 128, + enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ]) + self.assertEqual( + "Attributes(attributes=[" + "Enumeration(" + "enum=CryptographicAlgorithm, " + "value=CryptographicAlgorithm.AES, " + "tag=Tags.CRYPTOGRAPHIC_ALGORITHM), " + "Integer(value=128)], " + "tag=Tags.ATTRIBUTES)", + repr(attrs) + ) + + def test_repr_alternative_tag(self): + """ + Test that repr can be applied to an Attribute structure with an + alternative tag. This can occur if a variant of the Attributes + structure is being used, like the Common Attributes, Public Key + Attributes, or Private Key Attributes structure. + """ + attrs = objects.Attributes(tag=enums.Tags.COMMON_ATTRIBUTES) + self.assertEqual( + "Attributes(attributes=[], tag=Tags.COMMON_ATTRIBUTES)", + repr(attrs) + ) + + def test_str(self): + """ + Test that str can be applied to an Attributes structure. + """ + attrs = objects.Attributes(attributes=[ + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ), + primitives.Integer( + 128, + enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ]) + self.assertEqual( + '{"attributes": [CryptographicAlgorithm.AES, 128]}', + str(attrs) + ) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + identical Attributes structures. + """ + a = objects.Attributes() + b = objects.Attributes() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = objects.Attributes(attributes=[ + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ), + primitives.Integer( + 128, + enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ]) + b = objects.Attributes(attributes=[ + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ), + primitives.Integer( + 128, + enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ]) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_attributes(self): + """ + Test that the equality operator returns False when comparing two + Attributes structures with different attributes lists. + """ + a = objects.Attributes() + b = objects.Attributes(attributes=[ + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ), + primitives.Integer( + 128, + enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ]) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + a = objects.Attributes(attributes=[ + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ), + primitives.Integer( + 128, + enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ]) + b = objects.Attributes(attributes=[ + primitives.Integer( + 128, + enums.Tags.CRYPTOGRAPHIC_LENGTH + ), + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ]) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing an + Attributes structure with another type. + """ + a = objects.Attributes() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + identical Attributes structures. + """ + a = objects.Attributes() + b = objects.Attributes() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = objects.Attributes(attributes=[ + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ), + primitives.Integer( + 128, + enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ]) + b = objects.Attributes(attributes=[ + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ), + primitives.Integer( + 128, + enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ]) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_attributes(self): + """ + Test that the inequality operator returns True when comparing two + Attributes structures with different attributes lists. + """ + a = objects.Attributes() + b = objects.Attributes(attributes=[ + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ), + primitives.Integer( + 128, + enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ]) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + a = objects.Attributes(attributes=[ + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ), + primitives.Integer( + 128, + enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ]) + b = objects.Attributes(attributes=[ + primitives.Integer( + 128, + enums.Tags.CRYPTOGRAPHIC_LENGTH + ), + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ]) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing an + Attributes structure with another type. + """ + a = objects.Attributes() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + class TestKeyMaterialStruct(TestCase): """ A test suite for the KeyMaterialStruct. diff --git a/kmip/tests/unit/core/test_enums.py b/kmip/tests/unit/core/test_enums.py new file mode 100644 index 0000000..6d9b3e6 --- /dev/null +++ b/kmip/tests/unit/core/test_enums.py @@ -0,0 +1,324 @@ +# Copyright (c) 2019 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools + +from kmip.core import enums + + +class TestEnumUtilityFunctions(testtools.TestCase): + + def setUp(self): + super(TestEnumUtilityFunctions, self).setUp() + + def tearDown(self): + super(TestEnumUtilityFunctions, self).tearDown() + + def test_is_enum_value(self): + result = enums.is_enum_value( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES + ) + self.assertTrue(result) + + result = enums.is_enum_value( + enums.WrappingMethod, + 'invalid' + ) + self.assertFalse(result) + + def test_is_attribute(self): + # Test an attribute introduced in KMIP 1.0 + result = enums.is_attribute(enums.Tags.UNIQUE_IDENTIFIER) + self.assertTrue(result) + + # Test an attribute introduced in KMIP 1.1 + result = enums.is_attribute(enums.Tags.FRESH) + self.assertTrue(result) + + # Test an attribute introduced in KMIP 1.2 + result = enums.is_attribute(enums.Tags.KEY_VALUE_PRESENT) + self.assertTrue(result) + + # Test an attribute introduced in KMIP 1.3 + result = enums.is_attribute(enums.Tags.RANDOM_NUMBER_GENERATOR) + self.assertTrue(result) + + # Test an attribute introduced in KMIP 1.4 + result = enums.is_attribute(enums.Tags.COMMENT) + self.assertTrue(result) + + # Test an attribute introduced in KMIP 2.0 + result = enums.is_attribute(enums.Tags.QUANTUM_SAFE) + self.assertTrue(result) + + def test_is_attribute_added_in_kmip_1_0(self): + result = enums.is_attribute( + enums.Tags.UNIQUE_IDENTIFIER, + enums.KMIPVersion.KMIP_1_0 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.UNIQUE_IDENTIFIER, + enums.KMIPVersion.KMIP_1_1 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.UNIQUE_IDENTIFIER, + enums.KMIPVersion.KMIP_1_2 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.UNIQUE_IDENTIFIER, + enums.KMIPVersion.KMIP_1_3 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.UNIQUE_IDENTIFIER, + enums.KMIPVersion.KMIP_1_4 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.UNIQUE_IDENTIFIER, + enums.KMIPVersion.KMIP_2_0 + ) + self.assertTrue(result) + + def test_is_attribute_added_in_kmip_1_1(self): + result = enums.is_attribute( + enums.Tags.FRESH, + enums.KMIPVersion.KMIP_1_0 + ) + self.assertFalse(result) + + result = enums.is_attribute( + enums.Tags.FRESH, + enums.KMIPVersion.KMIP_1_1 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.FRESH, + enums.KMIPVersion.KMIP_1_2 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.FRESH, + enums.KMIPVersion.KMIP_1_3 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.FRESH, + enums.KMIPVersion.KMIP_1_4 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.FRESH, + enums.KMIPVersion.KMIP_2_0 + ) + self.assertTrue(result) + + def test_is_attribute_added_in_kmip_1_2(self): + result = enums.is_attribute( + enums.Tags.KEY_VALUE_PRESENT, + enums.KMIPVersion.KMIP_1_0 + ) + self.assertFalse(result) + + result = enums.is_attribute( + enums.Tags.KEY_VALUE_PRESENT, + enums.KMIPVersion.KMIP_1_1 + ) + self.assertFalse(result) + + result = enums.is_attribute( + enums.Tags.KEY_VALUE_PRESENT, + enums.KMIPVersion.KMIP_1_2 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.KEY_VALUE_PRESENT, + enums.KMIPVersion.KMIP_1_3 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.KEY_VALUE_PRESENT, + enums.KMIPVersion.KMIP_1_4 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.KEY_VALUE_PRESENT, + enums.KMIPVersion.KMIP_2_0 + ) + self.assertTrue(result) + + def test_is_attribute_added_in_kmip_1_3(self): + result = enums.is_attribute( + enums.Tags.RANDOM_NUMBER_GENERATOR, + enums.KMIPVersion.KMIP_1_0 + ) + self.assertFalse(result) + + result = enums.is_attribute( + enums.Tags.RANDOM_NUMBER_GENERATOR, + enums.KMIPVersion.KMIP_1_1 + ) + self.assertFalse(result) + + result = enums.is_attribute( + enums.Tags.RANDOM_NUMBER_GENERATOR, + enums.KMIPVersion.KMIP_1_2 + ) + self.assertFalse(result) + + result = enums.is_attribute( + enums.Tags.RANDOM_NUMBER_GENERATOR, + enums.KMIPVersion.KMIP_1_3 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.RANDOM_NUMBER_GENERATOR, + enums.KMIPVersion.KMIP_1_4 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.RANDOM_NUMBER_GENERATOR, + enums.KMIPVersion.KMIP_2_0 + ) + self.assertTrue(result) + + def test_is_attribute_added_in_kmip_1_4(self): + result = enums.is_attribute( + enums.Tags.COMMENT, + enums.KMIPVersion.KMIP_1_0 + ) + self.assertFalse(result) + + result = enums.is_attribute( + enums.Tags.COMMENT, + enums.KMIPVersion.KMIP_1_1 + ) + self.assertFalse(result) + + result = enums.is_attribute( + enums.Tags.COMMENT, + enums.KMIPVersion.KMIP_1_2 + ) + self.assertFalse(result) + + result = enums.is_attribute( + enums.Tags.COMMENT, + enums.KMIPVersion.KMIP_1_3 + ) + self.assertFalse(result) + + result = enums.is_attribute( + enums.Tags.COMMENT, + enums.KMIPVersion.KMIP_1_4 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.COMMENT, + enums.KMIPVersion.KMIP_2_0 + ) + self.assertTrue(result) + + def test_is_attribute_added_in_kmip_2_0(self): + result = enums.is_attribute( + enums.Tags.QUANTUM_SAFE, + enums.KMIPVersion.KMIP_1_0 + ) + self.assertFalse(result) + + result = enums.is_attribute( + enums.Tags.QUANTUM_SAFE, + enums.KMIPVersion.KMIP_1_1 + ) + self.assertFalse(result) + + result = enums.is_attribute( + enums.Tags.QUANTUM_SAFE, + enums.KMIPVersion.KMIP_1_2 + ) + self.assertFalse(result) + + result = enums.is_attribute( + enums.Tags.QUANTUM_SAFE, + enums.KMIPVersion.KMIP_1_3 + ) + self.assertFalse(result) + + result = enums.is_attribute( + enums.Tags.QUANTUM_SAFE, + enums.KMIPVersion.KMIP_1_4 + ) + self.assertFalse(result) + + result = enums.is_attribute( + enums.Tags.QUANTUM_SAFE, + enums.KMIPVersion.KMIP_2_0 + ) + self.assertTrue(result) + + def test_is_attribute_removed_in_kmip_2_0(self): + result = enums.is_attribute( + enums.Tags.CUSTOM_ATTRIBUTE, + enums.KMIPVersion.KMIP_1_0 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.CUSTOM_ATTRIBUTE, + enums.KMIPVersion.KMIP_1_1 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.CUSTOM_ATTRIBUTE, + enums.KMIPVersion.KMIP_1_2 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.CUSTOM_ATTRIBUTE, + enums.KMIPVersion.KMIP_1_3 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.CUSTOM_ATTRIBUTE, + enums.KMIPVersion.KMIP_1_4 + ) + self.assertTrue(result) + + result = enums.is_attribute( + enums.Tags.CUSTOM_ATTRIBUTE, + enums.KMIPVersion.KMIP_2_0 + ) + self.assertFalse(result)