2
0
mirror of https://github.com/openkmip/pykmip synced 2025-12-28 22:23:50 +00:00

Update the Query payloads

This change upgrades the Query payloads, fixing error messages,
comments, local variables, and internal payload structure to
bring Query support up to KMIP 1.4 standards, in addition to
compliance with the current payload format. The corresponding
unit test suite has been completely rewritten to reflect these
changes.

This change prepares the Query payloads for future updates to
support KMIP 2.0.
This commit is contained in:
Peter Hamilton
2019-04-22 16:59:26 -04:00
committed by Peter Hamilton
parent b968378eb8
commit 314dd8761e
6 changed files with 4834 additions and 820 deletions

View File

@@ -6421,82 +6421,80 @@ class TestKmipEngine(testtools.TestCase):
e._logger = mock.MagicMock()
e._protocol_version = contents.ProtocolVersion(1, 0)
payload = payloads.QueryRequestPayload([
misc.QueryFunction(enums.QueryFunction.QUERY_OPERATIONS),
misc.QueryFunction(enums.QueryFunction.QUERY_OBJECTS),
misc.QueryFunction(
enums.QueryFunction.QUERY_SERVER_INFORMATION
),
misc.QueryFunction(
enums.QueryFunction.QUERY_APPLICATION_NAMESPACES
),
misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_LIST),
misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_MAP)
])
payload = payloads.QueryRequestPayload(
query_functions=[
enums.QueryFunction.QUERY_OPERATIONS,
enums.QueryFunction.QUERY_OBJECTS,
enums.QueryFunction.QUERY_SERVER_INFORMATION,
enums.QueryFunction.QUERY_APPLICATION_NAMESPACES,
enums.QueryFunction.QUERY_EXTENSION_LIST,
enums.QueryFunction.QUERY_EXTENSION_MAP
]
)
result = e._process_query(payload)
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, payloads.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertIsInstance(result.operations, list)
self.assertEqual(12, len(result.operations))
self.assertEqual(
enums.Operation.CREATE,
result.operations[0].value
result.operations[0]
)
self.assertEqual(
enums.Operation.CREATE_KEY_PAIR,
result.operations[1].value
result.operations[1]
)
self.assertEqual(
enums.Operation.REGISTER,
result.operations[2].value
result.operations[2]
)
self.assertEqual(
enums.Operation.DERIVE_KEY,
result.operations[3].value
result.operations[3]
)
self.assertEqual(
enums.Operation.LOCATE,
result.operations[4].value
result.operations[4]
)
self.assertEqual(
enums.Operation.GET,
result.operations[5].value
result.operations[5]
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTES,
result.operations[6].value
result.operations[6]
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTE_LIST,
result.operations[7].value
result.operations[7]
)
self.assertEqual(
enums.Operation.ACTIVATE,
result.operations[8].value
result.operations[8]
)
self.assertEqual(
enums.Operation.REVOKE,
result.operations[9].value
result.operations[9]
)
self.assertEqual(
enums.Operation.DESTROY,
result.operations[10].value
result.operations[10]
)
self.assertEqual(
enums.Operation.QUERY,
result.operations[11].value
result.operations[11]
)
self.assertEqual(list(), result.object_types)
self.assertIsNone(result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
"PyKMIP {0} Software Server".format(kmip.__version__),
result.vendor_identification.value
result.vendor_identification
)
self.assertIsNone(result.server_information)
self.assertEqual(list(), result.application_namespaces)
self.assertEqual(list(), result.extension_information)
self.assertIsNone(result.application_namespaces)
self.assertIsNone(result.extension_information)
def test_query_1_1(self):
"""
@@ -6507,86 +6505,84 @@ class TestKmipEngine(testtools.TestCase):
e._logger = mock.MagicMock()
e._protocol_version = contents.ProtocolVersion(1, 1)
payload = payloads.QueryRequestPayload([
misc.QueryFunction(enums.QueryFunction.QUERY_OPERATIONS),
misc.QueryFunction(enums.QueryFunction.QUERY_OBJECTS),
misc.QueryFunction(
enums.QueryFunction.QUERY_SERVER_INFORMATION
),
misc.QueryFunction(
enums.QueryFunction.QUERY_APPLICATION_NAMESPACES
),
misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_LIST),
misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_MAP)
])
payload = payloads.QueryRequestPayload(
query_functions=[
enums.QueryFunction.QUERY_OPERATIONS,
enums.QueryFunction.QUERY_OBJECTS,
enums.QueryFunction.QUERY_SERVER_INFORMATION,
enums.QueryFunction.QUERY_APPLICATION_NAMESPACES,
enums.QueryFunction.QUERY_EXTENSION_LIST,
enums.QueryFunction.QUERY_EXTENSION_MAP
]
)
result = e._process_query(payload)
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, payloads.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertIsInstance(result.operations, list)
self.assertEqual(13, len(result.operations))
self.assertEqual(
enums.Operation.CREATE,
result.operations[0].value
result.operations[0]
)
self.assertEqual(
enums.Operation.CREATE_KEY_PAIR,
result.operations[1].value
result.operations[1]
)
self.assertEqual(
enums.Operation.REGISTER,
result.operations[2].value
result.operations[2]
)
self.assertEqual(
enums.Operation.DERIVE_KEY,
result.operations[3].value
result.operations[3]
)
self.assertEqual(
enums.Operation.LOCATE,
result.operations[4].value
result.operations[4]
)
self.assertEqual(
enums.Operation.GET,
result.operations[5].value
result.operations[5]
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTES,
result.operations[6].value
result.operations[6]
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTE_LIST,
result.operations[7].value
result.operations[7]
)
self.assertEqual(
enums.Operation.ACTIVATE,
result.operations[8].value
result.operations[8]
)
self.assertEqual(
enums.Operation.REVOKE,
result.operations[9].value
result.operations[9]
)
self.assertEqual(
enums.Operation.DESTROY,
result.operations[10].value
result.operations[10]
)
self.assertEqual(
enums.Operation.QUERY,
result.operations[11].value
result.operations[11]
)
self.assertEqual(
enums.Operation.DISCOVER_VERSIONS,
result.operations[12].value
result.operations[12]
)
self.assertEqual(list(), result.object_types)
self.assertIsNone(result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
"PyKMIP {0} Software Server".format(kmip.__version__),
result.vendor_identification.value
result.vendor_identification
)
self.assertIsNone(result.server_information)
self.assertEqual(list(), result.application_namespaces)
self.assertEqual(list(), result.extension_information)
self.assertIsNone(result.application_namespaces)
self.assertIsNone(result.extension_information)
def test_query_1_2(self):
"""
@@ -6597,106 +6593,104 @@ class TestKmipEngine(testtools.TestCase):
e._logger = mock.MagicMock()
e._protocol_version = contents.ProtocolVersion(1, 2)
payload = payloads.QueryRequestPayload([
misc.QueryFunction(enums.QueryFunction.QUERY_OPERATIONS),
misc.QueryFunction(enums.QueryFunction.QUERY_OBJECTS),
misc.QueryFunction(
enums.QueryFunction.QUERY_SERVER_INFORMATION
),
misc.QueryFunction(
enums.QueryFunction.QUERY_APPLICATION_NAMESPACES
),
misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_LIST),
misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_MAP)
])
payload = payloads.QueryRequestPayload(
query_functions=[
enums.QueryFunction.QUERY_OPERATIONS,
enums.QueryFunction.QUERY_OBJECTS,
enums.QueryFunction.QUERY_SERVER_INFORMATION,
enums.QueryFunction.QUERY_APPLICATION_NAMESPACES,
enums.QueryFunction.QUERY_EXTENSION_LIST,
enums.QueryFunction.QUERY_EXTENSION_MAP
]
)
result = e._process_query(payload)
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, payloads.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertIsInstance(result.operations, list)
self.assertEqual(18, len(result.operations))
self.assertEqual(
enums.Operation.CREATE,
result.operations[0].value
result.operations[0]
)
self.assertEqual(
enums.Operation.CREATE_KEY_PAIR,
result.operations[1].value
result.operations[1]
)
self.assertEqual(
enums.Operation.REGISTER,
result.operations[2].value
result.operations[2]
)
self.assertEqual(
enums.Operation.DERIVE_KEY,
result.operations[3].value
result.operations[3]
)
self.assertEqual(
enums.Operation.LOCATE,
result.operations[4].value
result.operations[4]
)
self.assertEqual(
enums.Operation.GET,
result.operations[5].value
result.operations[5]
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTES,
result.operations[6].value
result.operations[6]
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTE_LIST,
result.operations[7].value
result.operations[7]
)
self.assertEqual(
enums.Operation.ACTIVATE,
result.operations[8].value
result.operations[8]
)
self.assertEqual(
enums.Operation.REVOKE,
result.operations[9].value
result.operations[9]
)
self.assertEqual(
enums.Operation.DESTROY,
result.operations[10].value
result.operations[10]
)
self.assertEqual(
enums.Operation.QUERY,
result.operations[11].value
result.operations[11]
)
self.assertEqual(
enums.Operation.DISCOVER_VERSIONS,
result.operations[12].value
result.operations[12]
)
self.assertEqual(
enums.Operation.ENCRYPT,
result.operations[13].value
result.operations[13]
)
self.assertEqual(
enums.Operation.DECRYPT,
result.operations[14].value
result.operations[14]
)
self.assertEqual(
enums.Operation.SIGN,
result.operations[15].value
result.operations[15]
)
self.assertEqual(
enums.Operation.SIGNATURE_VERIFY,
result.operations[16].value
result.operations[16]
)
self.assertEqual(
enums.Operation.MAC,
result.operations[17].value
result.operations[17]
)
self.assertEqual(list(), result.object_types)
self.assertIsNone(result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
"PyKMIP {0} Software Server".format(kmip.__version__),
result.vendor_identification.value
result.vendor_identification
)
self.assertIsNone(result.server_information)
self.assertEqual(list(), result.application_namespaces)
self.assertEqual(list(), result.extension_information)
self.assertIsNone(result.application_namespaces)
self.assertIsNone(result.extension_information)
def test_discover_versions(self):
"""