2
0
mirror of https://github.com/openkmip/pykmip synced 2026-01-04 01:23:25 +00:00

Update the server session to use the auth plugin framework

This change updates how the server session handles message
processing, adding support for the new authentication plugin
framework. Session unit tests have been updated to account for
this change.
This commit is contained in:
Peter Hamilton
2018-03-25 00:54:43 -04:00
parent f06014e7c6
commit 1a093f141e
5 changed files with 640 additions and 247 deletions

View File

@@ -16,7 +16,6 @@
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
@@ -28,6 +27,7 @@ import time
from kmip.core import enums
from kmip.core import exceptions
from kmip.core import objects
from kmip.core import utils
from kmip.core.messages import contents
@@ -122,20 +122,20 @@ class TestKmipSession(testtools.TestCase):
"""
Test that a KmipSession can be created without errors.
"""
session.KmipSession(None, None, 'name')
session.KmipSession(None, None, None, 'name')
def test_init_without_name(self):
"""
Test that a KmipSession without 'name' can be created without errors.
"""
session.KmipSession(None, None, None)
session.KmipSession(None, None, None, None)
def test_run(self):
"""
Test that the message handling loop is handled properly on normal
execution.
"""
kmip_session = session.KmipSession(None, None, 'name')
kmip_session = session.KmipSession(None, None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._handle_message_loop = mock.MagicMock(
side_effect=[
@@ -160,7 +160,7 @@ class TestKmipSession(testtools.TestCase):
Test that the correct logging and error handling occurs when the
thread encounters an error with the message handling loop.
"""
kmip_session = session.KmipSession(None, None, 'name')
kmip_session = session.KmipSession(None, None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
@@ -186,147 +186,9 @@ class TestKmipSession(testtools.TestCase):
kmip_session._connection.close.assert_called_once_with()
kmip_session._logger.info.assert_called_with("Stopping session: name")
def test_get_client_identity(self):
"""
Test that a client identity is obtained from a valid client
certificate.
"""
client_certificate = build_certificate([u'Test Identity'])
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
identity = kmip_session._get_client_identity()
self.assertEqual(u'Test Identity', identity)
kmip_session._logger.info.assert_called_once_with(
"Session client identity: Test Identity"
)
kmip_session._logger.warning.assert_not_called()
def test_get_client_identity_with_no_certificate(self):
"""
Test that the right error is generated when no certificate is
available to provide the client identity.
"""
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = None
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"Failure loading the client certificate from the session "
"connection. Could not retrieve client identity.",
kmip_session._get_client_identity
)
def test_get_client_identity_with_no_extended_key_usage(self):
"""
Test that the right error is generated when the client certificate
is missing its extended key usage extension.
"""
client_certificate = build_certificate([u'Test Identity'], False)
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The extended key usage extension is missing from the client "
"certificate. Session client identity unavailable.",
kmip_session._get_client_identity
)
def test_get_client_identity_with_no_common_name(self):
"""
Test that the right error is generated when the client certificate
does not define a subject common name.
"""
client_certificate = build_certificate([])
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The client certificate does not define a subject common "
"name. Session client identity unavailable.",
kmip_session._get_client_identity
)
def test_get_client_identity_with_multiple_common_names(self):
"""
Test that the right client identity is returned when the client
certificate has multiple subject common names.
"""
client_certificate = build_certificate([
u'Test Identity 1',
u'Test Identity 2'
])
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
identity = kmip_session._get_client_identity()
self.assertEqual(u'Test Identity 1', identity)
kmip_session._logger.info.assert_called_once_with(
"Session client identity: Test Identity 1"
)
kmip_session._logger.warning.assert_called_once_with(
"Multiple client identities found. Using the first one processed."
)
def test_get_client_identity_with_incorrect_extended_key_usage(self):
"""
Test that the right error is generated when the client certificate
does not have client authentication set in its extended key usage
extension.
"""
client_certificate = build_certificate(
[u'Test Identity'],
bad_extension=True
)
der_encoding = client_certificate.public_bytes(
serialization.Encoding.DER
)
kmip_session = session.KmipSession(None, None, 'name')
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.getpeercert.return_value = der_encoding
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The extended key usage extension is not marked for client "
"authentication in the client certificate. Session client "
"identity unavailable.",
kmip_session._get_client_identity
)
@mock.patch('kmip.services.server.auth.get_certificate_from_connection')
@mock.patch('kmip.core.messages.messages.RequestMessage')
def test_handle_message_loop(self, request_mock):
def test_handle_message_loop(self, request_mock, cert_mock):
"""
Test that the correct logging and error handling occurs during the
message handling loop.
@@ -354,12 +216,22 @@ class TestKmipSession(testtools.TestCase):
batch_items=batch_items
)
cert_mock.return_value = 'test_certificate'
kmip_engine = engine.KmipEngine()
kmip_engine._logger = mock.MagicMock()
kmip_session = session.KmipSession(kmip_engine, None, 'name')
kmip_session = session.KmipSession(
kmip_engine,
None,
None,
name='name',
enable_tls_client_auth=False
)
kmip_session._engine = mock.MagicMock()
kmip_session._get_client_identity = mock.MagicMock()
kmip_session._get_client_identity.return_value = 'test'
kmip_session.authenticate = mock.MagicMock()
kmip_session.authenticate.return_value = (
'test',
['group A', 'group B']
)
kmip_session._engine.process_request = mock.MagicMock(
return_value=(message, kmip_session._max_response_size)
)
@@ -376,11 +248,16 @@ class TestKmipSession(testtools.TestCase):
)
kmip_session._receive_request = mock.MagicMock(return_value=data)
kmip_session._send_response = mock.MagicMock()
kmip_session.authenticate = mock.MagicMock(
return_value=("John Doe", ["Group A"])
)
kmip_session._handle_message_loop()
kmip_session._receive_request.assert_called_once_with()
kmip_session._logger.info.assert_not_called()
kmip_session._logger.info.assert_any_call(
"Session client identity: John Doe"
)
kmip_session._logger.debug.assert_any_call(
"Possible session ciphers: 2"
)
@@ -399,19 +276,30 @@ class TestKmipSession(testtools.TestCase):
kmip_session._logger.exception.assert_not_called()
self.assertTrue(kmip_session._send_response.called)
@mock.patch('kmip.services.server.auth.get_certificate_from_connection')
@mock.patch('kmip.core.messages.messages.RequestMessage.read',
mock.MagicMock(side_effect=Exception()))
def test_handle_message_loop_with_parse_failure(self):
def test_handle_message_loop_with_parse_failure(self, cert_mock):
"""
Test that the correct logging and error handling occurs during the
message handling loop.
"""
data = utils.BytearrayStream(())
cert_mock.return_value = 'test_certificate'
kmip_engine = engine.KmipEngine()
kmip_session = session.KmipSession(kmip_engine, None, 'name')
kmip_session._get_client_identity = mock.MagicMock()
kmip_session._get_client_identity.return_value = 'test'
kmip_session = session.KmipSession(
kmip_engine,
None,
None,
name='name',
enable_tls_client_auth=False
)
kmip_session.authenticate = mock.MagicMock()
kmip_session.authenticate.return_value = (
'test',
['group A', 'group B']
)
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._receive_request = mock.MagicMock(return_value=data)
@@ -427,18 +315,31 @@ class TestKmipSession(testtools.TestCase):
kmip_session._logger.error.assert_not_called()
self.assertTrue(kmip_session._send_response.called)
@mock.patch('kmip.services.server.auth.get_certificate_from_connection')
@mock.patch('kmip.core.messages.messages.RequestMessage')
def test_handle_message_loop_with_response_too_long(self, request_mock):
def test_handle_message_loop_with_response_too_long(self,
request_mock,
cert_mock):
"""
Test that the correct logging and error handling occurs during the
message handling loop.
"""
data = utils.BytearrayStream(())
cert_mock.return_value = 'test_certificate'
kmip_engine = engine.KmipEngine()
kmip_session = session.KmipSession(kmip_engine, None, 'name')
kmip_session._get_client_identity = mock.MagicMock()
kmip_session._get_client_identity.return_value = 'test'
kmip_session = session.KmipSession(
kmip_engine,
None,
None,
name='name',
enable_tls_client_auth=False
)
kmip_session.authenticate = mock.MagicMock()
kmip_session.authenticate.return_value = (
'test',
['group A', 'group B']
)
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._receive_request = mock.MagicMock(return_value=data)
@@ -448,24 +349,36 @@ class TestKmipSession(testtools.TestCase):
kmip_session._handle_message_loop()
kmip_session._receive_request.assert_called_once_with()
# kmip_session._logger.info.assert_not_called()
self.assertTrue(kmip_session._logger.warning.called)
kmip_session._logger.exception.assert_not_called()
self.assertTrue(kmip_session._send_response.called)
@mock.patch('kmip.services.server.auth.get_certificate_from_connection')
@mock.patch('kmip.core.messages.messages.RequestMessage')
def test_handle_message_loop_with_unexpected_error(self, request_mock):
def test_handle_message_loop_with_unexpected_error(self,
request_mock,
cert_mock):
"""
Test that the correct logging and error handling occurs when an
unexpected error is generated while processing a request.
"""
data = utils.BytearrayStream(())
cert_mock.return_value = 'test_certificate'
kmip_engine = engine.KmipEngine()
kmip_engine._logger = mock.MagicMock()
kmip_session = session.KmipSession(kmip_engine, None, 'name')
kmip_session._get_client_identity = mock.MagicMock()
kmip_session._get_client_identity.return_value = 'test'
kmip_session = session.KmipSession(
kmip_engine,
None,
None,
name='name',
enable_tls_client_auth=False
)
kmip_session.authenticate = mock.MagicMock()
kmip_session.authenticate.return_value = (
'test',
['group A', 'group B']
)
kmip_session._engine = mock.MagicMock()
test_exception = Exception("Unexpected error.")
kmip_session._engine.process_request = mock.MagicMock(
@@ -479,13 +392,406 @@ class TestKmipSession(testtools.TestCase):
kmip_session._handle_message_loop()
kmip_session._receive_request.assert_called_once_with()
# kmip_session._logger.info.assert_not_called()
kmip_session._logger.warning.assert_called_once_with(
"An unexpected error occurred while processing request."
)
kmip_session._logger.exception.assert_called_once_with(test_exception)
self.assertTrue(kmip_session._send_response.called)
@mock.patch('kmip.services.server.auth.get_certificate_from_connection')
@mock.patch('kmip.core.messages.messages.RequestMessage')
def test_handle_message_loop_with_authentication_failure(self,
request_mock,
cert_mock):
"""
Test that the correct logging and error handling occurs when an
authentication error is generated while processing a request.
"""
data = utils.BytearrayStream(())
cert_mock.return_value = 'test_certificate'
kmip_engine = engine.KmipEngine()
kmip_engine._logger = mock.MagicMock()
kmip_session = session.KmipSession(
kmip_engine,
None,
None,
name='name',
enable_tls_client_auth=False
)
kmip_session.authenticate = mock.MagicMock()
kmip_session.authenticate.side_effect = exceptions.PermissionDenied(
"Authentication failed."
)
kmip_session._engine = mock.MagicMock()
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._receive_request = mock.MagicMock(return_value=data)
kmip_session._send_response = mock.MagicMock()
fake_version = contents.ProtocolVersion(1, 2)
fake_credential = objects.Credential(
credential_type=enums.CredentialType.USERNAME_AND_PASSWORD,
credential_value=objects.UsernamePasswordCredential(
username="John Doe",
password="secret"
)
)
fake_header = messages.RequestHeader(
protocol_version=fake_version,
authentication=contents.Authentication(
credentials=[fake_credential]
)
)
fake_request = messages.RequestMessage()
fake_request.request_header = fake_header
fake_request.read = mock.MagicMock()
request_mock.return_value = fake_request
kmip_session._handle_message_loop()
kmip_session._receive_request.assert_called_once_with()
fake_request.read.assert_called_once_with(data)
kmip_session.authenticate.assert_called_once_with(
"test_certificate",
fake_request
)
kmip_session._logger.warning.assert_called_once_with(
"Authentication failed."
)
kmip_session._engine.build_error_response.assert_called_once_with(
fake_version,
enums.ResultReason.AUTHENTICATION_NOT_SUCCESSFUL,
"An error occurred during client authentication. "
"See server logs for more information."
)
kmip_session._logger.exception.assert_not_called()
self.assertTrue(kmip_session._send_response.called)
@mock.patch('kmip.services.server.auth.get_certificate_from_connection')
@mock.patch('kmip.core.messages.messages.RequestMessage')
def test_handle_message_loop_no_certificate(self,
request_mock,
cert_mock):
"""
Test that the correct logging and error handling occurs when no
certificate is encountered while processing a request.
"""
data = utils.BytearrayStream(())
cert_mock.return_value = None
kmip_engine = engine.KmipEngine()
kmip_engine._logger = mock.MagicMock()
kmip_session = session.KmipSession(
kmip_engine,
None,
None,
name='name',
enable_tls_client_auth=True
)
kmip_session.authenticate = mock.MagicMock()
kmip_session._engine = mock.MagicMock()
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._receive_request = mock.MagicMock(return_value=data)
kmip_session._send_response = mock.MagicMock()
kmip_session._handle_message_loop()
kmip_session._receive_request.assert_called_once_with()
kmip_session._logger.warning(
"Failure verifying the client certificate."
)
kmip_session._logger.exception.assert_called_once_with(
exceptions.PermissionDenied(
"The client certificate could not be loaded from the session "
"connection."
)
)
kmip_session._engine.build_error_response.assert_called_once_with(
contents.ProtocolVersion(1, 0),
enums.ResultReason.AUTHENTICATION_NOT_SUCCESSFUL,
"Error verifying the client certificate. "
"See server logs for more information."
)
self.assertTrue(kmip_session._send_response.called)
@mock.patch(
'kmip.services.server.auth.get_extended_key_usage_from_certificate'
)
@mock.patch('kmip.services.server.auth.get_certificate_from_connection')
@mock.patch('kmip.core.messages.messages.RequestMessage')
def test_handle_message_loop_no_certificate_extension(self,
request_mock,
cert_mock,
ext_mock):
"""
Test that the correct logging and error handling occurs when an
invalid certificate is encountered while processing a request.
"""
data = utils.BytearrayStream(())
cert_mock.return_value = 'test_certificate'
ext_mock.return_value = None
kmip_engine = engine.KmipEngine()
kmip_engine._logger = mock.MagicMock()
kmip_session = session.KmipSession(
kmip_engine,
None,
None,
name='name',
enable_tls_client_auth=True
)
kmip_session.authenticate = mock.MagicMock()
kmip_session._engine = mock.MagicMock()
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._receive_request = mock.MagicMock(return_value=data)
kmip_session._send_response = mock.MagicMock()
kmip_session._handle_message_loop()
kmip_session._receive_request.assert_called_once_with()
kmip_session._logger.warning(
"Failure verifying the client certificate."
)
kmip_session._logger.exception.assert_called_once_with(
exceptions.PermissionDenied(
"The extended key usage extension is missing from the client "
"certificate."
)
)
kmip_session._engine.build_error_response.assert_called_once_with(
contents.ProtocolVersion(1, 0),
enums.ResultReason.AUTHENTICATION_NOT_SUCCESSFUL,
"Error verifying the client certificate. "
"See server logs for more information."
)
self.assertTrue(kmip_session._send_response.called)
@mock.patch(
'kmip.services.server.auth.get_extended_key_usage_from_certificate'
)
@mock.patch('kmip.services.server.auth.get_certificate_from_connection')
@mock.patch('kmip.core.messages.messages.RequestMessage')
def test_handle_message_loop_invalid_certificate_extension(self,
request_mock,
cert_mock,
ext_mock):
"""
Test that the correct logging and error handling occurs when an
invalid certificate is encountered while processing a request.
"""
data = utils.BytearrayStream(())
cert_mock.return_value = 'test_certificate'
ext_mock.return_value = []
kmip_engine = engine.KmipEngine()
kmip_engine._logger = mock.MagicMock()
kmip_session = session.KmipSession(
kmip_engine,
None,
None,
name='name',
enable_tls_client_auth=True
)
kmip_session.authenticate = mock.MagicMock()
kmip_session._engine = mock.MagicMock()
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._receive_request = mock.MagicMock(return_value=data)
kmip_session._send_response = mock.MagicMock()
kmip_session._handle_message_loop()
kmip_session._receive_request.assert_called_once_with()
kmip_session._logger.warning(
"Failure verifying the client certificate."
)
kmip_session._logger.exception.assert_called_once_with(
exceptions.PermissionDenied(
"The extended key usage extension is not marked for client "
"authentication in the client certificate."
)
)
kmip_session._engine.build_error_response.assert_called_once_with(
contents.ProtocolVersion(1, 0),
enums.ResultReason.AUTHENTICATION_NOT_SUCCESSFUL,
"Error verifying the client certificate. "
"See server logs for more information."
)
self.assertTrue(kmip_session._send_response.called)
@mock.patch(
"kmip.services.server.auth.get_client_identity_from_certificate"
)
def test_authenticate(self, mock_get):
"""
Test that the session correctly uses the authentication plugin
framework to authenticate new connections.
"""
mock_get.return_value = "John Doe"
kmip_session = session.KmipSession(
None,
None,
None,
name='TestSession'
)
kmip_session._logger = mock.MagicMock()
fake_request = messages.RequestMessage(
request_header=messages.RequestHeader()
)
session_identity = kmip_session.authenticate(
"fake_certificate",
fake_request
)
kmip_session._logger.debug.assert_any_call(
"No authentication plugins are enabled. The client identity will "
"be extracted from the client certificate."
)
mock_get.assert_any_call("fake_certificate")
kmip_session._logger.debug.assert_any_call(
"Extraction succeeded for client identity: John Doe"
)
self.assertEqual(("John Doe", None), session_identity)
@mock.patch("kmip.services.server.auth.SLUGSConnector")
def test_authenticate_against_slugs(self, mock_connector):
"""
Test that the session correctly handles authentication with SLUGS.
"""
mock_instance = mock.MagicMock()
mock_instance.authenticate.return_value = ("John Doe", ["Group A"])
mock_connector.return_value = mock_instance
kmip_session = session.KmipSession(
None,
None,
("127.0.0.1", 48026),
name='TestSession',
auth_settings=[(
"auth:slugs",
{"enabled": "True", "url": "test_url"}
)]
)
kmip_session._logger = mock.MagicMock()
fake_credential = objects.Credential(
credential_type=enums.CredentialType.USERNAME_AND_PASSWORD,
credential_value=objects.UsernamePasswordCredential(
username="John Doe",
password="secret"
)
)
fake_request = messages.RequestMessage(
request_header=messages.RequestHeader(
authentication=contents.Authentication(
credentials=[fake_credential]
)
)
)
result = kmip_session.authenticate(
"fake_certificate",
fake_request
)
mock_connector.assert_any_call("test_url")
kmip_session._logger.debug.assert_any_call(
"Authenticating with plugin: auth:slugs"
)
mock_instance.authenticate.assert_any_call(
"fake_certificate",
(("127.0.0.1", 48026), kmip_session._session_time),
fake_request.request_header.authentication.credentials
)
kmip_session._logger.debug(
"Authentication succeeded for client identity: John Doe"
)
self.assertEqual(2, len(result))
self.assertEqual("John Doe", result[0])
self.assertEqual(["Group A"], result[1])
@mock.patch("kmip.services.server.auth.SLUGSConnector")
def test_authenticate_against_slugs_with_failure(self, mock_connector):
"""
Test that the session correctly handles a SLUGS authentication error.
"""
mock_instance = mock.MagicMock()
test_exception = exceptions.PermissionDenied(
"Unrecognized user ID: John Doe"
)
mock_instance.authenticate.side_effect = test_exception
mock_connector.return_value = mock_instance
kmip_session = session.KmipSession(
None,
None,
("127.0.0.1", 48026),
name='TestSession',
auth_settings=[(
"auth:slugs",
{"enabled": "True", "url": "test_url"}
)]
)
kmip_session._logger = mock.MagicMock()
fake_credential = objects.Credential(
credential_type=enums.CredentialType.USERNAME_AND_PASSWORD,
credential_value=objects.UsernamePasswordCredential(
username="John Doe",
password="secret"
)
)
fake_request = messages.RequestMessage(
request_header=messages.RequestHeader(
authentication=contents.Authentication(
credentials=[fake_credential]
)
)
)
args = ("fake_certificate", fake_request)
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"Authentication failed.",
kmip_session.authenticate,
*args
)
mock_connector.assert_any_call("test_url")
kmip_session._logger.debug.assert_any_call(
"Authenticating with plugin: auth:slugs"
)
kmip_session._logger.warning.assert_any_call("Authentication failed.")
kmip_session._logger.exception.assert_any_call(test_exception)
def test_authenticate_against_unrecognized_plugin(self):
"""
Test that the session correctly handles an unrecognized plugin
configuration.
"""
kmip_session = session.KmipSession(
None,
None,
None,
name='TestSession',
auth_settings=[("auth:unrecognized", {})]
)
kmip_session._logger = mock.MagicMock()
fake_request = messages.RequestMessage(
request_header=messages.RequestHeader()
)
args = ("fake_certificate", fake_request)
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"Authentication failed.",
kmip_session.authenticate,
*args
)
kmip_session._logger.warning.assert_any_call(
"Authentication plugin 'auth:unrecognized' is not supported."
)
def test_receive_request(self):
"""
Test that the session can correctly receive and parse a message
@@ -494,7 +800,7 @@ class TestKmipSession(testtools.TestCase):
content = b'\x00\x00\x00\x00\x00\x00\x00\x00'
expected = utils.BytearrayStream((content))
kmip_session = session.KmipSession(None, None, 'name')
kmip_session = session.KmipSession(None, None, None, 'name')
kmip_session._receive_bytes = mock.MagicMock(
side_effect=[content, b'']
)
@@ -512,7 +818,7 @@ class TestKmipSession(testtools.TestCase):
"""
content = b'\x00\x00\x00\x00\x00\x00\x00\x00'
kmip_session = session.KmipSession(None, None, 'name')
kmip_session = session.KmipSession(None, None, None, 'name')
kmip_session._connection = mock.MagicMock()
kmip_session._connection.recv = mock.MagicMock(
side_effect=[content, content]
@@ -542,7 +848,7 @@ class TestKmipSession(testtools.TestCase):
"""
content = b'\x00\x00\x00\x00\x00\x00\x00\x00'
kmip_session = session.KmipSession(None, None, 'name')
kmip_session = session.KmipSession(None, None, None, 'name')
kmip_session._connection = mock.MagicMock()
kmip_session._connection.recv = mock.MagicMock(
side_effect=[content, content, None]
@@ -563,7 +869,7 @@ class TestKmipSession(testtools.TestCase):
))
buffer_empty = utils.BytearrayStream()
kmip_session = session.KmipSession(None, None, 'name')
kmip_session = session.KmipSession(None, None, None, 'name')
kmip_session._connection = mock.MagicMock()
kmip_session._send_response(buffer_empty.buffer)