diff --git a/kmip/demos/pie/locate.py b/kmip/demos/pie/locate.py index bbd1336..a4c4552 100644 --- a/kmip/demos/pie/locate.py +++ b/kmip/demos/pie/locate.py @@ -13,44 +13,64 @@ # License for the specific language governing permissions and limitations # under the License. -from kmip.core.enums import NameType -from kmip.core.enums import Operation - -from kmip.core.attributes import Name - -from kmip.core.objects import Attribute - -from kmip.demos import utils - -from kmip.pie import client - +import calendar import logging import sys +import time + +from kmip.core import enums +from kmip.core.factories.attributes import AttributeFactory +from kmip.demos import utils +from kmip.pie import client if __name__ == '__main__': logger = utils.build_console_logger(logging.INFO) # Build and parse arguments - parser = utils.build_cli_parser(Operation.LOCATE) + parser = utils.build_cli_parser(enums.Operation.LOCATE) opts, args = parser.parse_args(sys.argv[1:]) config = opts.config name = opts.name + initial_dates = opts.initial_dates - # Exit early if name is not specified - if name is None: - logger.error('No name provided, exiting early from demo') - sys.exit() + attribute_factory = AttributeFactory() - # Build name attribute - # TODO Push this into the AttributeFactory - attribute_name = Attribute.AttributeName('Name') - name_value = Name.NameValue(name) - name_type = Name.NameType(NameType.UNINTERPRETED_TEXT_STRING) - value = Name.create(name_value=name_value, name_type=name_type) - name_obj = Attribute(attribute_name=attribute_name, attribute_value=value) - attributes = [name_obj] + # Build attributes if any are specified + attributes = [] + if name: + attributes.append( + attribute_factory.create_attribute(enums.AttributeType.NAME, name) + ) + for initial_date in initial_dates: + try: + t = time.strptime(initial_date) + except ValueError, TypeError: + logger.error( + "Invalid initial date provided: {}".format(initial_date) + ) + logger.info( + "Date values should be formatted like this: " + "'Tue Jul 23 18:39:01 2019'" + ) + sys.exit(-1) + + try: + t = calendar.timegm(t) + except Exception: + logger.error( + "Failed to convert initial date time tuple " + "to an integer: {}".format(t) + ) + sys.exit(-2) + + attributes.append( + attribute_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + t + ) + ) # Build the client and connect to the server with client.ProxyKmipClient( diff --git a/kmip/demos/units/locate.py b/kmip/demos/units/locate.py index 7eb6057..d8dff35 100644 --- a/kmip/demos/units/locate.py +++ b/kmip/demos/units/locate.py @@ -13,42 +13,30 @@ # License for the specific language governing permissions and limitations # under the License. -from kmip.core.enums import CredentialType -from kmip.core.enums import NameType -from kmip.core.enums import Operation -from kmip.core.enums import ResultStatus - -from kmip.core.attributes import Name - -from kmip.core.factories.attributes import AttributeFactory -from kmip.core.factories.credentials import CredentialFactory - -from kmip.core.objects import Attribute - -from kmip.demos import utils - -from kmip.services.kmip_client import KMIPProxy - +import calendar import logging import sys +import time + +from kmip.core import enums +from kmip.core.factories.attributes import AttributeFactory +from kmip.core.factories.credentials import CredentialFactory +from kmip.demos import utils +from kmip.services import kmip_client if __name__ == '__main__': logger = utils.build_console_logger(logging.INFO) # Build and parse arguments - parser = utils.build_cli_parser(Operation.LOCATE) + parser = utils.build_cli_parser(enums.Operation.LOCATE) opts, args = parser.parse_args(sys.argv[1:]) username = opts.username password = opts.password config = opts.config name = opts.name - - # Exit early if the UUID is not specified - if name is None: - logger.error('No name provided, exiting early from demo') - sys.exit() + initial_dates = opts.initial_dates attribute_factory = AttributeFactory() credential_factory = CredentialFactory() @@ -58,34 +46,63 @@ if __name__ == '__main__': if (username is None) and (password is None): credential = None else: - credential_type = CredentialType.USERNAME_AND_PASSWORD - credential_value = {'Username': username, - 'Password': password} - credential = credential_factory.create_credential(credential_type, - credential_value) + credential_type = enums.CredentialType.USERNAME_AND_PASSWORD + credential_value = { + "Username": username, + "Password": password + } + credential = credential_factory.create_credential( + credential_type, + credential_value + ) + # Build the client and connect to the server - client = KMIPProxy(config=config, config_file=opts.config_file) + client = kmip_client.KMIPProxy(config=config, config_file=opts.config_file) client.open() - # Build name attribute - # TODO (peter-hamilton) Push this into the AttributeFactory - attribute_name = Attribute.AttributeName('Name') - name_value = Name.NameValue(name) - name_type = Name.NameType(NameType.UNINTERPRETED_TEXT_STRING) - value = Name.create(name_value=name_value, name_type=name_type) - name_obj = Attribute(attribute_name=attribute_name, attribute_value=value) - attributes = [name_obj] + # Build attributes if any are specified + attributes = [] + if name: + attributes.append( + attribute_factory.create_attribute(enums.AttributeType.NAME, name) + ) + for initial_date in initial_dates: + try: + t = time.strptime(initial_date) + except ValueError: + logger.error( + "Invalid initial date provided: {}".format(initial_date) + ) + logger.info( + "Date values should be formatted like this: " + "'Tue Jul 23 18:39:01 2019'" + ) + sys.exit(-1) - # Locate UUID of specified SYMMETRIC_KEY object - result = client.locate(attributes=attributes, - credential=credential) + try: + t = calendar.timegm(t) + except Exception: + logger.error( + "Failed to convert initial date time tuple " + "to an integer: {}".format(t) + ) + sys.exit(-2) + + attributes.append( + attribute_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + t + ) + ) + + result = client.locate(attributes=attributes, credential=credential) client.close() # Display operation results logger.info('locate() result status: {0}'.format( result.result_status.value)) - if result.result_status.value == ResultStatus.SUCCESS: + if result.result_status.value == enums.ResultStatus.SUCCESS: logger.info('located UUIDs:') for uuid in result.uuids: logger.info('{0}'.format(uuid)) diff --git a/kmip/demos/utils.py b/kmip/demos/utils.py index befc789..1224907 100644 --- a/kmip/demos/utils.py +++ b/kmip/demos/utils.py @@ -238,6 +238,20 @@ def build_cli_parser(operation=None): default=None, dest="name", help="Name of secret to retrieve from the KMIP server") + parser.add_option( + "--initial-date", + action="append", + type="str", + default=[], + dest="initial_dates", + help=( + "Initial date(s) in UTC of the secret to retrieve from the " + "KMIP server. Use once to perform an exact date match. Use " + "twice to create a date range that the secret's date should " + "be within. The value format should look like this: " + "'Tue Jul 23 18:39:01 2019'" + ) + ) elif operation is Operation.REGISTER: parser.add_option( "-f", diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 71e4785..f643dee 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -652,7 +652,7 @@ class KmipEngine(object): names.append(name) return names elif attr_name == 'Object Type': - return managed_object._object_type + return managed_object.object_type elif attr_name == 'Cryptographic Algorithm': return managed_object.cryptographic_algorithm elif attr_name == 'Cryptographic Length': @@ -926,6 +926,63 @@ class KmipEngine(object): else: return False + def _is_valid_date(self, date_type, value, start, end): + date_type = date_type.value.lower() + + if start is not None: + if end is not None: + if value < start: + self._logger.debug( + "Failed match: object's {} ({}) is less than " + "the starting {} ({}).".format( + date_type, + time.asctime(time.gmtime(value)), + date_type, + time.asctime(time.gmtime(start)) + ) + ) + return False + elif value > end: + self._logger.debug( + "Failed match: object's {} ({}) is greater than " + "the ending {} ({}).".format( + date_type, + time.asctime(time.gmtime(value)), + date_type, + time.asctime(time.gmtime(end)) + ) + ) + return False + else: + if start != value: + self._logger.debug( + "Failed match: object's {} ({}) does not match " + "the specified {} ({}).".format( + date_type, + time.asctime(time.gmtime(value)), + date_type, + time.asctime(time.gmtime(start)) + ) + ) + return False + return True + + def _track_date_attributes(self, date_type, date_values, value): + if date_values.get("start") is None: + date_values["start"] = value + elif date_values.get("end") is None: + if value > date_values.get("start"): + date_values["end"] = value + else: + date_values["end"] = date_values.get("start") + date_values["start"] = value + else: + raise exceptions.InvalidField( + "Too many {} attributes provided. " + "Include one for an exact match. " + "Include two for a ranged match.".format(date_type.value) + ) + def _get_object_with_access_controls( self, uid, @@ -1549,20 +1606,63 @@ class KmipEngine(object): managed_objects_filtered = [] # Filter the objects based on given attributes. - # TODO: Currently will only filter for 'Name'. - # Needs to add other attributes. for managed_object in managed_objects: - for attribute in payload.attributes: - attribute_name = attribute.attribute_name.value - attribute_value = attribute.attribute_value - attr = self._get_attribute_from_managed_object( - managed_object, attribute_name) - if attribute_name == 'Name': - names = attr - if attribute_value not in names: + self._logger.debug( + "Evaluating object: {}".format( + managed_object.unique_identifier + ) + ) + + add_object = True + initial_date = {} + + for payload_attribute in payload.attributes: + name = payload_attribute.attribute_name.value + value = payload_attribute.attribute_value + attribute = self._get_attribute_from_managed_object( + managed_object, + name + ) + if attribute is None: + continue + elif name == "Name": + if value not in attribute: + self._logger.debug( + "Failed match: " + "the specified name ({}) does not match " + "any of the object's names ({}).".format( + value, + attribute + ) + ) + add_object = False break - # TODO: filtering on other attributes - else: + elif name == enums.AttributeType.INITIAL_DATE.value: + initial_date["value"] = attribute + self._track_date_attributes( + enums.AttributeType.INITIAL_DATE, + initial_date, + value.value + ) + else: + if value != attribute: + add_object = False + break + + if initial_date.get("value"): + add_object &= self._is_valid_date( + enums.AttributeType.INITIAL_DATE, + initial_date.get("value"), + initial_date.get("start"), + initial_date.get("end") + ) + + if add_object: + self._logger.debug( + "Locate filter matched object: {}".format( + managed_object.unique_identifier + ) + ) managed_objects_filtered.append(managed_object) managed_objects = managed_objects_filtered diff --git a/kmip/tests/integration/services/test_integration.py b/kmip/tests/integration/services/test_integration.py index c90ef12..530eb4a 100644 --- a/kmip/tests/integration/services/test_integration.py +++ b/kmip/tests/integration/services/test_integration.py @@ -14,7 +14,9 @@ # under the License. import logging -from testtools import TestCase +import pytest +import testtools +import time from kmip.core.attributes import CryptographicAlgorithm from kmip.core.attributes import CryptographicLength @@ -53,11 +55,9 @@ from kmip.core.secrets import Certificate from kmip.core.secrets import SecretData from kmip.core.secrets import OpaqueObject -import pytest - @pytest.mark.usefixtures("client") -class TestIntegration(TestCase): +class TestIntegration(testtools.TestCase): def setUp(self): super(TestIntegration, self).setUp() @@ -71,6 +71,11 @@ class TestIntegration(TestCase): def tearDown(self): super(TestIntegration, self).tearDown() + result = self.client.locate() + if result.result_status.value == ResultStatus.SUCCESS: + for uuid in result.uuids: + self.client.destroy(uuid=uuid) + def _create_symmetric_key(self, key_name=None): """ Helper function for creating symmetric keys. Used any time a key @@ -1193,3 +1198,166 @@ class TestIntegration(TestCase): self.assertEqual( ResultStatus.OPERATION_FAILED, result.result_status.value) + + def test_symmetric_key_create_getattributes_locate_destroy(self): + """ + Test that newly created keys can be located based on their attributes. + """ + start_time = int(time.time()) + time.sleep(2) + + key_name = "Integration Test - Create-GetAttributes-Locate-Destroy Key" + result = self._create_symmetric_key(key_name=key_name) + uid_a = result.uuid + + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + self.assertEqual(ObjectType.SYMMETRIC_KEY, result.object_type) + self.assertIsInstance(uid_a, str) + + time.sleep(2) + mid_time = int(time.time()) + time.sleep(2) + + key_name = "Integration Test - Create-GetAttributes-Locate-Destroy Key" + result = self._create_symmetric_key(key_name=key_name) + uid_b = result.uuid + + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + self.assertEqual(ObjectType.SYMMETRIC_KEY, result.object_type) + self.assertIsInstance(uid_b, str) + + time.sleep(2) + end_time = int(time.time()) + + # Get the actual "Initial Date" values for each key + result = self.client.get_attributes( + uuid=uid_a, + attribute_names=["Initial Date"] + ) + + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + self.assertEqual(1, len(result.attributes)) + self.assertEqual( + "Initial Date", + result.attributes[0].attribute_name.value + ) + initial_date_a = result.attributes[0].attribute_value.value + + result = self.client.get_attributes( + uuid=uid_b, + attribute_names=["Initial Date"] + ) + + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + self.assertEqual(1, len(result.attributes)) + self.assertEqual( + "Initial Date", + result.attributes[0].attribute_name.value + ) + initial_date_b = result.attributes[0].attribute_value.value + + # Test locating each key by its exact "Initial Date" value + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + initial_date_a + ) + ] + ) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + self.assertEqual(1, len(result.uuids)) + self.assertEqual(uid_a, result.uuids[0]) + + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + initial_date_b + ) + ] + ) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + self.assertEqual(1, len(result.uuids)) + self.assertEqual(uid_b, result.uuids[0]) + + # Test locating each key by a range around its "Initial Date" value + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + start_time + ), + self.attr_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + mid_time + ) + ] + ) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + self.assertEqual(1, len(result.uuids)) + self.assertEqual(uid_a, result.uuids[0]) + + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + mid_time + ), + self.attr_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + end_time + ) + ] + ) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + self.assertEqual(1, len(result.uuids)) + self.assertEqual(uid_b, result.uuids[0]) + + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + start_time + ), + self.attr_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + end_time + ) + ] + ) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + self.assertEqual(2, len(result.uuids)) + self.assertIn(uid_a, result.uuids) + self.assertIn(uid_b, result.uuids) + + # Test locating each key based off of its name. + result = self.client.locate( + attributes=[ + self.attr_factory.create_attribute( + enums.AttributeType.NAME, + key_name + ) + ] + ) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + self.assertEqual(2, len(result.uuids)) + self.assertIn(uid_a, result.uuids) + self.assertIn(uid_b, result.uuids) + + # Clean up keys + result = self.client.destroy(uid_a) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + result = self.client.get(uuid=result.uuid.value, credential=None) + self.assertEqual( + ResultStatus.OPERATION_FAILED, + result.result_status.value + ) + + result = self.client.destroy(uid_b) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.value) + result = self.client.get(uuid=result.uuid.value, credential=None) + self.assertEqual( + ResultStatus.OPERATION_FAILED, + result.result_status.value + ) diff --git a/kmip/tests/integration/services/test_proxykmipclient.py b/kmip/tests/integration/services/test_proxykmipclient.py index 652af68..59cb819 100644 --- a/kmip/tests/integration/services/test_proxykmipclient.py +++ b/kmip/tests/integration/services/test_proxykmipclient.py @@ -15,9 +15,11 @@ import six import testtools +import time import pytest from kmip.core import enums +from kmip.core.factories import attributes as attribute_factory from kmip.pie import exceptions from kmip.pie import factory @@ -30,10 +32,15 @@ class TestProxyKmipClientIntegration(testtools.TestCase): def setUp(self): super(TestProxyKmipClientIntegration, self).setUp() self.object_factory = factory.ObjectFactory() + self.attribute_factory = attribute_factory.AttributeFactory() def tearDown(self): super(TestProxyKmipClientIntegration, self).tearDown() + uuids = self.client.locate() + for uuid in uuids: + self.client.destroy(uid=uuid) + def test_symmetric_key_create_get_destroy(self): """ Test that the ProxyKmipClient can create, retrieve, and destroy a @@ -860,3 +867,123 @@ class TestProxyKmipClientIntegration(testtools.TestCase): ) self.client.destroy(public_key_id) self.client.destroy(private_key_id) + + def test_create_getattributes_locate_destroy(self): + """ + Test that the ProxyKmipClient can create symmetric keys and then + locate those keys using their attributes. + """ + start_time = int(time.time()) + time.sleep(2) + + # Create some symmetric keys + a_id = self.client.create(enums.CryptographicAlgorithm.AES, 256) + + time.sleep(2) + mid_time = int(time.time()) + time.sleep(2) + + b_id = self.client.create(enums.CryptographicAlgorithm.AES, 128) + + time.sleep(2) + end_time = int(time.time()) + + self.assertIsInstance(a_id, str) + self.assertIsInstance(b_id, str) + + # Get the "Initial Date" attributes for each key + result_id, result_attributes = self.client.get_attributes( + uid=a_id, + attribute_names=["Initial Date"] + ) + self.assertEqual(1, len(result_attributes)) + self.assertEqual( + "Initial Date", + result_attributes[0].attribute_name.value + ) + initial_date_a = result_attributes[0].attribute_value.value + + result_id, result_attributes = self.client.get_attributes( + uid=b_id, + attribute_names=["Initial Date"] + ) + self.assertEqual(1, len(result_attributes)) + self.assertEqual( + "Initial Date", + result_attributes[0].attribute_name.value + ) + initial_date_b = result_attributes[0].attribute_value.value + + # Test locating each key by its exact "Initial Date" value + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + initial_date_a + ) + ] + ) + self.assertEqual(1, len(result)) + self.assertEqual(a_id, result[0]) + + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + initial_date_b + ) + ] + ) + self.assertEqual(1, len(result)) + self.assertEqual(b_id, result[0]) + + # Test locating each key by a range around its "Initial Date" value + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + start_time + ), + self.attribute_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + mid_time + ) + ] + ) + self.assertEqual(1, len(result)) + self.assertEqual(a_id, result[0]) + + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + mid_time + ), + self.attribute_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + end_time + ) + ] + ) + self.assertEqual(1, len(result)) + self.assertEqual(b_id, result[0]) + + result = self.client.locate( + attributes=[ + self.attribute_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + start_time + ), + self.attribute_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + end_time + ) + ] + ) + self.assertEqual(2, len(result)) + self.assertIn(a_id, result) + self.assertIn(b_id, result) + + # Clean up the keys + self.client.destroy(a_id) + self.client.destroy(b_id) diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 847888d..71c0e34 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -4263,6 +4263,155 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_is_valid_date(self): + """ + Test that object date checking yields the correct results. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + # If the date range isn't fully defined, the value is implicitly valid. + self.assertTrue( + e._is_valid_date( + enums.AttributeType.INITIAL_DATE, + 1563564520, + None, + None + ) + ) + self.assertTrue( + e._is_valid_date( + enums.AttributeType.INITIAL_DATE, + 1563564520, + None, + 1563564521 + ) + ) + + # Verify the value is valid for a fully defined, encompassing range. + self.assertTrue( + e._is_valid_date( + enums.AttributeType.INITIAL_DATE, + 1563564520, + 1563564519, + 1563564521 + ) + ) + + # Verify the value is valid for a specific date value. + self.assertTrue( + e._is_valid_date( + enums.AttributeType.INITIAL_DATE, + 1563564520, + 1563564520, + None + ) + ) + + # Verify the value is invalid for a specific date value. + self.assertFalse( + e._is_valid_date( + enums.AttributeType.INITIAL_DATE, + 1563564520, + 1563564519, + None + ) + ) + e._logger.debug.assert_any_call( + "Failed match: " + "object's initial date (Fri Jul 19 19:28:40 2019) does not match " + "the specified initial date (Fri Jul 19 19:28:39 2019)." + ) + e._logger.reset_mock() + + # Verify the value is invalid below a specific date range. + self.assertFalse( + e._is_valid_date( + enums.AttributeType.INITIAL_DATE, + 1563564519, + 1563564520, + 1563564521 + ) + ) + e._logger.debug.assert_any_call( + "Failed match: " + "object's initial date (Fri Jul 19 19:28:39 2019) is less than " + "the starting initial date (Fri Jul 19 19:28:40 2019)." + ) + e._logger.reset_mock() + + # Verify the value is invalid above a specific date range. + self.assertFalse( + e._is_valid_date( + enums.AttributeType.INITIAL_DATE, + 1563564521, + 1563564519, + 1563564520 + ) + ) + e._logger.debug.assert_any_call( + "Failed match: " + "object's initial date (Fri Jul 19 19:28:41 2019) is greater than " + "the ending initial date (Fri Jul 19 19:28:40 2019)." + ) + + def test_track_date_attributes(self): + """ + Test date attribute value tracking with a simple dictionary. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + date_values = {} + + # Verify the first date given is considered the starting range value. + e._track_date_attributes( + enums.AttributeType.INITIAL_DATE, + date_values, + 1563564519 + ) + self.assertEqual(1563564519, date_values["start"]) + + # Verify the second date given is considered the ending range value. + e._track_date_attributes( + enums.AttributeType.INITIAL_DATE, + date_values, + 1563564521 + ) + self.assertEqual(1563564519, date_values["start"]) + self.assertEqual(1563564521, date_values["end"]) + + # Verify that the third date given triggers an exception. + args = (enums.AttributeType.INITIAL_DATE, date_values, 1563564520) + six.assertRaisesRegex( + self, + exceptions.InvalidField, + "Too many Initial Date attributes provided. " + "Include one for an exact match. " + "Include two for a ranged match.", + e._track_date_attributes, + *args + ) + + # Verify that a lower second date is interpreted as the new start date. + date_values = {} + date_values["start"] = 1563564521 + e._track_date_attributes( + enums.AttributeType.INITIAL_DATE, + date_values, + 1563564519 + ) + self.assertEqual(1563564519, date_values["start"]) + self.assertEqual(1563564521, date_values["end"]) + def test_locate(self): """ Test that a Locate request can be processed correctly. @@ -4285,13 +4434,8 @@ class TestKmipEngine(testtools.TestCase): e._data_session.commit() e._data_session = e._data_store_session_factory() - e._logger.info.assert_any_call( - "Processing operation: Locate" - ) - self.assertEqual( - len(response_payload.unique_identifiers), - 0 - ) + e._logger.info.assert_any_call("Processing operation: Locate") + self.assertEqual(len(response_payload.unique_identifiers), 0) # Add the first obj and test the locate e._data_session.add(obj_a) @@ -4306,18 +4450,10 @@ class TestKmipEngine(testtools.TestCase): e._data_session.commit() e._data_session = e._data_store_session_factory() - e._logger.info.assert_any_call( - "Processing operation: Locate" - ) + e._logger.info.assert_any_call("Processing operation: Locate") - self.assertEqual( - len(response_payload.unique_identifiers), - 1 - ) - self.assertEqual( - id_a, - response_payload.unique_identifiers[0] - ) + self.assertEqual(len(response_payload.unique_identifiers), 1) + self.assertEqual(id_a, response_payload.unique_identifiers[0]) # Add the second obj and test the locate e._data_session.add(obj_b) @@ -4332,22 +4468,11 @@ class TestKmipEngine(testtools.TestCase): e._data_session.commit() e._data_session = e._data_store_session_factory() - e._logger.info.assert_any_call( - "Processing operation: Locate" - ) + e._logger.info.assert_any_call("Processing operation: Locate") - self.assertEqual( - len(response_payload.unique_identifiers), - 2 - ) - self.assertIn( - id_a, - response_payload.unique_identifiers - ) - self.assertIn( - id_b, - response_payload.unique_identifiers - ) + self.assertEqual(len(response_payload.unique_identifiers), 2) + self.assertIn(id_a, response_payload.unique_identifiers) + self.assertIn(id_b, response_payload.unique_identifiers) def test_locate_with_name(self): """ @@ -4360,14 +4485,27 @@ class TestKmipEngine(testtools.TestCase): e._is_allowed_by_operation_policy = mock.Mock(return_value=True) e._logger = mock.MagicMock() - key = (b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' - b'\x00\x00\x00\x00\x00') + key = ( + b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + ) obj_a = pie_objects.SymmetricKey( - enums.CryptographicAlgorithm.AES, 128, key, name='name0') + enums.CryptographicAlgorithm.AES, + 128, + key, + name='name0' + ) obj_b = pie_objects.SymmetricKey( - enums.CryptographicAlgorithm.DES, 128, key, name='name0') + enums.CryptographicAlgorithm.DES, + 128, + key, + name='name0' + ) obj_c = pie_objects.SymmetricKey( - enums.CryptographicAlgorithm.AES, 128, key, name='name1') + enums.CryptographicAlgorithm.AES, + 128, + key, + name='name1' + ) e._data_session.add(obj_a) e._data_session.add(obj_b) @@ -4383,13 +4521,13 @@ class TestKmipEngine(testtools.TestCase): # Locate the obj with name 'name0' attrs = [ - attribute_factory.create_attribute( - enums.AttributeType.NAME, - attributes.Name.create( - 'name0', - enums.NameType.UNINTERPRETED_TEXT_STRING - ) - ), + attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'name0', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) ] payload = payloads.LocateRequestPayload(attributes=attrs) @@ -4398,32 +4536,21 @@ class TestKmipEngine(testtools.TestCase): e._data_session.commit() e._data_session = e._data_store_session_factory() - e._logger.info.assert_any_call( - "Processing operation: Locate" - ) + e._logger.info.assert_any_call("Processing operation: Locate") - self.assertEqual( - len(response_payload.unique_identifiers), - 2 - ) - self.assertIn( - id_a, - response_payload.unique_identifiers - ) - self.assertIn( - id_b, - response_payload.unique_identifiers - ) + self.assertEqual(len(response_payload.unique_identifiers), 2) + self.assertIn(id_a, response_payload.unique_identifiers) + self.assertIn(id_b, response_payload.unique_identifiers) # Locate the obj with name 'name1' attrs = [ - attribute_factory.create_attribute( - enums.AttributeType.NAME, - attributes.Name.create( - 'name1', - enums.NameType.UNINTERPRETED_TEXT_STRING - ) - ), + attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'name1', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) ] payload = payloads.LocateRequestPayload(attributes=attrs) @@ -4432,19 +4559,129 @@ class TestKmipEngine(testtools.TestCase): e._data_session.commit() e._data_session = e._data_store_session_factory() - e._logger.info.assert_any_call( - "Processing operation: Locate" + e._logger.info.assert_any_call("Processing operation: Locate") + + self.assertEqual(len(response_payload.unique_identifiers), 1) + self.assertIn(id_c, response_payload.unique_identifiers) + + def test_locate_with_initial_date(self): + """ + Test the Locate operation when 'Initial Date' attributes are given. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + key = ( + b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' ) - self.assertEqual( - len(response_payload.unique_identifiers), - 1 + obj_a = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 128, + key, + name='name1' ) - self.assertIn( - id_c, - response_payload.unique_identifiers + obj_a.initial_date = int(time.time()) + obj_a_time_str = time.strftime( + "%a %b %d %H:%M:%S %Y", + time.gmtime(obj_a.initial_date) ) + time.sleep(2) + mid_time = int(time.time()) + mid_time_str = time.strftime( + "%a %b %d %H:%M:%S %Y", + time.gmtime(mid_time) + ) + time.sleep(2) + + obj_b = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.DES, + 128, + key, + name='name2' + ) + obj_b.initial_date = int(time.time()) + obj_b_time_str = time.strftime( + "%a %b %d %H:%M:%S %Y", + time.gmtime(obj_b.initial_date) + ) + + time.sleep(2) + end_time = int(time.time()) + + e._data_session.add(obj_a) + e._data_session.add(obj_b) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + id_b = str(obj_b.unique_identifier) + + attribute_factory = factory.AttributeFactory() + + # Locate the object with a specific timestamp + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + obj_a.initial_date + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Failed match: object's initial date ({}) does not match " + "the specified initial date ({}).".format( + obj_b_time_str, + obj_a_time_str + ) + ) + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_a) + ) + self.assertEqual(len(response_payload.unique_identifiers), 1) + self.assertIn(id_a, response_payload.unique_identifiers) + + # Locate an object with a timestamp range + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + mid_time + ), + attribute_factory.create_attribute( + enums.AttributeType.INITIAL_DATE, + end_time + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Failed match: object's initial date ({}) is less than " + "the starting initial date ({}).".format( + obj_a_time_str, + mid_time_str + ) + ) + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_b) + ) + self.assertEqual(len(response_payload.unique_identifiers), 1) + self.assertIn(id_b, response_payload.unique_identifiers) + def test_get(self): """ Test that a Get request can be processed correctly.