diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index edd3cc1..7a15fec 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -1552,6 +1552,7 @@ class KmipEngine(object): contents.Operation(enums.Operation.CREATE), contents.Operation(enums.Operation.CREATE_KEY_PAIR), contents.Operation(enums.Operation.REGISTER), + contents.Operation(enums.Operation.LOCATE), contents.Operation(enums.Operation.GET), contents.Operation(enums.Operation.GET_ATTRIBUTES), contents.Operation(enums.Operation.GET_ATTRIBUTE_LIST), @@ -1560,10 +1561,14 @@ class KmipEngine(object): contents.Operation(enums.Operation.QUERY) ]) - if self._protocol_version == contents.ProtocolVersion.create(1, 1): + if self._protocol_version >= contents.ProtocolVersion.create(1, 1): operations.extend([ contents.Operation(enums.Operation.DISCOVER_VERSIONS) ]) + if self._protocol_version >= contents.ProtocolVersion.create(1, 2): + operations.extend([ + contents.Operation(enums.Operation.MAC) + ]) if enums.QueryFunction.QUERY_OBJECTS in queries: objects = list() diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 3125122..031c2d7 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -4468,14 +4468,12 @@ class TestKmipEngine(testtools.TestCase): *args ) - def test_query(self): + def test_query_1_0(self): """ - Test that a Query request can be processed correctly, for different - versions of KMIP. + Test that a Query request can be processed correctly, for KMIP 1.0. """ e = engine.KmipEngine() - # Test for KMIP 1.0. e._logger = mock.MagicMock() e._protocol_version = contents.ProtocolVersion.create(1, 0) @@ -4497,7 +4495,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(9, len(result.operations)) + self.assertEqual(10, len(result.operations)) self.assertEqual( enums.Operation.CREATE, result.operations[0].value @@ -4511,29 +4509,33 @@ class TestKmipEngine(testtools.TestCase): result.operations[2].value ) self.assertEqual( - enums.Operation.GET, + enums.Operation.LOCATE, result.operations[3].value ) self.assertEqual( - enums.Operation.GET_ATTRIBUTES, + enums.Operation.GET, result.operations[4].value ) self.assertEqual( - enums.Operation.GET_ATTRIBUTE_LIST, + enums.Operation.GET_ATTRIBUTES, result.operations[5].value ) self.assertEqual( - enums.Operation.ACTIVATE, + enums.Operation.GET_ATTRIBUTE_LIST, result.operations[6].value ) self.assertEqual( - enums.Operation.DESTROY, + enums.Operation.ACTIVATE, result.operations[7].value ) self.assertEqual( - enums.Operation.QUERY, + enums.Operation.DESTROY, result.operations[8].value ) + self.assertEqual( + enums.Operation.QUERY, + result.operations[9].value + ) self.assertEqual(list(), result.object_types) self.assertIsNotNone(result.vendor_identification) self.assertEqual( @@ -4544,19 +4546,173 @@ class TestKmipEngine(testtools.TestCase): self.assertEqual(list(), result.application_namespaces) self.assertEqual(list(), result.extension_information) - # Test for KMIP 1.1. + def test_query_1_1(self): + """ + Test that a Query request can be processed correctly, for KMIP 1.1. + """ + e = engine.KmipEngine() + e._logger = mock.MagicMock() e._protocol_version = contents.ProtocolVersion.create(1, 1) + payload = query.QueryRequestPayload([ + misc.QueryFunction(enums.QueryFunction.QUERY_OPERATIONS), + misc.QueryFunction(enums.QueryFunction.QUERY_OBJECTS), + misc.QueryFunction( + enums.QueryFunction.QUERY_SERVER_INFORMATION + ), + misc.QueryFunction( + enums.QueryFunction.QUERY_APPLICATION_NAMESPACES + ), + misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_LIST), + misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_MAP) + ]) + result = e._process_query(payload) e._logger.info.assert_called_once_with("Processing operation: Query") + self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(10, len(result.operations)) + self.assertEqual(11, len(result.operations)) + self.assertEqual( + enums.Operation.CREATE, + result.operations[0].value + ) + self.assertEqual( + enums.Operation.CREATE_KEY_PAIR, + result.operations[1].value + ) + self.assertEqual( + enums.Operation.REGISTER, + result.operations[2].value + ) + self.assertEqual( + enums.Operation.LOCATE, + result.operations[3].value + ) + self.assertEqual( + enums.Operation.GET, + result.operations[4].value + ) + self.assertEqual( + enums.Operation.GET_ATTRIBUTES, + result.operations[5].value + ) + self.assertEqual( + enums.Operation.GET_ATTRIBUTE_LIST, + result.operations[6].value + ) + self.assertEqual( + enums.Operation.ACTIVATE, + result.operations[7].value + ) + self.assertEqual( + enums.Operation.DESTROY, + result.operations[8].value + ) + self.assertEqual( + enums.Operation.QUERY, + result.operations[9].value + ) self.assertEqual( enums.Operation.DISCOVER_VERSIONS, - result.operations[-1].value + result.operations[10].value ) + self.assertEqual(list(), result.object_types) + self.assertIsNotNone(result.vendor_identification) + self.assertEqual( + "PyKMIP {0} Software Server".format(kmip.__version__), + result.vendor_identification.value + ) + self.assertIsNone(result.server_information) + self.assertEqual(list(), result.application_namespaces) + self.assertEqual(list(), result.extension_information) + + def test_query_1_2(self): + """ + Test that a Query request can be processed correctly, for KMIP 1.2. + """ + e = engine.KmipEngine() + + e._logger = mock.MagicMock() + e._protocol_version = contents.ProtocolVersion.create(1, 2) + + payload = query.QueryRequestPayload([ + misc.QueryFunction(enums.QueryFunction.QUERY_OPERATIONS), + misc.QueryFunction(enums.QueryFunction.QUERY_OBJECTS), + misc.QueryFunction( + enums.QueryFunction.QUERY_SERVER_INFORMATION + ), + misc.QueryFunction( + enums.QueryFunction.QUERY_APPLICATION_NAMESPACES + ), + misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_LIST), + misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_MAP) + ]) + + result = e._process_query(payload) + + e._logger.info.assert_called_once_with("Processing operation: Query") + self.assertIsInstance(result, query.QueryResponsePayload) + self.assertIsNotNone(result.operations) + self.assertEqual(12, len(result.operations)) + self.assertEqual( + enums.Operation.CREATE, + result.operations[0].value + ) + self.assertEqual( + enums.Operation.CREATE_KEY_PAIR, + result.operations[1].value + ) + self.assertEqual( + enums.Operation.REGISTER, + result.operations[2].value + ) + self.assertEqual( + enums.Operation.LOCATE, + result.operations[3].value + ) + self.assertEqual( + enums.Operation.GET, + result.operations[4].value + ) + self.assertEqual( + enums.Operation.GET_ATTRIBUTES, + result.operations[5].value + ) + self.assertEqual( + enums.Operation.GET_ATTRIBUTE_LIST, + result.operations[6].value + ) + self.assertEqual( + enums.Operation.ACTIVATE, + result.operations[7].value + ) + self.assertEqual( + enums.Operation.DESTROY, + result.operations[8].value + ) + self.assertEqual( + enums.Operation.QUERY, + result.operations[9].value + ) + self.assertEqual( + enums.Operation.DISCOVER_VERSIONS, + result.operations[10].value + ) + self.assertEqual( + enums.Operation.MAC, + result.operations[11].value + ) + self.assertEqual(list(), result.object_types) + self.assertIsNotNone(result.vendor_identification) + self.assertEqual( + "PyKMIP {0} Software Server".format(kmip.__version__), + result.vendor_identification.value + ) + self.assertIsNone(result.server_information) + self.assertEqual(list(), result.application_namespaces) + self.assertEqual(list(), result.extension_information) def test_discover_versions(self): """