diff --git a/kmip/pie/objects.py b/kmip/pie/objects.py index 6623ffe..7567b82 100644 --- a/kmip/pie/objects.py +++ b/kmip/pie/objects.py @@ -14,7 +14,7 @@ # under the License. from abc import abstractmethod -from sqlalchemy import Column, event, ForeignKey, Integer, VARCHAR +from sqlalchemy import Column, event, ForeignKey, Integer, VARBINARY from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.orm import relationship @@ -46,7 +46,7 @@ class ManagedObject(sql.Base): __tablename__ = 'managed_objects' unique_identifier = Column('uid', Integer, primary_key=True) _object_type = Column('object_type', sql.EnumType(enums.ObjectType)) - value = Column('value', VARCHAR(1024)) + value = Column('value', VARBINARY(1024)) name_index = Column(Integer, default=0) _names = relationship('ManagedObjectName', back_populates='mo', cascade='all, delete-orphan') @@ -142,6 +142,16 @@ class CryptographicObject(ManagedObject): describing how the CryptographicObject will be used. """ + __tablename__ = 'crypto_objects' + unique_identifier = Column('uid', Integer, + ForeignKey('managed_objects.uid'), + primary_key=True) + cryptographic_usage_masks = Column('cryptographic_usage_mask', + sql.UsageMaskType) + __mapper_args__ = { + 'polymorphic_identity': 0x80000001 + } + @abstractmethod def __init__(self): """ @@ -188,6 +198,20 @@ class Key(CryptographicObject): the key value. """ + __tablename__ = 'keys' + unique_identifier = Column('uid', Integer, + ForeignKey('crypto_objects.uid'), + primary_key=True) + cryptographic_algorithm = Column( + 'cryptographic_algorithm', sql.EnumType(enums.CryptographicAlgorithm)) + cryptographic_length = Column('cryptographic_length', Integer) + key_format_type = Column( + 'key_format_type', sql.EnumType(enums.KeyFormatType)) + + __mapper_args__ = { + 'polymorphic_identity': 0x80000002 + } + @abstractmethod def __init__(self): """ @@ -226,6 +250,15 @@ class SymmetricKey(Key): names: The string names of the SymmetricKey. """ + __tablename__ = 'symmetric_keys' + unique_identifier = Column('uid', Integer, + ForeignKey('keys.uid'), + primary_key=True) + + __mapper_args__ = { + 'polymorphic_identity': enums.ObjectType.SYMMETRIC_KEY + } + def __init__(self, algorithm, length, value, masks=None, name='Symmetric Key'): """ @@ -252,9 +285,7 @@ class SymmetricKey(Key): self.names = [name] if masks: - self.cryptographic_usage_masks = masks - else: - self.cryptographic_usage_masks = list() + self.cryptographic_usage_masks.extend(masks) # All remaining attributes are not considered part of the public API # and are subject to change. @@ -282,8 +313,6 @@ class SymmetricKey(Key): "enumeration") elif not isinstance(self.cryptographic_length, six.integer_types): raise TypeError("key length must be an integer") - elif not isinstance(self.cryptographic_usage_masks, list): - raise TypeError("key usage masks must be a list") mask_count = len(self.cryptographic_usage_masks) for i in range(mask_count): @@ -337,6 +366,10 @@ class SymmetricKey(Key): return NotImplemented +event.listen(SymmetricKey._names, 'append', + sql.attribute_append_factory("name_index"), retval=False) + + class PublicKey(Key): """ The PublicKey class of the simplified KMIP object hierarchy. @@ -389,8 +422,6 @@ class PublicKey(Key): if masks: self.cryptographic_usage_masks = masks - else: - self.cryptographic_usage_masks = list() # All remaining attributes are not considered part of the public API # and are subject to change. @@ -422,8 +453,6 @@ class PublicKey(Key): elif self.key_format_type not in self._valid_formats: raise ValueError("key format type must be one of {0}".format( self._valid_formats)) - elif not isinstance(self.cryptographic_usage_masks, list): - raise TypeError("key usage masks must be a list") # TODO (peter-hamilton) Verify that the key bytes match the key format @@ -529,8 +558,6 @@ class PrivateKey(Key): if masks: self.cryptographic_usage_masks = masks - else: - self.cryptographic_usage_masks = list() # All remaining attributes are not considered part of the public API # and are subject to change. @@ -562,8 +589,6 @@ class PrivateKey(Key): elif self.key_format_type not in self._valid_formats: raise ValueError("key format type must be one of {0}".format( self._valid_formats)) - elif not isinstance(self.cryptographic_usage_masks, list): - raise TypeError("key usage masks must be a list") # TODO (peter-hamilton) Verify that the key bytes match the key format @@ -658,8 +683,6 @@ class Certificate(CryptographicObject): if masks: self.cryptographic_usage_masks = masks - else: - self.cryptographic_usage_masks = list() # All remaining attributes are not considered part of the public API # and are subject to change. @@ -687,8 +710,6 @@ class Certificate(CryptographicObject): enums.CertificateTypeEnum): raise TypeError("certificate type must be a CertificateTypeEnum " "enumeration") - elif not isinstance(self.cryptographic_usage_masks, list): - raise TypeError("certificate usage masks must be a list") mask_count = len(self.cryptographic_usage_masks) for i in range(mask_count): @@ -808,8 +829,6 @@ class SecretData(CryptographicObject): if masks: self.cryptographic_usage_masks = masks - else: - self.cryptographic_usage_masks = list() # All remaining attributes are not considered part of the public API # and are subject to change. @@ -831,8 +850,6 @@ class SecretData(CryptographicObject): elif not isinstance(self.data_type, enums.SecretDataType): raise TypeError("secret data type must be a SecretDataType " "enumeration") - elif not isinstance(self.cryptographic_usage_masks, list): - raise TypeError("secret data usage masks must be a list") mask_count = len(self.cryptographic_usage_masks) for i in range(mask_count): diff --git a/kmip/pie/sqltypes.py b/kmip/pie/sqltypes.py index 66529ce..e5d1413 100644 --- a/kmip/pie/sqltypes.py +++ b/kmip/pie/sqltypes.py @@ -33,6 +33,49 @@ def attribute_append_factory(index_attribute): return attribute_append +class UsageMaskType(types.TypeDecorator): + """ + Converts a list of enums.CryptographicUsageMask Enums in an integer + bitmask. This allows the database to only store an integer instead of a + list of enumbs. This also does the reverse of converting an integer bit + mask into a list enums.CryptographicUsageMask Enums. + """ + + impl = types.Integer + + def process_bind_param(self, value, dialect): + """ + Returns the integer value of the usage mask bitmask. This value is + stored in the database. + + Args: + value(list): list of enums in the + usage mask + dialect(string): SQL dialect + """ + bitmask = 0x00 + for e in value: + bitmask = bitmask | e.value + return bitmask + + def process_result_value(self, value, dialect): + """ + Returns a new list of enums.CryptographicUsageMask Enums. This converts + the integer value into the list of enums. + + Args: + value(int): The integer value stored in the database that is used + to create the list of enums.CryptographicUsageMask Enums. + dialect(string): SQL dialect + """ + masks = list() + if value: + for e in enums.CryptographicUsageMask: + if e.value & value: + masks.append(e) + return masks + + class EnumType(types.TypeDecorator): """ Converts a Python enum to an integer before storing it in the database. diff --git a/kmip/tests/unit/pie/objects/test_symmetric_key.py b/kmip/tests/unit/pie/objects/test_symmetric_key.py index 3b24e9d..441996c 100644 --- a/kmip/tests/unit/pie/objects/test_symmetric_key.py +++ b/kmip/tests/unit/pie/objects/test_symmetric_key.py @@ -17,7 +17,10 @@ import binascii import testtools from kmip.core import enums -from kmip.pie import objects +from kmip.pie import sqltypes +from kmip.pie.objects import ManagedObject, SymmetricKey +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker class TestSymmetricKey(testtools.TestCase): @@ -44,6 +47,8 @@ class TestSymmetricKey(testtools.TestCase): b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F' b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1A\x1B\x1C\x1D\x1E' b'\x1F') + self.engine = create_engine('sqlite:///:memory:', echo=True) + sqltypes.Base.metadata.create_all(self.engine) def tearDown(self): super(TestSymmetricKey, self).tearDown() @@ -52,7 +57,7 @@ class TestSymmetricKey(testtools.TestCase): """ Test that a SymmetricKey object can be instantiated. """ - key = objects.SymmetricKey( + key = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) self.assertEqual(key.cryptographic_algorithm, @@ -66,7 +71,7 @@ class TestSymmetricKey(testtools.TestCase): """ Test that a SymmetricKey object can be instantiated with all arguments. """ - key = objects.SymmetricKey( + key = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a, @@ -88,7 +93,7 @@ class TestSymmetricKey(testtools.TestCase): Test that the object type can be retrieved from the SymmetricKey. """ expected = enums.ObjectType.SYMMETRIC_KEY - key = objects.SymmetricKey( + key = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) observed = key.object_type @@ -101,7 +106,7 @@ class TestSymmetricKey(testtools.TestCase): """ args = ('invalid', 128, self.bytes_128a) - self.assertRaises(TypeError, objects.SymmetricKey, *args) + self.assertRaises(TypeError, SymmetricKey, *args) def test_validate_on_invalid_length(self): """ @@ -110,7 +115,7 @@ class TestSymmetricKey(testtools.TestCase): """ args = (enums.CryptographicAlgorithm.AES, 'invalid', self.bytes_128a) - self.assertRaises(TypeError, objects.SymmetricKey, *args) + self.assertRaises(TypeError, SymmetricKey, *args) def test_validate_on_invalid_value(self): """ @@ -119,7 +124,7 @@ class TestSymmetricKey(testtools.TestCase): """ args = (enums.CryptographicAlgorithm.AES, 128, 0) - self.assertRaises(TypeError, objects.SymmetricKey, *args) + self.assertRaises(TypeError, SymmetricKey, *args) def test_validate_on_invalid_masks(self): """ @@ -129,7 +134,7 @@ class TestSymmetricKey(testtools.TestCase): args = (enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) kwargs = {'masks': 'invalid'} - self.assertRaises(TypeError, objects.SymmetricKey, *args, **kwargs) + self.assertRaises(TypeError, SymmetricKey, *args, **kwargs) def test_validate_on_invalid_mask(self): """ @@ -139,7 +144,7 @@ class TestSymmetricKey(testtools.TestCase): args = (enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) kwargs = {'masks': ['invalid']} - self.assertRaises(TypeError, objects.SymmetricKey, *args, **kwargs) + self.assertRaises(TypeError, SymmetricKey, *args, **kwargs) def test_validate_on_invalid_name(self): """ @@ -149,7 +154,7 @@ class TestSymmetricKey(testtools.TestCase): args = (enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) kwargs = {'name': 0} - self.assertRaises(TypeError, objects.SymmetricKey, *args, **kwargs) + self.assertRaises(TypeError, SymmetricKey, *args, **kwargs) def test_validate_on_invalid_length_value(self): """ @@ -158,7 +163,7 @@ class TestSymmetricKey(testtools.TestCase): """ args = (enums.CryptographicAlgorithm.AES, 256, self.bytes_128a) - self.assertRaises(ValueError, objects.SymmetricKey, *args) + self.assertRaises(ValueError, SymmetricKey, *args) def test_validate_on_invalid_value_length(self): """ @@ -167,13 +172,13 @@ class TestSymmetricKey(testtools.TestCase): """ args = (enums.CryptographicAlgorithm.AES, 128, self.bytes_256a) - self.assertRaises(ValueError, objects.SymmetricKey, *args) + self.assertRaises(ValueError, SymmetricKey, *args) def test_repr(self): """ Test that repr can be applied to a SymmetricKey. """ - key = objects.SymmetricKey( + key = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) args = "algorithm={0}, length={1}, value={2}".format( @@ -188,7 +193,7 @@ class TestSymmetricKey(testtools.TestCase): """ Test that str can be applied to a SymmetricKey. """ - key = objects.SymmetricKey( + key = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) expected = str(binascii.hexlify(self.bytes_128a)) observed = str(key) @@ -200,9 +205,9 @@ class TestSymmetricKey(testtools.TestCase): Test that the equality operator returns True when comparing two SymmetricKey objects with the same data. """ - a = objects.SymmetricKey( + a = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) - b = objects.SymmetricKey( + b = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) self.assertTrue(a == b) @@ -213,9 +218,9 @@ class TestSymmetricKey(testtools.TestCase): Test that the equality operator returns False when comparing two SymmetricKey objects with different data. """ - a = objects.SymmetricKey( + a = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) - b = objects.SymmetricKey( + b = SymmetricKey( enums.CryptographicAlgorithm.RSA, 128, self.bytes_128a) self.assertFalse(a == b) @@ -226,9 +231,9 @@ class TestSymmetricKey(testtools.TestCase): Test that the equality operator returns False when comparing two SymmetricKey objects with different data. """ - a = objects.SymmetricKey( + a = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) - b = objects.SymmetricKey( + b = SymmetricKey( enums.CryptographicAlgorithm.AES, 256, self.bytes_256a) b.value = self.bytes_128a @@ -240,9 +245,9 @@ class TestSymmetricKey(testtools.TestCase): Test that the equality operator returns False when comparing two SymmetricKey objects with different data. """ - a = objects.SymmetricKey( + a = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) - b = objects.SymmetricKey( + b = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128b) self.assertFalse(a == b) @@ -253,7 +258,7 @@ class TestSymmetricKey(testtools.TestCase): Test that the equality operator returns False when comparing a SymmetricKey object to a non-SymmetricKey object. """ - a = objects.SymmetricKey( + a = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) b = "invalid" @@ -265,9 +270,9 @@ class TestSymmetricKey(testtools.TestCase): Test that the inequality operator returns False when comparing two SymmetricKey objects with the same internal data. """ - a = objects.SymmetricKey( + a = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) - b = objects.SymmetricKey( + b = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) self.assertFalse(a != b) @@ -278,9 +283,9 @@ class TestSymmetricKey(testtools.TestCase): Test that the inequality operator returns True when comparing two SymmetricKey objects with different data. """ - a = objects.SymmetricKey( + a = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) - b = objects.SymmetricKey( + b = SymmetricKey( enums.CryptographicAlgorithm.RSA, 128, self.bytes_128a) self.assertTrue(a != b) @@ -291,9 +296,9 @@ class TestSymmetricKey(testtools.TestCase): Test that the inequality operator returns True when comparing two SymmetricKey objects with different data. """ - a = objects.SymmetricKey( + a = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) - b = objects.SymmetricKey( + b = SymmetricKey( enums.CryptographicAlgorithm.AES, 256, self.bytes_256a) self.assertTrue(a != b) @@ -304,9 +309,9 @@ class TestSymmetricKey(testtools.TestCase): Test that the inequality operator returns True when comparing two SymmetricKey objects with different data. """ - a = objects.SymmetricKey( + a = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) - b = objects.SymmetricKey( + b = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128b) self.assertTrue(a != b) @@ -317,9 +322,295 @@ class TestSymmetricKey(testtools.TestCase): Test that the equality operator returns True when comparing a SymmetricKey object to a non-SymmetricKey object. """ - a = objects.SymmetricKey( + a = SymmetricKey( enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) b = "invalid" self.assertTrue(a != b) self.assertTrue(b != a) + + def test_save(self): + """ + Test that the object can be saved using SQLAlchemy. This will add it to + the database, verify that no exceptions are thrown, and check that its + unique identifier was set. + """ + key = SymmetricKey( + enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + self.assertIsNotNone(key.unique_identifier) + + def test_get(self): + """ + Test that the object can be saved and then retrieved using SQLAlchemy. + This adds is to the database and then retrieves it by ID and verifies + some of the attributes. + """ + test_name = 'bowser' + masks = [enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT] + key = SymmetricKey( + enums.CryptographicAlgorithm.AES, 128, self.bytes_128a, + masks=masks, + name=test_name) + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + session = Session() + get_obj = session.query(SymmetricKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEqual(1, len(get_obj.names)) + self.assertEqual([test_name], get_obj.names) + self.assertEqual(enums.ObjectType.SYMMETRIC_KEY, get_obj.object_type) + self.assertEqual(self.bytes_128a, get_obj.value) + self.assertEqual(enums.CryptographicAlgorithm.AES, + get_obj.cryptographic_algorithm) + self.assertEqual(128, get_obj.cryptographic_length) + self.assertEqual(enums.KeyFormatType.RAW, get_obj.key_format_type) + self.assertEqual(masks, get_obj.cryptographic_usage_masks) + + def test_add_multiple_names(self): + """ + Test that multiple names can be added to a managed object. This + verifies a few properties. First this verifies that names can be added + using simple strings. It also verifies that the index for each + subsequent string is set accordingly. Finally this tests that the names + can be saved and retrieved from the database. + """ + expected_names = ['bowser', 'frumpy', 'big fat cat'] + key = SymmetricKey( + enums.CryptographicAlgorithm.AES, 128, self.bytes_128a, + name=expected_names[0]) + key.names.append(expected_names[1]) + key.names.append(expected_names[2]) + self.assertEquals(3, key.name_index) + expected_mo_names = list() + for i, name in enumerate(expected_names): + expected_mo_names.append(sqltypes.ManagedObjectName(name, i)) + self.assertEquals(expected_mo_names, key._names) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + session = Session() + get_obj = session.query(SymmetricKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_mo_names, get_obj._names) + + def test_remove_name(self): + """ + Tests that a name can be removed from the list of names. This will + verify that the list of names is correct. It will verify that updating + this object removes the name from the database. + """ + names = ['bowser', 'frumpy', 'big fat cat'] + remove_index = 1 + key = SymmetricKey( + enums.CryptographicAlgorithm.AES, 128, self.bytes_128a, + name=names[0]) + key.names.append(names[1]) + key.names.append(names[2]) + key.names.pop(remove_index) + self.assertEquals(3, key.name_index) + + expected_names = list() + expected_mo_names = list() + for i, name in enumerate(names): + if i != remove_index: + expected_names.append(name) + expected_mo_names.append(sqltypes.ManagedObjectName(name, i)) + self.assertEquals(expected_names, key.names) + self.assertEquals(expected_mo_names, key._names) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + session = Session() + get_obj = session.query(SymmetricKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) + + def test_remove_and_add_name(self): + """ + Tests that names can be removed from the list of names and more added. + This will verify that the list of names is correct. It will verify that + updating this object removes the name from the database. It will verify + that the indices for the removed names are not reused. + """ + names = ['bowser', 'frumpy', 'big fat cat'] + key = SymmetricKey( + enums.CryptographicAlgorithm.AES, 128, self.bytes_128a, + name=names[0]) + key.names.append(names[1]) + key.names.append(names[2]) + key.names.pop() + key.names.pop() + key.names.append('dog') + self.assertEquals(4, key.name_index) + + expected_names = ['bowser', 'dog'] + expected_mo_names = list() + expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0], + 0)) + expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1], + 3)) + self.assertEquals(expected_names, key.names) + self.assertEquals(expected_mo_names, key._names) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + session = Session() + get_obj = session.query(SymmetricKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) + + def test_update_with_add_name(self): + """ + Tests that an OpaqueObject already stored in the database can be + updated. This will store an OpaqueObject in the database. It will add a + name to it in one session, and then retrieve it in another session to + verify that it has all of the correct names. + + This test and the subsequent test_udpate_* methods are different than + the name tests above because these are updating objects already stored + in the database. This tests will simulate what happens when the KMIP + client calls an add attribute method. + """ + first_name = 'bowser' + key = SymmetricKey( + enums.CryptographicAlgorithm.AES, 128, self.bytes_128a, + name=first_name) + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + added_name = 'frumpy' + expected_names = [first_name, added_name] + expected_mo_names = list() + for i, name in enumerate(expected_names): + expected_mo_names.append(sqltypes.ManagedObjectName(name, i)) + + session = Session() + update_key = session.query(SymmetricKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + update_key.names.append(added_name) + session.commit() + + session = Session() + get_obj = session.query(SymmetricKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) + + def test_update_with_remove_name(self): + """ + Tests that an OpaqueObject already stored in the database can be + updated. This will store an OpaqueObject in the database. It will + remove a name from it in one session, and then retrieve it in another + session to verify that it has all of the correct names. + """ + names = ['bowser', 'frumpy', 'big fat cat'] + remove_index = 1 + key = SymmetricKey( + enums.CryptographicAlgorithm.AES, 128, self.bytes_128a, + name=names[0]) + key.names.append(names[1]) + key.names.append(names[2]) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + expected_names = list() + expected_mo_names = list() + for i, name in enumerate(names): + if i != remove_index: + expected_names.append(name) + expected_mo_names.append(sqltypes.ManagedObjectName(name, i)) + + session = Session() + update_key = session.query(SymmetricKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + update_key.names.pop(remove_index) + session.commit() + + session = Session() + get_obj = session.query(SymmetricKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) + + def test_update_with_remove_and_add_name(self): + """ + Tests that an OpaqueObject already stored in the database can be + updated. This will store an OpaqueObject in the database. It will + remove a name and add another one to it in one session, and then + retrieve it in another session to verify that it has all of the correct + names. This simulates multiple operation being sent for the same + object. + """ + names = ['bowser', 'frumpy', 'big fat cat'] + key = SymmetricKey( + enums.CryptographicAlgorithm.AES, 128, self.bytes_128a, + name=names[0]) + key.names.append(names[1]) + key.names.append(names[2]) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + session = Session() + update_key = session.query(SymmetricKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + update_key.names.pop() + update_key.names.pop() + update_key.names.append('dog') + session.commit() + + expected_names = ['bowser', 'dog'] + expected_mo_names = list() + expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0], + 0)) + expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1], + 3)) + + session = Session() + get_obj = session.query(SymmetricKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names)