mirror of
https://github.com/openkmip/pykmip
synced 2025-12-10 05:13:15 +00:00
Adding server support for the GetAttributeList operation
This change adds support for the GetAttributeList operation. The user can specify the ID of a managed object and get back a list containing the names of all attributes currently set on the object. The user can also omit the ID and the server will default to using the ID placeholder for the object ID. New server tests have been added to cover this feature. The GetAttributeList payloads have also been updated for consistency with other payloads, requiring minor updates in other clients and unit tests.
This commit is contained in:
@@ -44,6 +44,7 @@ from kmip.core.messages.payloads import create_key_pair
|
||||
from kmip.core.messages.payloads import destroy
|
||||
from kmip.core.messages.payloads import discover_versions
|
||||
from kmip.core.messages.payloads import get
|
||||
from kmip.core.messages.payloads import get_attribute_list
|
||||
from kmip.core.messages.payloads import get_attributes
|
||||
from kmip.core.messages.payloads import query
|
||||
from kmip.core.messages.payloads import register
|
||||
@@ -883,6 +884,7 @@ class TestKmipEngine(testtools.TestCase):
|
||||
e._process_register = mock.MagicMock()
|
||||
e._process_get = mock.MagicMock()
|
||||
e._process_get_attributes = mock.MagicMock()
|
||||
e._process_get_attribute_list = mock.MagicMock()
|
||||
e._process_activate = mock.MagicMock()
|
||||
e._process_destroy = mock.MagicMock()
|
||||
e._process_query = mock.MagicMock()
|
||||
@@ -893,6 +895,7 @@ class TestKmipEngine(testtools.TestCase):
|
||||
e._process_operation(enums.Operation.REGISTER, None)
|
||||
e._process_operation(enums.Operation.GET, None)
|
||||
e._process_operation(enums.Operation.GET_ATTRIBUTES, None)
|
||||
e._process_operation(enums.Operation.GET_ATTRIBUTE_LIST, None)
|
||||
e._process_operation(enums.Operation.ACTIVATE, None)
|
||||
e._process_operation(enums.Operation.DESTROY, None)
|
||||
e._process_operation(enums.Operation.QUERY, None)
|
||||
@@ -903,6 +906,7 @@ class TestKmipEngine(testtools.TestCase):
|
||||
e._process_register.assert_called_with(None)
|
||||
e._process_get.assert_called_with(None)
|
||||
e._process_get_attributes.assert_called_with(None)
|
||||
e._process_get_attribute_list.assert_called_with(None)
|
||||
e._process_activate.assert_called_with(None)
|
||||
e._process_destroy.assert_called_with(None)
|
||||
e._process_query.assert_called_with(None)
|
||||
@@ -3866,6 +3870,184 @@ class TestKmipEngine(testtools.TestCase):
|
||||
*args
|
||||
)
|
||||
|
||||
def test_get_attribute_list(self):
|
||||
"""
|
||||
Test that a GetAttributeList request can be processed correctly.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
secret = pie_objects.SymmetricKey(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
0,
|
||||
b''
|
||||
)
|
||||
|
||||
e._data_session.add(secret)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
payload = get_attribute_list.GetAttributeListRequestPayload(
|
||||
unique_identifier='1'
|
||||
)
|
||||
|
||||
response_payload = e._process_get_attribute_list(payload)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
e._logger.info.assert_any_call(
|
||||
"Processing operation: GetAttributeList"
|
||||
)
|
||||
self.assertEqual(
|
||||
'1',
|
||||
response_payload.unique_identifier
|
||||
)
|
||||
self.assertEqual(
|
||||
8,
|
||||
len(response_payload.attribute_names)
|
||||
)
|
||||
self.assertIn(
|
||||
"Object Type",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Name",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Cryptographic Algorithm",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Cryptographic Length",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Operation Policy Name",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Cryptographic Usage Mask",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"State",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Unique Identifier",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
|
||||
def test_get_attribute_list_with_no_arguments(self):
|
||||
"""
|
||||
Test that a GetAttributeList request with no arguments can be
|
||||
processed correctly.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
secret = pie_objects.SymmetricKey(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
0,
|
||||
b''
|
||||
)
|
||||
|
||||
e._data_session.add(secret)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._id_placeholder = '1'
|
||||
|
||||
payload = get_attribute_list.GetAttributeListRequestPayload()
|
||||
|
||||
response_payload = e._process_get_attribute_list(payload)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
e._logger.info.assert_any_call(
|
||||
"Processing operation: GetAttributeList"
|
||||
)
|
||||
self.assertEqual(
|
||||
'1',
|
||||
response_payload.unique_identifier
|
||||
)
|
||||
self.assertEqual(
|
||||
8,
|
||||
len(response_payload.attribute_names)
|
||||
)
|
||||
self.assertIn(
|
||||
"Object Type",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Name",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Cryptographic Algorithm",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Cryptographic Length",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Operation Policy Name",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Cryptographic Usage Mask",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"State",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Unique Identifier",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
|
||||
def test_get_attribute_list_not_allowed_by_policy(self):
|
||||
"""
|
||||
Test that an unallowed request is handled correctly by
|
||||
GetAttributeList.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
e._client_identity = 'test'
|
||||
|
||||
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
|
||||
obj_a._owner = 'admin'
|
||||
|
||||
e._data_session.add(obj_a)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
id_a = str(obj_a.unique_identifier)
|
||||
payload = get_attribute_list.GetAttributeListRequestPayload(
|
||||
unique_identifier=id_a
|
||||
)
|
||||
|
||||
# Test by specifying the ID of the object whose attributes should
|
||||
# be retrieved.
|
||||
args = [payload]
|
||||
self.assertRaisesRegex(
|
||||
exceptions.ItemNotFound,
|
||||
"Could not locate object: {0}".format(id_a),
|
||||
e._process_get_attribute_list,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_activate(self):
|
||||
"""
|
||||
Test that an Activate request can be processed correctly.
|
||||
@@ -4180,7 +4362,7 @@ class TestKmipEngine(testtools.TestCase):
|
||||
e._logger.info.assert_called_once_with("Processing operation: Query")
|
||||
self.assertIsInstance(result, query.QueryResponsePayload)
|
||||
self.assertIsNotNone(result.operations)
|
||||
self.assertEqual(8, len(result.operations))
|
||||
self.assertEqual(9, len(result.operations))
|
||||
self.assertEqual(
|
||||
enums.Operation.CREATE,
|
||||
result.operations[0].value
|
||||
@@ -4202,17 +4384,21 @@ class TestKmipEngine(testtools.TestCase):
|
||||
result.operations[4].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.ACTIVATE,
|
||||
enums.Operation.GET_ATTRIBUTE_LIST,
|
||||
result.operations[5].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.DESTROY,
|
||||
enums.Operation.ACTIVATE,
|
||||
result.operations[6].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.QUERY,
|
||||
enums.Operation.DESTROY,
|
||||
result.operations[7].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.QUERY,
|
||||
result.operations[8].value
|
||||
)
|
||||
self.assertEqual(list(), result.object_types)
|
||||
self.assertIsNotNone(result.vendor_identification)
|
||||
self.assertEqual(
|
||||
@@ -4231,7 +4417,7 @@ class TestKmipEngine(testtools.TestCase):
|
||||
|
||||
e._logger.info.assert_called_once_with("Processing operation: Query")
|
||||
self.assertIsNotNone(result.operations)
|
||||
self.assertEqual(9, len(result.operations))
|
||||
self.assertEqual(10, len(result.operations))
|
||||
self.assertEqual(
|
||||
enums.Operation.DISCOVER_VERSIONS,
|
||||
result.operations[-1].value
|
||||
|
||||
Reference in New Issue
Block a user