2
0
mirror of https://github.com/openkmip/pykmip synced 2025-12-10 13:23:15 +00:00

Merge pull request #271 from vbnmmnbv/revoke_server

Add initial Revoke operation support for server.
This commit is contained in:
Peter Hamilton
2017-04-21 10:31:09 -04:00
committed by GitHub
2 changed files with 329 additions and 0 deletions

View File

@@ -31,6 +31,7 @@ from kmip.core import enums
from kmip.core import exceptions
from kmip.core import misc
from kmip.core import objects
from kmip.core import primitives
from kmip.core import secrets
from kmip.core.factories import attributes as factory
@@ -39,6 +40,7 @@ from kmip.core.messages import contents
from kmip.core.messages import messages
from kmip.core.messages.payloads import activate
from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy
@@ -4361,6 +4363,275 @@ class TestKmipEngine(testtools.TestCase):
*args
)
def test_revoke(self):
"""
Test that an Revoke request can be processed correctly.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
managed_object.state = enums.State.ACTIVE
e._data_session.add(managed_object)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
object_id = str(managed_object.unique_identifier)
reason_unspecified = objects.RevocationReason(
code=enums.RevocationReasonCode.UNSPECIFIED)
reason_compromise = objects.RevocationReason(
code=enums.RevocationReasonCode.KEY_COMPROMISE)
date = primitives.DateTime(
tag=enums.Tags.COMPROMISE_OCCURRENCE_DATE, value=6)
# Test that reason UNSPECIFIED will put object into state
# DEACTIVATED
payload = revoke.RevokeRequestPayload(
unique_identifier=attributes.UniqueIdentifier(object_id),
revocation_reason=reason_unspecified,
compromise_date=date)
response_payload = e._process_revoke(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Revoke"
)
self.assertEqual(
str(object_id),
response_payload.unique_identifier.value
)
symmetric_key = e._data_session.query(
pie_objects.SymmetricKey
).filter(
pie_objects.ManagedObject.unique_identifier == object_id
).one()
self.assertEqual(enums.State.DEACTIVATED, symmetric_key.state)
# Test that reason KEY_COMPROMISE will put object not in DESTROYED
# state into state COMPROMISED
payload = revoke.RevokeRequestPayload(
unique_identifier=attributes.UniqueIdentifier(object_id),
revocation_reason=reason_compromise,
compromise_date=date)
response_payload = e._process_revoke(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Revoke"
)
self.assertEqual(
str(object_id),
response_payload.unique_identifier.value
)
symmetric_key = e._data_session.query(
pie_objects.SymmetricKey
).filter(
pie_objects.ManagedObject.unique_identifier == object_id
).one()
self.assertEqual(enums.State.COMPROMISED, symmetric_key.state)
# Test that reason KEY_COMPROMISE will put object in DESTROYED
# state into state DESTROYED_COMPROMISED
symmetric_key.state = enums.State.DESTROYED
e._data_session.commit()
e._data_session = e._data_store_session_factory()
payload = revoke.RevokeRequestPayload(
unique_identifier=attributes.UniqueIdentifier(object_id),
revocation_reason=reason_compromise,
compromise_date=date)
response_payload = e._process_revoke(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Revoke"
)
self.assertEqual(
str(object_id),
response_payload.unique_identifier.value
)
symmetric_key = e._data_session.query(
pie_objects.SymmetricKey
).filter(
pie_objects.ManagedObject.unique_identifier == object_id
).one()
self.assertEqual(enums.State.DESTROYED_COMPROMISED,
symmetric_key.state)
# Test that the ID placeholder can also be used to specify revocation.
symmetric_key.state = enums.State.ACTIVE
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._id_placeholder = str(object_id)
payload = revoke.RevokeRequestPayload(
revocation_reason=reason_unspecified,
compromise_date=date)
response_payload = e._process_revoke(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Revoke"
)
self.assertEqual(
str(object_id),
response_payload.unique_identifier.value
)
symmetric_key = e._data_session.query(
pie_objects.SymmetricKey
).filter(
pie_objects.ManagedObject.unique_identifier == object_id
).one()
self.assertEqual(enums.State.DEACTIVATED, symmetric_key.state)
def test_revoke_on_not_active_object(self):
"""
Test that the right error is generated when an revocation request is
received for an object that is not active with the reason other than
KEY_COMPROMISE.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
e._data_session.add(managed_object)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
self.assertEqual(enums.State.PRE_ACTIVE, managed_object.state)
object_id = str(managed_object.unique_identifier)
reason_unspecified = objects.RevocationReason(
code=enums.RevocationReasonCode.UNSPECIFIED)
date = primitives.DateTime(
tag=enums.Tags.COMPROMISE_OCCURRENCE_DATE, value=6)
payload = revoke.RevokeRequestPayload(
unique_identifier=attributes.UniqueIdentifier(object_id),
revocation_reason=reason_unspecified,
compromise_date=date)
args = (payload, )
regex = "The object is not active and cannot be revoked with " \
"reason other than KEY_COMPROMISE"
self.assertRaisesRegexp(
exceptions.IllegalOperation,
regex,
e._process_revoke,
*args
)
def test_revoke_on_static_object(self):
"""
Test that the right error is generated when an revoke request is
received for an object that cannot be revoked.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.OpaqueObject(
b'',
enums.OpaqueDataType.NONE
)
e._data_session.add(managed_object)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
object_id = str(managed_object.unique_identifier)
reason_unspecified = objects.RevocationReason(
code=enums.RevocationReasonCode.UNSPECIFIED)
date = primitives.DateTime(
tag=enums.Tags.COMPROMISE_OCCURRENCE_DATE, value=6)
payload = revoke.RevokeRequestPayload(
unique_identifier=attributes.UniqueIdentifier(object_id),
revocation_reason=reason_unspecified,
compromise_date=date)
args = (payload,)
name = enums.ObjectType.OPAQUE_DATA.name
regex = "An {0} object has no state and cannot be revoked.".format(
''.join(
[x.capitalize() for x in name.split('_')]
)
)
self.assertRaisesRegexp(
exceptions.IllegalOperation,
regex,
e._process_revoke,
*args
)
def test_revoke_not_allowed_by_policy(self):
"""
Test that an unallowed request is handled correctly by Revoke.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._client_identity = 'test'
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
obj_a._owner = 'admin'
e._data_session.add(obj_a)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
id_a = str(obj_a.unique_identifier)
reason_unspecified = objects.RevocationReason(
code=enums.RevocationReasonCode.UNSPECIFIED)
date = primitives.DateTime(
tag=enums.Tags.COMPROMISE_OCCURRENCE_DATE, value=6)
payload = revoke.RevokeRequestPayload(
unique_identifier=attributes.UniqueIdentifier(id_a),
revocation_reason=reason_unspecified,
compromise_date=date)
args = [payload]
self.assertRaisesRegex(
exceptions.ItemNotFound,
"Could not locate object: {0}".format(id_a),
e._process_revoke,
*args
)
def test_destroy(self):
"""
Test that a Destroy request can be processed correctly.