mirror of
https://github.com/openkmip/pykmip
synced 2025-12-11 13:53:14 +00:00
Merge pull request #197 from OpenKMIP/feat/add-object-ownership
Adding operation policy enforcement to the KMIP server engine
This commit is contained in:
@@ -1468,6 +1468,244 @@ class TestKmipEngine(testtools.TestCase):
|
||||
*args
|
||||
)
|
||||
|
||||
def test_is_allowed_by_operation_policy(self):
|
||||
"""
|
||||
Test that an allowed operation is correctly allowed by the operation
|
||||
policy.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._operation_policies = {
|
||||
'test': {
|
||||
enums.ObjectType.SYMMETRIC_KEY: {
|
||||
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
is_allowed = e._is_allowed_by_operation_policy(
|
||||
'test',
|
||||
'test',
|
||||
'test',
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
enums.Operation.GET
|
||||
)
|
||||
|
||||
self.assertTrue(is_allowed)
|
||||
|
||||
def test_is_allowed_by_operation_policy_blocked(self):
|
||||
"""
|
||||
Test that an unallowed operation is correctly blocked by the operation
|
||||
policy.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._operation_policies = {
|
||||
'test': {
|
||||
enums.ObjectType.SYMMETRIC_KEY: {
|
||||
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
is_allowed = e._is_allowed_by_operation_policy(
|
||||
'test',
|
||||
'random',
|
||||
'test',
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
enums.Operation.GET
|
||||
)
|
||||
|
||||
self.assertFalse(is_allowed)
|
||||
|
||||
def test_is_allowed_by_operation_public(self):
|
||||
"""
|
||||
Test that a public operation is allowed by the operation policy.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._operation_policies = {
|
||||
'test': {
|
||||
enums.ObjectType.SYMMETRIC_KEY: {
|
||||
enums.Operation.GET: enums.Policy.ALLOW_ALL
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
is_allowed = e._is_allowed_by_operation_policy(
|
||||
'test',
|
||||
'test',
|
||||
'test',
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
enums.Operation.GET
|
||||
)
|
||||
|
||||
self.assertTrue(is_allowed)
|
||||
|
||||
is_allowed = e._is_allowed_by_operation_policy(
|
||||
'test',
|
||||
'random',
|
||||
'test',
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
enums.Operation.GET
|
||||
)
|
||||
|
||||
self.assertTrue(is_allowed)
|
||||
|
||||
def test_is_allowed_by_operation_block_all(self):
|
||||
"""
|
||||
Test that a blocked operation is blocked by the operation policy.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._operation_policies = {
|
||||
'test': {
|
||||
enums.ObjectType.SYMMETRIC_KEY: {
|
||||
enums.Operation.GET: enums.Policy.DISALLOW_ALL
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
is_allowed = e._is_allowed_by_operation_policy(
|
||||
'test',
|
||||
'test',
|
||||
'test',
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
enums.Operation.GET
|
||||
)
|
||||
|
||||
self.assertFalse(is_allowed)
|
||||
|
||||
is_allowed = e._is_allowed_by_operation_policy(
|
||||
'test',
|
||||
'random',
|
||||
'test',
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
enums.Operation.GET
|
||||
)
|
||||
|
||||
self.assertFalse(is_allowed)
|
||||
|
||||
def test_is_allowed_by_operation_safety_check(self):
|
||||
"""
|
||||
Test that an unknown operation is blocked by the operation policy.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._operation_policies = {
|
||||
'test': {
|
||||
enums.ObjectType.SYMMETRIC_KEY: {
|
||||
enums.Operation.GET: 'unknown value'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
is_allowed = e._is_allowed_by_operation_policy(
|
||||
'test',
|
||||
'test',
|
||||
'test',
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
enums.Operation.GET
|
||||
)
|
||||
|
||||
self.assertFalse(is_allowed)
|
||||
|
||||
is_allowed = e._is_allowed_by_operation_policy(
|
||||
'test',
|
||||
'random',
|
||||
'test',
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
enums.Operation.GET
|
||||
)
|
||||
|
||||
self.assertFalse(is_allowed)
|
||||
|
||||
def test_is_allowed_by_operation_policy_nonexistent_policy(self):
|
||||
"""
|
||||
Test that a check with a non-existent policy yields a logging warning
|
||||
and a blocked operation.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
policy = 'nonexistent-policy'
|
||||
is_allowed = e._is_allowed_by_operation_policy(
|
||||
policy,
|
||||
'test',
|
||||
'test',
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
enums.Operation.GET
|
||||
)
|
||||
|
||||
self.assertFalse(is_allowed)
|
||||
e._logger.warning.assert_called_once_with(
|
||||
"The '{0}' policy does not exist.".format(policy)
|
||||
)
|
||||
|
||||
def test_is_allowed_by_operation_policy_not_object_applicable(self):
|
||||
"""
|
||||
Test that a check for an object with a non-applicable policy yields
|
||||
a logging warning and a blocked operation.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._logger = mock.MagicMock()
|
||||
e._operation_policies = {
|
||||
'test': {
|
||||
enums.ObjectType.SYMMETRIC_KEY: {
|
||||
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
policy = 'test'
|
||||
object_type = enums.ObjectType.PRIVATE_KEY
|
||||
is_allowed = e._is_allowed_by_operation_policy(
|
||||
policy,
|
||||
'test',
|
||||
'test',
|
||||
object_type,
|
||||
enums.Operation.GET
|
||||
)
|
||||
|
||||
self.assertFalse(is_allowed)
|
||||
e._logger.warning.assert_called_once_with(
|
||||
"The '{0}' policy does not apply to {1} objects.".format(
|
||||
policy,
|
||||
e._get_enum_string(object_type)
|
||||
)
|
||||
)
|
||||
|
||||
def test_is_allowed_by_operation_policy_not_applicable(self):
|
||||
"""
|
||||
Test that a check with a non-applicable policy yields a logging
|
||||
warning and a blocked operation.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._logger = mock.MagicMock()
|
||||
e._operation_policies = {
|
||||
'test': {
|
||||
enums.ObjectType.SYMMETRIC_KEY: {
|
||||
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
policy = 'test'
|
||||
object_type = enums.ObjectType.SYMMETRIC_KEY
|
||||
operation = enums.Operation.CREATE
|
||||
is_allowed = e._is_allowed_by_operation_policy(
|
||||
policy,
|
||||
'test',
|
||||
'test',
|
||||
object_type,
|
||||
operation
|
||||
)
|
||||
|
||||
self.assertFalse(is_allowed)
|
||||
e._logger.warning.assert_called_once_with(
|
||||
"The '{0}' policy does not apply to {1} operations on {2} "
|
||||
"objects.".format(
|
||||
policy,
|
||||
e._get_enum_string(operation),
|
||||
e._get_enum_string(object_type)
|
||||
)
|
||||
)
|
||||
|
||||
def test_create(self):
|
||||
"""
|
||||
Test that a Create request can be processed correctly.
|
||||
@@ -2809,6 +3047,38 @@ class TestKmipEngine(testtools.TestCase):
|
||||
"Processing operation: Get"
|
||||
)
|
||||
|
||||
def test_get_not_allowed_by_policy(self):
|
||||
"""
|
||||
Test that an unallowed request is handled correctly by Get.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
e._client_identity = 'test'
|
||||
|
||||
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
|
||||
obj_a._owner = 'admin'
|
||||
|
||||
e._data_session.add(obj_a)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
id_a = str(obj_a.unique_identifier)
|
||||
payload = get.GetRequestPayload(
|
||||
unique_identifier=attributes.UniqueIdentifier(id_a)
|
||||
)
|
||||
|
||||
# Test by specifying the ID of the object to get.
|
||||
args = [payload]
|
||||
self.assertRaisesRegex(
|
||||
exceptions.ItemNotFound,
|
||||
"Could not locate object: {0}".format(id_a),
|
||||
e._process_get,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_destroy(self):
|
||||
"""
|
||||
Test that a Destroy request can be processed correctly.
|
||||
@@ -2881,6 +3151,38 @@ class TestKmipEngine(testtools.TestCase):
|
||||
|
||||
e._data_session.commit()
|
||||
|
||||
def test_destroy_not_allowed_by_policy(self):
|
||||
"""
|
||||
Test that an unallowed request is handled correctly by Destroy.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
e._client_identity = 'test'
|
||||
|
||||
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
|
||||
obj_a._owner = 'admin'
|
||||
|
||||
e._data_session.add(obj_a)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
id_a = str(obj_a.unique_identifier)
|
||||
payload = destroy.DestroyRequestPayload(
|
||||
unique_identifier=attributes.UniqueIdentifier(id_a)
|
||||
)
|
||||
|
||||
# Test by specifying the ID of the object to destroy.
|
||||
args = [payload]
|
||||
self.assertRaisesRegex(
|
||||
exceptions.ItemNotFound,
|
||||
"Could not locate object: {0}".format(id_a),
|
||||
e._process_destroy,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_query(self):
|
||||
"""
|
||||
Test that a Query request can be processed correctly, for different
|
||||
|
||||
Reference in New Issue
Block a user