2
0
mirror of https://github.com/openkmip/pykmip synced 2025-12-15 15:53:36 +00:00

server: implement 'discover-versions'

This commit is contained in:
Viktor Tarasov
2016-02-16 11:21:25 +01:00
parent 52c7103681
commit c9df034e48
3 changed files with 225 additions and 0 deletions

View File

@@ -33,6 +33,7 @@ from kmip.core.factories.secrets import SecretFactory
from kmip.core.messages.contents import ResultStatus
from kmip.core.messages.contents import ResultReason
from kmip.core.messages.contents import ResultMessage
from kmip.core.messages.contents import ProtocolVersion
from kmip.core.misc import KeyFormatType
@@ -48,6 +49,7 @@ from kmip.services.results import GetResult
from kmip.services.results import OperationResult
from kmip.services.results import RegisterResult
from kmip.services.results import LocateResult
from kmip.services.results import DiscoverVersionsResult
class KMIP(object):
@@ -85,6 +87,9 @@ class KMIP(object):
credential=None):
raise NotImplementedError()
def discover_versions(self, protocol_versions=None):
raise NotImplementedError()
class KMIPImpl(KMIP):
@@ -95,6 +100,10 @@ class KMIPImpl(KMIP):
self.secret_factory = SecretFactory()
self.attribute_factory = AttributeFactory()
self.repo = MemRepo()
self.protocol_versions = [
ProtocolVersion.create(1, 1),
ProtocolVersion.create(1, 0)
]
def create(self, object_type, template_attribute, credential=None):
self.logger.debug('create() called')
@@ -279,6 +288,32 @@ class KMIPImpl(KMIP):
return LocateResult(ResultStatus(RS.OPERATION_FAILED),
result_reason=reason, result_message=msg)
def discover_versions(self, protocol_versions=None):
self.logger.debug(
"discover_versions(protocol_versions={0}) called".format(
protocol_versions))
msg = 'get protocol versions supported by server'
result_versions = list()
if protocol_versions:
msg += " and client; client versions {0}".format(protocol_versions)
for version in protocol_versions:
if version in self.protocol_versions:
result_versions.append(version)
else:
result_versions = self.protocol_versions
self.logger.debug(msg)
try:
return DiscoverVersionsResult(ResultStatus(RS.SUCCESS),
protocol_versions=result_versions)
except Exception:
msg = ResultMessage('DiscoverVersions Operation Failed')
reason = ResultReason(ResultReasonEnum.GENERAL_FAILURE)
return DiscoverVersionsResult(ResultStatus(RS.OPERATION_FAILED),
result_reason=reason,
result_message=msg)
def _validate_req_field(self, attrs, name, expected, msg, required=True):
self.logger.debug('Validating attribute %s' % name)
seen = False