mirror of
https://github.com/openkmip/pykmip
synced 2025-12-22 19:23:27 +00:00
Add Name attribute filtering of locate for server
This commit is contained in:
@@ -3553,6 +3553,101 @@ class TestKmipEngine(testtools.TestCase):
|
||||
[uid.value for uid in response_payload.unique_identifiers]
|
||||
)
|
||||
|
||||
def test_locate_with_name(self):
|
||||
"""
|
||||
Test locate operation when 'Name' attribute is given.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
key = (b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
b'\x00\x00\x00\x00\x00')
|
||||
obj_a = pie_objects.SymmetricKey(
|
||||
enums.CryptographicAlgorithm.AES, 128, key, name='name0')
|
||||
obj_b = pie_objects.SymmetricKey(
|
||||
enums.CryptographicAlgorithm.DES, 128, key, name='name0')
|
||||
obj_c = pie_objects.SymmetricKey(
|
||||
enums.CryptographicAlgorithm.AES, 128, key, name='name1')
|
||||
|
||||
e._data_session.add(obj_a)
|
||||
e._data_session.add(obj_b)
|
||||
e._data_session.add(obj_c)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
id_a = str(obj_a.unique_identifier)
|
||||
id_b = str(obj_b.unique_identifier)
|
||||
id_c = str(obj_c.unique_identifier)
|
||||
|
||||
attribute_factory = factory.AttributeFactory()
|
||||
|
||||
# Locate the obj with name 'name0'
|
||||
attrs = [
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'name0',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
),
|
||||
]
|
||||
|
||||
payload = locate.LocateRequestPayload(attributes=attrs)
|
||||
e._logger.reset_mock()
|
||||
response_payload = e._process_locate(payload)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
e._logger.info.assert_any_call(
|
||||
"Processing operation: Locate"
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
len(response_payload.unique_identifiers),
|
||||
2
|
||||
)
|
||||
self.assertIn(
|
||||
id_a,
|
||||
[uid.value for uid in response_payload.unique_identifiers]
|
||||
)
|
||||
self.assertIn(
|
||||
id_b,
|
||||
[uid.value for uid in response_payload.unique_identifiers]
|
||||
)
|
||||
|
||||
# Locate the obj with name 'name1'
|
||||
attrs = [
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'name1',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
),
|
||||
]
|
||||
|
||||
payload = locate.LocateRequestPayload(attributes=attrs)
|
||||
e._logger.reset_mock()
|
||||
response_payload = e._process_locate(payload)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
e._logger.info.assert_any_call(
|
||||
"Processing operation: Locate"
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
len(response_payload.unique_identifiers),
|
||||
1
|
||||
)
|
||||
self.assertIn(
|
||||
id_c,
|
||||
[uid.value for uid in response_payload.unique_identifiers]
|
||||
)
|
||||
|
||||
def test_get(self):
|
||||
"""
|
||||
Test that a Get request can be processed correctly.
|
||||
|
||||
Reference in New Issue
Block a user