2
0
mirror of https://github.com/openkmip/pykmip synced 2025-12-10 13:23:15 +00:00

Adding KmipEngine support for Destroy

This change adds support for the Destroy operation to the KmipEngine.
New exceptions and test cases are included.
This commit is contained in:
Peter
2016-03-08 15:34:12 -05:00
parent 81222e23f1
commit 27befcb85c
3 changed files with 343 additions and 5 deletions

View File

@@ -14,11 +14,16 @@
# under the License.
import mock
import sqlalchemy
from sqlalchemy.orm import exc
import testtools
import time
import kmip
from kmip.core import attributes
from kmip.core import enums
from kmip.core import exceptions
from kmip.core import misc
@@ -27,9 +32,13 @@ from kmip.core import objects
from kmip.core.messages import contents
from kmip.core.messages import messages
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import query
from kmip.pie import objects as pie_objects
from kmip.pie import sqltypes
from kmip.services.server import engine
@@ -50,6 +59,14 @@ class TestKmipEngine(testtools.TestCase):
def setUp(self):
super(TestKmipEngine, self).setUp()
self.engine = sqlalchemy.create_engine(
'sqlite:///:memory:',
)
sqltypes.Base.metadata.create_all(self.engine)
self.session_factory = sqlalchemy.orm.sessionmaker(
bind=self.engine
)
def tearDown(self):
super(TestKmipEngine, self).tearDown()
@@ -621,12 +638,15 @@ class TestKmipEngine(testtools.TestCase):
e = engine.KmipEngine()
e._logger = mock.MagicMock()
e._process_destroy = mock.MagicMock()
e._process_query = mock.MagicMock()
e._process_discover_versions = mock.MagicMock()
e._process_operation(enums.Operation.DESTROY, None)
e._process_operation(enums.Operation.QUERY, None)
e._process_operation(enums.Operation.DISCOVER_VERSIONS, None)
e._process_destroy.assert_called_with(None)
e._process_query.assert_called_with(None)
e._process_discover_versions.assert_called_with(None)
@@ -649,6 +669,178 @@ class TestKmipEngine(testtools.TestCase):
*args
)
def test_destroy(self):
"""
Test that a Destroy request can be processed correctly.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
obj_b = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
e._data_session.add(obj_a)
e._data_session.add(obj_b)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
id_a = str(obj_a.unique_identifier)
id_b = str(obj_b.unique_identifier)
# Test by specifying the ID of the object to destroy.
payload = destroy.DestroyRequestPayload(
unique_identifier=attributes.UniqueIdentifier(id_a)
)
response_payload = e._process_destroy(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_called_once_with(
"Processing operation: Destroy"
)
self.assertEqual(str(id_a), response_payload.unique_identifier.value)
self.assertRaises(
exc.NoResultFound,
e._data_session.query(pie_objects.OpaqueObject).filter(
pie_objects.ManagedObject.unique_identifier == id_a
).one
)
e._data_session.commit()
e._data_store_session_factory()
e._logger.reset_mock()
e._id_placeholder = str(id_b)
# Test by using the ID placeholder to specify the object to destroy.
payload = destroy.DestroyRequestPayload()
response_payload = e._process_destroy(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_called_once_with(
"Processing operation: Destroy"
)
self.assertEqual(str(id_b), response_payload.unique_identifier.value)
self.assertRaises(
exc.NoResultFound,
e._data_session.query(pie_objects.OpaqueObject).filter(
pie_objects.ManagedObject.unique_identifier == id_b
).one
)
e._data_session.commit()
def test_destroy_missing_object(self):
"""
Test that an ItemNotFound error is generated when attempting to
destroy an object that does not exist.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
payload = destroy.DestroyRequestPayload(
unique_identifier=attributes.UniqueIdentifier('1')
)
args = (payload, )
regex = "Could not locate object: 1"
self.assertRaisesRegexp(
exceptions.ItemNotFound,
regex,
e._process_destroy,
*args
)
e._data_session.commit()
e._logger.info.assert_called_once_with(
"Processing operation: Destroy"
)
e._logger.warning.assert_called_once_with(
"Could not identify object type for object: 1"
)
self.assertTrue(e._logger.exception.called)
def test_destroy_multiple_objects(self):
"""
Test that a sqlalchemy.orm.exc.MultipleResultsFound error is generated
when multiple objects map to the same object ID.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
test_exception = exc.MultipleResultsFound()
e._data_session.query = mock.MagicMock(side_effect=test_exception)
e._logger = mock.MagicMock()
payload = destroy.DestroyRequestPayload(
unique_identifier=attributes.UniqueIdentifier('1')
)
args = (payload, )
self.assertRaises(
exc.MultipleResultsFound,
e._process_destroy,
*args
)
e._data_session.commit()
e._logger.info.assert_called_once_with(
"Processing operation: Destroy"
)
e._logger.warning.assert_called_once_with(
"Multiple objects found for ID: 1"
)
def test_destroy_unsupported_object_type(self):
"""
Test that an InvalidField error is generated when attempting to
destroy an unsupported object type.
"""
e = engine.KmipEngine()
e._object_map = {enums.ObjectType.OPAQUE_DATA: None}
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
e._data_session.add(obj_a)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
id_a = str(obj_a.unique_identifier)
payload = destroy.DestroyRequestPayload(
unique_identifier=attributes.UniqueIdentifier(id_a)
)
args = (payload, )
name = enums.ObjectType.OPAQUE_DATA.name
regex = "The {0} object type is not supported.".format(
''.join(
[x.capitalize() for x in name[9:].split('_')]
)
)
self.assertRaisesRegexp(
exceptions.InvalidField,
regex,
e._process_destroy,
*args
)
e._data_session.commit()
e._logger.info.assert_called_once_with(
"Processing operation: Destroy"
)
def test_query(self):
"""
Test that a Query request can be processed correctly, for different
@@ -678,8 +870,15 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertEqual(1, len(result.operations))
self.assertEqual(enums.Operation.QUERY, result.operations[0].value)
self.assertEqual(2, len(result.operations))
self.assertEqual(
enums.Operation.DESTROY,
result.operations[0].value
)
self.assertEqual(
enums.Operation.QUERY,
result.operations[1].value
)
self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
@@ -698,11 +897,18 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsNotNone(result.operations)
self.assertEqual(2, len(result.operations))
self.assertEqual(enums.Operation.QUERY, result.operations[0].value)
self.assertEqual(3, len(result.operations))
self.assertEqual(
enums.Operation.DESTROY,
result.operations[0].value
)
self.assertEqual(
enums.Operation.QUERY,
result.operations[1].value
)
self.assertEqual(
enums.Operation.DISCOVER_VERSIONS,
result.operations[1].value
result.operations[2].value
)
def test_discover_versions(self):