2
0
mirror of https://github.com/openkmip/pykmip synced 2025-12-20 10:13:18 +00:00

Updating the AttributeValue factory

This change updates the AttributeValue factory, removing and
streamlining code. Support for several basic primitive attributes are
added in addition to a redesigned test suite for the factory.
This commit is contained in:
Peter Hamilton
2015-08-18 14:04:29 -04:00
parent 94646337c3
commit b0bab4b8bc
2 changed files with 433 additions and 347 deletions

View File

@@ -13,26 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from kmip.core.enums import AttributeType
from kmip.core.enums import Tags
from kmip.core.primitives import DateTime
from kmip.core.attributes import ApplicationSpecificInformation
from kmip.core.attributes import ContactInformation
from kmip.core.attributes import CryptographicAlgorithm
from kmip.core.attributes import CryptographicLength
from kmip.core.attributes import CryptographicUsageMask
from kmip.core.attributes import CryptographicParameters
from kmip.core.attributes import CustomAttribute
from kmip.core.attributes import Digest
from kmip.core.attributes import Name
from kmip.core.attributes import ObjectGroup
from kmip.core.attributes import UniqueIdentifier
from kmip.core.attributes import ObjectType
from kmip.core.attributes import OperationPolicyName
from kmip.core.attributes import HashingAlgorithm
from kmip.core import attributes
from kmip.core import enums
from kmip.core import primitives
from kmip.core import utils
@@ -40,184 +23,138 @@ class AttributeValueFactory(object):
def create_attribute_value(self, name, value):
# Switch on the name of the attribute
if name is AttributeType.UNIQUE_IDENTIFIER:
value = self._create_unique_identifier(value)
elif name is AttributeType.NAME:
value = self._create_name(value)
elif name is AttributeType.OBJECT_TYPE:
value = self._create_object_type(value)
elif name is AttributeType.CRYPTOGRAPHIC_ALGORITHM:
value = self._create_cryptographic_algorithm(value)
elif name is AttributeType.CRYPTOGRAPHIC_LENGTH:
value = self._create_cryptographic_length(value)
elif name is AttributeType.CRYPTOGRAPHIC_PARAMETERS:
value = self._create_cryptographic_parameters(value)
elif name is AttributeType.CRYPTOGRAPHIC_DOMAIN_PARAMETERS:
value = self._create_cryptographic_domain_parameters(value)
elif name is AttributeType.CERTIFICATE_TYPE:
value = self._create_certificate_type(value)
elif name is AttributeType.CERTIFICATE_LENGTH:
value = self._create_certificate_length(value)
elif name is AttributeType.X_509_CERTIFICATE_IDENTIFIER:
value = self._create_x_509_certificate_identifier(value)
elif name is AttributeType.X_509_CERTIFICATE_SUBJECT:
value = self._create_x_509_certificate_subject(value)
elif name is AttributeType.X_509_CERTIFICATE_ISSUER:
value = self._create_x_509_certificate_issuer(value)
elif name is AttributeType.CERTIFICATE_IDENTIFIER:
value = self._create_certificate_identifier(value)
elif name is AttributeType.CERTIFICATE_SUBJECT:
value = self._create_certificate_subject(value)
elif name is AttributeType.CERTIFICATE_ISSUER:
value = self._create_certificate_issuer(value)
elif name is AttributeType.DIGITAL_SIGNATURE_ALGORITHM:
value = self._create_digital_signature_algorithm(value)
elif name is AttributeType.DIGEST:
value = self._create_digest()
elif name is AttributeType.OPERATION_POLICY_NAME:
value = self._create_operation_policy_name(value)
elif name is AttributeType.CRYPTOGRAPHIC_USAGE_MASK:
value = self._create_cryptographic_usage_mask(value)
elif name is AttributeType.LEASE_TIME:
value = self._create_lease_time(value)
elif name is AttributeType.USAGE_LIMITS:
value = self._create_usage_limits(value)
elif name is AttributeType.STATE:
value = self._create_state(value)
elif name is AttributeType.INITIAL_DATE:
value = self._create_initial_date(value)
elif name is AttributeType.ACTIVATION_DATE:
value = self._create_activation_date(value)
elif name is AttributeType.PROCESS_START_DATE:
value = self._create_process_start_date(value)
elif name is AttributeType.PROTECT_STOP_DATE:
value = self._create_protect_stop_date(value)
elif name is AttributeType.DEACTIVATION_DATE:
value = self._create_deactivation_date(value)
elif name is AttributeType.DESTROY_DATE:
value = self._create_destroy_date(value)
elif name is AttributeType.COMPROMISE_OCCURRENCE_DATE:
value = self._create_compromise_occurrence_date(value)
elif name is AttributeType.COMPROMISE_DATE:
value = self._create_compromise_date(value)
elif name is AttributeType.REVOCATION_REASON:
value = self._create_revocation_reason(value)
elif name is AttributeType.ARCHIVE_DATE:
value = self._create_archive_date(value)
elif name is AttributeType.OBJECT_GROUP:
value = self._create_object_group(value)
elif name is AttributeType.FRESH:
value = self._create_fresh(value)
elif name is AttributeType.LINK:
value = self._create_link(value)
elif name is AttributeType.APPLICATION_SPECIFIC_INFORMATION:
value = self._create_application_specific_information(value)
elif name is AttributeType.CONTACT_INFORMATION:
value = self._create_contact_information(value)
elif name is AttributeType.LAST_CHANGE_DATE:
value = self._create_last_change_date(value)
elif name is AttributeType.CUSTOM_ATTRIBUTE:
value = self._create_custom_attribute(value)
if name is enums.AttributeType.UNIQUE_IDENTIFIER:
return attributes.UniqueIdentifier(value)
elif name is enums.AttributeType.NAME:
return self._create_name(value)
elif name is enums.AttributeType.OBJECT_TYPE:
return attributes.ObjectType()
elif name is enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM:
return attributes.CryptographicAlgorithm(value)
elif name is enums.AttributeType.CRYPTOGRAPHIC_LENGTH:
return self._create_cryptographic_length(value)
elif name is enums.AttributeType.CRYPTOGRAPHIC_PARAMETERS:
return self._create_cryptographic_parameters(value)
elif name is enums.AttributeType.CRYPTOGRAPHIC_DOMAIN_PARAMETERS:
raise NotImplementedError()
elif name is enums.AttributeType.CERTIFICATE_TYPE:
raise NotImplementedError()
elif name is enums.AttributeType.CERTIFICATE_LENGTH:
return primitives.Integer(value, enums.Tags.CERTIFICATE_LENGTH)
elif name is enums.AttributeType.X_509_CERTIFICATE_IDENTIFIER:
raise NotImplementedError()
elif name is enums.AttributeType.X_509_CERTIFICATE_SUBJECT:
raise NotImplementedError()
elif name is enums.AttributeType.X_509_CERTIFICATE_ISSUER:
raise NotImplementedError()
elif name is enums.AttributeType.CERTIFICATE_IDENTIFIER:
raise NotImplementedError()
elif name is enums.AttributeType.CERTIFICATE_SUBJECT:
raise NotImplementedError()
elif name is enums.AttributeType.CERTIFICATE_ISSUER:
raise NotImplementedError()
elif name is enums.AttributeType.DIGITAL_SIGNATURE_ALGORITHM:
raise NotImplementedError()
elif name is enums.AttributeType.DIGEST:
return attributes.Digest()
elif name is enums.AttributeType.OPERATION_POLICY_NAME:
return attributes.OperationPolicyName(value)
elif name is enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK:
return self._create_cryptographic_usage_mask(value)
elif name is enums.AttributeType.LEASE_TIME:
return primitives.Interval(value, enums.Tags.LEASE_TIME)
elif name is enums.AttributeType.USAGE_LIMITS:
raise NotImplementedError()
elif name is enums.AttributeType.STATE:
raise NotImplementedError()
elif name is enums.AttributeType.INITIAL_DATE:
return primitives.DateTime(value, enums.Tags.INITIAL_DATE)
elif name is enums.AttributeType.ACTIVATION_DATE:
return primitives.DateTime(value, enums.Tags.ACTIVATION_DATE)
elif name is enums.AttributeType.PROCESS_START_DATE:
return primitives.DateTime(value, enums.Tags.PROCESS_START_DATE)
elif name is enums.AttributeType.PROTECT_STOP_DATE:
return primitives.DateTime(value, enums.Tags.PROTECT_STOP_DATE)
elif name is enums.AttributeType.DEACTIVATION_DATE:
return primitives.DateTime(value, enums.Tags.DEACTIVATION_DATE)
elif name is enums.AttributeType.DESTROY_DATE:
return primitives.DateTime(value, enums.Tags.DESTROY_DATE)
elif name is enums.AttributeType.COMPROMISE_OCCURRENCE_DATE:
return primitives.DateTime(
value, enums.Tags.COMPROMISE_OCCURRENCE_DATE)
elif name is enums.AttributeType.COMPROMISE_DATE:
return primitives.DateTime(value, enums.Tags.COMPROMISE_DATE)
elif name is enums.AttributeType.REVOCATION_REASON:
raise NotImplementedError()
elif name is enums.AttributeType.ARCHIVE_DATE:
return primitives.DateTime(value, enums.Tags.ARCHIVE_DATE)
elif name is enums.AttributeType.OBJECT_GROUP:
return self._create_object_group(value)
elif name is enums.AttributeType.FRESH:
return primitives.Boolean(value, enums.Tags.FRESH)
elif name is enums.AttributeType.LINK:
raise NotImplementedError()
elif name is enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION:
return self._create_application_specific_information(value)
elif name is enums.AttributeType.CONTACT_INFORMATION:
return self._create_contact_information(value)
elif name is enums.AttributeType.LAST_CHANGE_DATE:
return primitives.DateTime(value, enums.Tags.LAST_CHANGE_DATE)
elif name is enums.AttributeType.CUSTOM_ATTRIBUTE:
return attributes.CustomAttribute(value)
else:
if not isinstance(name, str):
raise ValueError('Unrecognized attribute type: '
'{0}'.format(name))
elif name.startswith('x-'):
# Custom attribute indicated
value = self._create_custom_attribute(value)
return value
def _create_unique_identifier(self, uuid):
return UniqueIdentifier(uuid)
return attributes.CustomAttribute(value)
def _create_name(self, name):
if name is not None:
name_value = name.name_value
name_type = name.name_type
return Name.create(name_value, name_type)
return attributes.Name.create(name_value, name_type)
else:
return Name()
def _create_object_type(self, obj):
return ObjectType()
def _create_cryptographic_algorithm(self, alg):
return CryptographicAlgorithm(alg)
return attributes.Name()
def _create_cryptographic_length(self, length):
if length is not None and not isinstance(length, int):
msg = utils.build_er_error(CryptographicLength,
msg = utils.build_er_error(attributes.CryptographicLength,
'constructor argument type', int,
type(length))
raise TypeError(msg)
return CryptographicLength(length)
return attributes.CryptographicLength(length)
def _create_cryptographic_parameters(self, params):
block_cipher_mode = None
bcm = None
if 'block_cipher_mode' in params:
block_cipher_mode = CryptographicParameters.BlockCipherMode(
bcm = attributes.CryptographicParameters.BlockCipherMode(
params.get('block_cipher_mode'))
padding_method = None
if 'padding_method' in params:
padding_method = CryptographicParameters.PaddingMethod(
padding_method = attributes.CryptographicParameters.PaddingMethod(
params.get('padding_method'))
key_role_type = None
if 'key_role_type' in params:
key_role_type = CryptographicParameters.KeyRoleType(
key_role_type = attributes.CryptographicParameters.KeyRoleType(
params.get('key_role_type'))
hashing_algorithm = None
if 'hashing_algorithm' in params:
hashing_algorithm = HashingAlgorithm(
hashing_algorithm = attributes.HashingAlgorithm(
params.get("hashing_algorithm"))
return CryptographicParameters(
block_cipher_mode=block_cipher_mode,
return attributes.CryptographicParameters(
block_cipher_mode=bcm,
padding_method=padding_method,
hashing_algorithm=hashing_algorithm,
key_role_type=key_role_type)
def _create_cryptographic_domain_parameters(self, params):
raise NotImplementedError()
def _create_certificate_type(self, cert):
raise NotImplementedError()
def _create_certificate_length(self, length):
raise NotImplementedError()
def _create_x_509_certificate_identifier(self, ident):
raise NotImplementedError()
def _create_x_509_certificate_subject(self, subject):
raise NotImplementedError()
def _create_x_509_certificate_issuer(self, issuer):
raise NotImplementedError()
def _create_certificate_identifier(self, ident):
raise NotImplementedError()
def _create_certificate_subject(self, subject):
raise NotImplementedError()
def _create_certificate_issuer(self, issuer):
raise NotImplementedError()
def _create_digital_signature_algorithm(self, alg):
raise NotImplementedError()
def _create_digest(self):
return Digest()
def _create_operation_policy_name(self, name):
return OperationPolicyName(name)
def _create_cryptographic_usage_mask(self, flags):
mask = None
if flags is not None:
@@ -225,98 +162,49 @@ class AttributeValueFactory(object):
for flag in flags:
mask |= flag.value
return CryptographicUsageMask(mask)
def _create_lease_time(self, lease):
raise NotImplementedError()
def _create_usage_limits(self, limits):
raise NotImplementedError()
def _create_state(self, state):
raise NotImplementedError()
def _create_initial_date(self, date):
return DateTime(value=date, tag=Tags.INITIAL_DATE)
def _create_activation_date(self, date):
return DateTime(value=date, tag=Tags.ACTIVATION_DATE)
def _create_process_start_date(self, date):
return DateTime(value=date, tag=Tags.PROCESS_START_DATE)
def _create_protect_stop_date(self, date):
return DateTime(value=date, tag=Tags.PROTECT_STOP_DATE)
def _create_deactivation_date(self, date):
return DateTime(value=date, tag=Tags.DEACTIVATION_DATE)
def _create_destroy_date(self, date):
return DateTime(value=date, tag=Tags.DESTROY_DATE)
def _create_compromise_occurrence_date(self, date):
return DateTime(value=date, tag=Tags.COMPROMISE_OCCURRENCE_DATE)
def _create_compromise_date(self, date):
return DateTime(value=date, tag=Tags.COMPROMISE_DATE)
def _create_revocation_reason(self, reason):
raise NotImplementedError()
def _create_archive_date(self, date):
return DateTime(value=date, tag=Tags.ARCHIVE_DATE)
return attributes.CryptographicUsageMask(mask)
def _create_object_group(self, group):
if group is not None and not isinstance(group, str):
msg = utils.build_er_error(ObjectGroup,
msg = utils.build_er_error(attributes.ObjectGroup,
'constructor argument type', str,
type(group))
raise TypeError(msg)
return ObjectGroup(group)
def _create_fresh(self, fresh):
raise NotImplementedError()
def _create_link(self, link):
raise NotImplementedError()
return attributes.ObjectGroup(group)
def _create_application_specific_information(self, info):
if info is None:
return ApplicationSpecificInformation()
return attributes.ApplicationSpecificInformation()
else:
application_namespace = info.get('application_namespace')
application_data = info.get('application_data')
if not isinstance(application_namespace, str):
msg = utils.build_er_error(ApplicationSpecificInformation,
'constructor argument type',
str, type(application_namespace))
msg = utils.build_er_error(
attributes.ApplicationSpecificInformation,
'constructor argument type',
str, type(application_namespace))
raise TypeError(msg)
if not isinstance(application_data, str):
msg = utils.build_er_error(ApplicationSpecificInformation,
'constructor argument type',
str, type(application_data))
msg = utils.build_er_error(
attributes.ApplicationSpecificInformation,
'constructor argument type',
str, type(application_data))
raise TypeError(msg)
return ApplicationSpecificInformation.create(application_namespace,
application_data)
return attributes.ApplicationSpecificInformation.create(
application_namespace, application_data)
def _create_contact_information(self, info):
if info is None:
return ContactInformation()
return attributes.ContactInformation()
else:
if not isinstance(info, str):
msg = utils.build_er_error(ContactInformation,
msg = utils.build_er_error(attributes.ContactInformation,
'constructor argument type', str,
type(info))
raise TypeError(msg)
return ContactInformation(info)
def _create_last_change_date(self, date):
raise NotImplementedError()
def _create_custom_attribute(self, data):
return CustomAttribute(data)
return attributes.ContactInformation(info)