From ee857ca4a3c512c14acd5e894b6975f6a2c05154 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Fri, 15 Sep 2017 13:07:00 -0400 Subject: [PATCH] Adding custom TLS cipher suite config option This change adds a server configuration option, tls_cipher_suites, allowing the server admin to specify a list of cipher suites to be used when establishing TLS connections with clients. The custom list supports both cipher suite specification and OpenSSL suite naming conventions. The list is filtered through a KMIP-approved set of cipher suites, and then through a set of cipher suites suitable for the configured authentication suite. Additional debug logging has been added to the server to provide transparency on this process. --- examples/server.conf | 4 + kmip/services/auth.py | 201 ++++++++++++++---- kmip/services/server/config.py | 33 ++- kmip/services/server/server.py | 43 +++- kmip/services/server/session.py | 11 + .../tests/unit/services/server/test_config.py | 129 ++++++++++- .../tests/unit/services/server/test_server.py | 23 +- .../unit/services/server/test_session.py | 27 ++- kmip/tests/unit/services/test_auth.py | 116 ++++++++++ 9 files changed, 520 insertions(+), 67 deletions(-) diff --git a/examples/server.conf b/examples/server.conf index 67fbb05..5ea0f41 100644 --- a/examples/server.conf +++ b/examples/server.conf @@ -7,3 +7,7 @@ ca_path=/etc/pykmip/certs/server_ca_cert.pem auth_suite=Basic policy_path=/etc/pykmip/policies enable_tls_client_auth=True +tls_cipher_suites= + EXAMPLE_CIPHER_SUITE_1 + EXAMPLE_CIPHER_SUITE_2 + EXAMPLE_CIPHER_SUITE_3 diff --git a/kmip/services/auth.py b/kmip/services/auth.py index e783f7b..a53af29 100644 --- a/kmip/services/auth.py +++ b/kmip/services/auth.py @@ -26,13 +26,108 @@ class AuthenticationSuite(object): Acts as the base of the suite hierarchy. """ + # OpenSSL cipher suites + # Explicitly listed suites for Basic and TLSv1.2 authentication for KMIP + # profiles. + # + # Obtained from: + # https://www.openssl.org/docs/man1.1.0/apps/ciphers.html + # https://www.openssl.org/docs/man1.0.2/apps/ciphers.html + openssl_cipher_suite_map = { + # TLS v1.2 cipher suites + 'TLS_RSA_WITH_AES_256_CBC_SHA256': 'AES256-SHA256', + 'TLS_RSA_WITH_AES_128_CBC_SHA256': 'AES128-SHA256', + 'TLS_DH_DSS_WITH_AES_128_CBC_SHA256': 'DH-DSS-AES128-SHA256', + 'TLS_DH_RSA_WITH_AES_128_CBC_SHA256': 'DH-RSA-AES128-SHA256', + 'TLS_DHE_DSS_WITH_AES_128_CBC_SHA256': 'DHE-DSS-AES128-SHA256', + 'TLS_DHE_RSA_WITH_AES_128_CBC_SHA256': 'DHE-RSA-AES128-SHA256', + 'TLS_DH_DSS_WITH_AES_256_CBC_SHA256': 'DH-DSS-AES256-SHA256', + 'TLS_DH_RSA_WITH_AES_256_CBC_SHA256': 'DH-RSA-AES256-SHA256', + 'TLS_DHE_DSS_WITH_AES_256_CBC_SHA256': 'DHE-DSS-AES256-SHA256', + 'TLS_DHE_RSA_WITH_AES_256_CBC_SHA256': 'DHE-RSA-AES256-SHA256', + 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256': 'ECDHE-ECDSA-AES128-SHA256', + 'TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384': 'ECDHE-ECDSA-AES256-SHA384', + 'TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256': 'ECDHE-RSA-AES128-SHA256', + 'TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384': 'ECDHE-RSA-AES256-SHA384', + 'TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256': + 'ECDHE-ECDSA-AES128-GCM-SHA256', + 'TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384': + 'ECDHE-ECDSA-AES256-GCM-SHA384', + 'TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256': 'ECDH-ECDSA-AES128-SHA256', + 'TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA384': 'ECDH-ECDSA-AES256-SHA384', + 'TLS_ECDH_RSA_WITH_AES_128_CBC_SHA256': 'ECDH-RSA-AES128-SHA256', + 'TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384': 'ECDH-RSA-AES256-SHA384', + + # AES ciphersuites from RFC3268, extending TLS v1.0 + 'TLS_RSA_WITH_AES_128_CBC_SHA': 'AES128-SHA', + 'TLS_RSA_WITH_AES_256_CBC_SHA': 'AES256-SHA', + 'TLS_DH_DSS_WITH_AES_128_CBC_SHA': 'DH-DSS-AES128-SHA', + 'TLS_DH_RSA_WITH_AES_128_CBC_SHA': 'DH-RSA-AES128-SHA', + 'TLS_DHE_DSS_WITH_AES_128_CBC_SHA': 'DHE-DSS-AES128-SHA', + 'TLS_DHE_RSA_WITH_AES_128_CBC_SHA': 'DHE-RSA-AES128-SHA', + 'TLS_DH_DSS_WITH_AES_256_CBC_SHA': 'DH-DSS-AES256-SHA', + 'TLS_DH_RSA_WITH_AES_256_CBC_SHA': 'DH-RSA-AES256-SHA', + 'TLS_DHE_DSS_WITH_AES_256_CBC_SHA': 'DHE-DSS-AES256-SHA', + 'TLS_DHE_RSA_WITH_AES_256_CBC_SHA': 'DHE-RSA-AES256-SHA', + + # Elliptic curve cipher suites. + 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA': 'ECDHE-ECDSA-AES128-SHA', + 'TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA': 'ECDHE-RSA-AES128-SHA', + + # Pre shared keying (PSK) cipheruites + 'TLS_PSK_WITH_AES_128_CBC_SHA': 'PSK-AES128-CBC-SHA', + 'TLS_PSK_WITH_AES_256_CBC_SHA': 'PSK-AES256-CBC-SHA', + + # No OpenSSL support + 'TLS_DHE_PSK_WITH_AES_128_CBC_SHA': None, + 'TLS_DHE_PSK_WITH_AES_256_CBC_SHA': None, + 'TLS_RSA_PSK_WITH_AES_128_CBC_SHA': None, + 'TLS_RSA_PSK_WITH_AES_256_CBC_SHA': None + } + + _default_cipher_suites = [] + @abc.abstractmethod - def __init__(self): + def __init__(self, cipher_suites=None): """ Create an AuthenticationSuite object. + + Args: + cipher_suites (list): A list of strings representing the names of + cipher suites to use. Overrides the default set of cipher + suites. Optional, defaults to None. """ - self._profile = [] - self._ciphers = '' + self._custom_suites = [] + + # Compose a unique list of custom cipher suites if any were provided. + # Translate each suite name into its corresponding OpenSSL suite name, + # allowing for both specification and OpenSSL suite names in the + # provided list. + if cipher_suites: + for cipher_suite in cipher_suites: + if cipher_suite in self.openssl_cipher_suite_map.keys(): + suite = self.openssl_cipher_suite_map.get(cipher_suite) + if suite: + self._custom_suites.append(suite) + elif cipher_suite in self.openssl_cipher_suite_map.values(): + if cipher_suite: + self._custom_suites.append(cipher_suite) + self._custom_suites = list(set(self._custom_suites)) + + # Filter the custom suites to only include those from the default + # cipher suite list (provided for each subclass authentication suite). + # If no custom suites were specified, use the default cipher suites. + suites = [] + if self._custom_suites: + for suite in self._custom_suites: + if suite in self._default_cipher_suites: + suites.append(suite) + else: + suites = self._default_cipher_suites + + self._cipher_suites = ':'.join(suites) + if self._cipher_suites == '': + self._cipher_suites = ':'.join(self._default_cipher_suites) @property def protocol(self): @@ -53,7 +148,7 @@ class AuthenticationSuite(object): string: A colon delimited string listing the valid ciphers for the suite protocol. """ - return self._ciphers + return self._cipher_suites class BasicAuthenticationSuite(AuthenticationSuite): @@ -64,26 +159,32 @@ class BasicAuthenticationSuite(AuthenticationSuite): in NIST 800-57, as defined by the KMIP specification. """ - def __init__(self): + _default_cipher_suites = [ + 'AES128-SHA', + 'DES-CBC3-SHA', + 'AES256-SHA', + 'DHE-DSS-DES-CBC3-SHA', + 'DHE-RSA-DES-CBC3-SHA', + 'DH-DSS-AES128-SHA', + 'DH-RSA-AES128-SHA', + 'DHE-DSS-AES128-SHA', + 'DHE-RSA-AES128-SHA', + 'DH-RSA-AES256-SHA', + 'DHE-DSS-AES256-SHA', + 'DHE-RSA-AES256-SHA' + ] + + def __init__(self, cipher_suites=None): """ Create a BasicAuthenticationSuite object. + + Args: + cipher_suites (list): A list of strings representing the names of + cipher suites to use. Overrides the default set of cipher + suites. Optional, defaults to None. """ - super(BasicAuthenticationSuite, self).__init__() + super(BasicAuthenticationSuite, self).__init__(cipher_suites) self._protocol = ssl.PROTOCOL_TLSv1 - self._ciphers = ':'.join(( - 'AES128-SHA', - 'DES-CBC3-SHA', - 'AES256-SHA', - 'DHE-DSS-DES-CBC3-SHA', - 'DHE-RSA-DES-CBC3-SHA', - 'DH-DSS-AES128-SHA', - 'DH-RSA-AES128-SHA', - 'DHE-DSS-AES128-SHA', - 'DHE-RSA-AES128-SHA', - 'DH-RSA-AES256-SHA', - 'DHE-DSS-AES256-SHA', - 'DHE-RSA-AES256-SHA', - )) class TLS12AuthenticationSuite(AuthenticationSuite): @@ -94,34 +195,40 @@ class TLS12AuthenticationSuite(AuthenticationSuite): in NIST 800-57, as defined by the KMIP specification. """ - def __init__(self): + _default_cipher_suites = [ + 'AES128-SHA256', + 'AES256-SHA256', + 'DH-DSS-AES256-SHA256', + 'DH-DSS-AES128-SHA256', + 'DH-RSA-AES128-SHA256', + 'DHE-DSS-AES128-SHA256', + 'DHE-RSA-AES128-SHA256', + 'DH-DSS-AES256-SHA256', + 'DH-RSA-AES256-SHA256', + 'DHE-DSS-AES256-SHA256', + 'DHE-RSA-AES256-SHA256', + 'ECDH-ECDSA-AES128-SHA256', + 'ECDH-ECDSA-AES256-SHA256', + 'ECDHE-ECDSA-AES128-SHA256', + 'ECDHE-ECDSA-AES256-SHA384', + 'ECDH-RSA-AES128-SHA256', + 'ECDH-RSA-AES256-SHA384', + 'ECDHE-RSA-AES128-SHA256', + 'ECDHE-RSA-AES256-SHA384', + 'ECDHE-ECDSA-AES128-GCM-SHA256', + 'ECDHE-ECDSA-AES256-GCM-SHA384', + 'ECDHE-ECDSA-AES128-SHA256', + 'ECDHE-ECDSA-AES256-SHA384' + ] + + def __init__(self, cipher_suites=None): """ Create a TLS12AuthenticationSuite object. + + Args: + cipher_suites (list): A list of strings representing the names of + cipher suites to use. Overrides the default set of cipher + suites. Optional, defaults to None. """ - super(TLS12AuthenticationSuite, self).__init__() + super(TLS12AuthenticationSuite, self).__init__(cipher_suites) self._protocol = ssl.PROTOCOL_TLSv1_2 - self._ciphers = ':'.join(( - 'AES128-SHA256', - 'AES256-SHA256', - 'DH-DSS-AES256-SHA256', - 'DH-DSS-AES128-SHA256', - 'DH-RSA-AES128-SHA256', - 'DHE-DSS-AES128-SHA256', - 'DHE-RSA-AES128-SHA256', - 'DH-DSS-AES256-SHA256', - 'DH-RSA-AES256-SHA256', - 'DHE-DSS-AES256-SHA256', - 'DHE-RSA-AES256-SHA256', - 'ECDH-ECDSA-AES128-SHA256', - 'ECDH-ECDSA-AES256-SHA256', - 'ECDHE-ECDSA-AES128-SHA256', - 'ECDHE-ECDSA-AES256-SHA384', - 'ECDH-RSA-AES128-SHA256', - 'ECDH-RSA-AES256-SHA384', - 'ECDHE-RSA-AES128-SHA256', - 'ECDHE-RSA-AES256-SHA384', - 'ECDHE-ECDSA-AES128-GCM-SHA256', - 'ECDHE-ECDSA-AES256-GCM-SHA384', - 'ECDHE-ECDSA-AES128-SHA256', - 'ECDHE-ECDSA-AES256-SHA384', - )) diff --git a/kmip/services/server/config.py b/kmip/services/server/config.py index 3176038..c243c74 100644 --- a/kmip/services/server/config.py +++ b/kmip/services/server/config.py @@ -35,6 +35,7 @@ class KmipServerConfig(object): self.settings = dict() self.settings['enable_tls_client_auth'] = True + self.settings['tls_cipher_suites'] = [] self._expected_settings = [ 'hostname', @@ -46,7 +47,8 @@ class KmipServerConfig(object): ] self._optional_settings = [ 'policy_path', - 'enable_tls_client_auth' + 'enable_tls_client_auth', + 'tls_cipher_suites' ] def set_setting(self, setting, value): @@ -84,8 +86,10 @@ class KmipServerConfig(object): self._set_auth_suite(value) elif setting == 'policy_path': self._set_policy_path(value) - else: + elif setting == 'enable_tls_client_auth': self._set_enable_tls_client_auth(value) + else: + self._set_tls_cipher_suites(value) def load_settings(self, path): """ @@ -156,6 +160,10 @@ class KmipServerConfig(object): self._set_enable_tls_client_auth( parser.getboolean('server', 'enable_tls_client_auth') ) + if parser.has_option('server', 'tls_cipher_suites'): + self._set_tls_cipher_suites( + parser.get('server', 'tls_cipher_suites') + ) def _set_hostname(self, value): if isinstance(value, six.string_types): @@ -261,3 +269,24 @@ class KmipServerConfig(object): "The flag enabling the TLS certificate client auth flag check " "must be a boolean." ) + + def _set_tls_cipher_suites(self, value): + if not value: + self.settings['tls_cipher_suites'] = [] + return + if isinstance(value, six.string_types): + value = value.split() + + if isinstance(value, list): + for entry in value: + if not isinstance(entry, six.string_types): + raise exceptions.ConfigurationError( + "The TLS cipher suites must be a set of strings " + "representing cipher suite names." + ) + self.settings['tls_cipher_suites'] = list(set(value)) + else: + raise exceptions.ConfigurationError( + "The TLS cipher suites must be a set of strings representing " + "cipher suite names." + ) diff --git a/kmip/services/server/server.py b/kmip/services/server/server.py index 6acfee0..a0cdf26 100644 --- a/kmip/services/server/server.py +++ b/kmip/services/server/server.py @@ -51,7 +51,8 @@ class KmipServer(object): config_path='/etc/pykmip/server.conf', log_path='/var/log/pykmip/server.log', policy_path=None, - enable_tls_client_auth=None + enable_tls_client_auth=None, + tls_cipher_suites=None ): """ Create a KmipServer. @@ -100,6 +101,12 @@ class KmipServer(object): certificate client auth flag should be required for client certificates when establishing a new client session. Optional, defaults to None. + tls_cipher_suites (string): A comma-delimited list of cipher suite + names (e.g., TLS_RSA_WITH_AES_256_CBC_SHA256,TLS_RSA_WITH_AES_ + 128_CBC_SHA256), indicating which specific cipher suites should + be used by the server when establishing a TLS connection with + a client. Optional, defaults to None. If None, the default set + of TLS cipher suites will be used. """ self._logger = logging.getLogger('kmip.server') self._setup_logging(log_path) @@ -114,13 +121,15 @@ class KmipServer(object): ca_path, auth_suite, policy_path, - enable_tls_client_auth + enable_tls_client_auth, + tls_cipher_suites ) + cipher_suites = self.config.settings.get('tls_cipher_suites') if self.config.settings.get('auth_suite') == 'TLS1.2': - self.auth_suite = auth.TLS12AuthenticationSuite() + self.auth_suite = auth.TLS12AuthenticationSuite(cipher_suites) else: - self.auth_suite = auth.BasicAuthenticationSuite() + self.auth_suite = auth.BasicAuthenticationSuite(cipher_suites) self._engine = engine.KmipEngine( self.config.settings.get('policy_path') @@ -147,7 +156,7 @@ class KmipServer(object): ) ) self._logger.addHandler(handler) - self._logger.setLevel(logging.INFO) + self._logger.setLevel(logging.DEBUG) def _setup_configuration( self, @@ -159,7 +168,8 @@ class KmipServer(object): ca_path=None, auth_suite=None, policy_path=None, - enable_tls_client_auth=None + enable_tls_client_auth=None, + tls_cipher_suites=None ): if path: self.config.load_settings(path) @@ -183,6 +193,11 @@ class KmipServer(object): 'enable_tls_client_auth', enable_tls_client_auth ) + if tls_cipher_suites: + self.config.set_setting( + 'tls_cipher_suites', + tls_cipher_suites.split(',') + ) def start(self): """ @@ -202,6 +217,22 @@ class KmipServer(object): self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._logger.debug( + "Configured cipher suites: {0}".format( + len(self.config.settings.get('tls_cipher_suites')) + ) + ) + for cipher in self.config.settings.get('tls_cipher_suites'): + self._logger.debug(cipher) + auth_suite_ciphers = self.auth_suite.ciphers.split(':') + self._logger.debug( + "Authentication suite ciphers to use: {0}".format( + len(auth_suite_ciphers) + ) + ) + for cipher in auth_suite_ciphers: + self._logger.debug(cipher) + self._socket = ssl.wrap_socket( self._socket, keyfile=self.config.settings.get('key_path'), diff --git a/kmip/services/server/session.py b/kmip/services/server/session.py index 627ffcd..4ffb361 100644 --- a/kmip/services/server/session.py +++ b/kmip/services/server/session.py @@ -158,6 +158,17 @@ class KmipSession(threading.Thread): max_size = self._max_response_size try: + shared_ciphers = self._connection.shared_ciphers() + self._logger.debug( + "Possible session ciphers: {0}".format(len(shared_ciphers)) + ) + for cipher in shared_ciphers: + self._logger.debug(cipher) + self._logger.debug( + "Session cipher selected: {0}".format( + self._connection.cipher() + ) + ) client_identity = self._get_client_identity() request.read(request_data) except Exception as e: diff --git a/kmip/tests/unit/services/server/test_config.py b/kmip/tests/unit/services/server/test_config.py index e882200..90c669e 100644 --- a/kmip/tests/unit/services/server/test_config.py +++ b/kmip/tests/unit/services/server/test_config.py @@ -38,7 +38,16 @@ class TestKmipServerConfig(testtools.TestCase): """ Test that a KmipServerConfig object can be created without error. """ - config.KmipServerConfig() + c = config.KmipServerConfig() + + self.assertIn('enable_tls_client_auth', c.settings.keys()) + self.assertEqual( + True, + c.settings.get('enable_tls_client_auth') + ) + + self.assertIn('tls_cipher_suites', c.settings.keys()) + self.assertEqual([], c.settings.get('tls_cipher_suites')) def test_set_setting(self): """ @@ -55,6 +64,7 @@ class TestKmipServerConfig(testtools.TestCase): c._set_port = mock.MagicMock() c._set_policy_path = mock.MagicMock() c._set_enable_tls_client_auth = mock.MagicMock() + c._set_tls_cipher_suites = mock.MagicMock() # Test the right error is generated when setting an unsupported # setting. @@ -96,6 +106,9 @@ class TestKmipServerConfig(testtools.TestCase): c.set_setting('enable_tls_client_auth', False) c._set_enable_tls_client_auth.assert_called_once_with(False) + c.set_setting('tls_cipher_suites', []) + c._set_tls_cipher_suites.assert_called_once_with([]) + def test_load_settings(self): """ Test that the right calls are made and the right errors generated when @@ -149,6 +162,7 @@ class TestKmipServerConfig(testtools.TestCase): c._set_port = mock.MagicMock() c._set_policy_path = mock.MagicMock() c._set_enable_tls_client_auth = mock.MagicMock() + c._set_tls_cipher_suites = mock.MagicMock() # Test that the right calls are made when correctly parsing settings. parser = configparser.SafeConfigParser() @@ -161,6 +175,11 @@ class TestKmipServerConfig(testtools.TestCase): parser.set('server', 'auth_suite', 'Basic') parser.set('server', 'policy_path', '/test/path/policies') parser.set('server', 'enable_tls_client_auth', 'False') + parser.set( + 'server', + 'tls_cipher_suites', + "\n TLS_RSA_WITH_AES_256_CBC_SHA256" + ) c._parse_settings(parser) @@ -174,6 +193,9 @@ class TestKmipServerConfig(testtools.TestCase): c._set_auth_suite.assert_called_once_with('Basic') c._set_policy_path.assert_called_once_with('/test/path/policies') c._set_enable_tls_client_auth.assert_called_once_with(False) + c._set_tls_cipher_suites.assert_called_once_with( + "\n TLS_RSA_WITH_AES_256_CBC_SHA256" + ) # Test that a ConfigurationError is generated when the expected # section is missing. @@ -536,12 +558,6 @@ class TestKmipServerConfig(testtools.TestCase): c = config.KmipServerConfig() c._logger = mock.MagicMock() - self.assertIn('enable_tls_client_auth', c.settings.keys()) - self.assertEqual( - True, - c.settings.get('enable_tls_client_auth') - ) - # Test that the setting is set correctly with a valid value c._set_enable_tls_client_auth(False) self.assertEqual( @@ -571,3 +587,102 @@ class TestKmipServerConfig(testtools.TestCase): c._set_enable_tls_client_auth, *args ) + + def test_set_tls_cipher_suites(self): + """ + Test that the tls_cipher_suites configuration property can be set + correctly with a value expected from the config file. + """ + c = config.KmipServerConfig() + c._logger = mock.MagicMock() + + c._set_tls_cipher_suites( + """ + TLS_RSA_WITH_AES_256_CBC_SHA256 + TLS_RSA_WITH_AES_128_CBC_SHA256 +""" + ) + self.assertEqual(2, len(c.settings.get('tls_cipher_suites'))) + self.assertIn( + 'TLS_RSA_WITH_AES_256_CBC_SHA256', + c.settings.get('tls_cipher_suites') + ) + self.assertIn( + 'TLS_RSA_WITH_AES_128_CBC_SHA256', + c.settings.get('tls_cipher_suites') + ) + + def test_set_tls_cipher_suites_preparsed(self): + """ + Test that the tls_cipher_suites configuration property can be set + correctly with a preparsed list of TLS cipher suites, the value + expected if the cipher suites were provided via constructor. + """ + c = config.KmipServerConfig() + c._logger = mock.MagicMock() + + c._set_tls_cipher_suites( + [ + 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA', + 'TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384', + 'DH-DSS-AES128-SHA' + ] + ) + self.assertEqual(3, len(c.settings.get('tls_cipher_suites'))) + self.assertIn( + 'DH-DSS-AES128-SHA', + c.settings.get('tls_cipher_suites') + ) + self.assertIn( + 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA', + c.settings.get('tls_cipher_suites') + ) + self.assertIn( + 'TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384', + c.settings.get('tls_cipher_suites') + ) + + def test_set_tls_cipher_suites_empty(self): + """ + Test that the tls_cipher_suites configuration property can be set + correctly with an empty value. + """ + c = config.KmipServerConfig() + c._logger = mock.MagicMock() + + c._set_tls_cipher_suites(None) + self.assertEqual([], c.settings.get('tls_cipher_suites')) + + def test_set_tls_cipher_suites_invalid_value(self): + """ + Test that the right error is raised when an invalid value is used to + set the tls_cipher_suites configuration property. + """ + c = config.KmipServerConfig() + c._logger = mock.MagicMock() + + args = (1,) + self.assertRaisesRegexp( + exceptions.ConfigurationError, + "The TLS cipher suites must be a set of strings representing " + "cipher suite names.", + c._set_tls_cipher_suites, + *args + ) + + def test_set_tls_cipher_suites_invalid_list_value(self): + """ + Test that the right error is raised when an invalid list value is used + to set the tls_cipher_suites configuration property. + """ + c = config.KmipServerConfig() + c._logger = mock.MagicMock() + + args = ([0],) + self.assertRaisesRegexp( + exceptions.ConfigurationError, + "The TLS cipher suites must be a set of strings representing " + "cipher suite names.", + c._set_tls_cipher_suites, + *args + ) diff --git a/kmip/tests/unit/services/server/test_server.py b/kmip/tests/unit/services/server/test_server.py index 4376b6d..993175a 100644 --- a/kmip/tests/unit/services/server/test_server.py +++ b/kmip/tests/unit/services/server/test_server.py @@ -95,7 +95,7 @@ class TestKmipServer(testtools.TestCase): open_mock.assert_called_once_with('/test/path/server.log', 'w') self.assertTrue(s._logger.addHandler.called) - s._logger.setLevel.assert_called_once_with(logging.INFO) + s._logger.setLevel.assert_called_once_with(logging.DEBUG) @mock.patch('kmip.services.server.engine.KmipEngine') @mock.patch('kmip.services.auth.TLS12AuthenticationSuite') @@ -120,7 +120,8 @@ class TestKmipServer(testtools.TestCase): '/etc/pykmip/certs/ca.crt', 'Basic', '/etc/pykmip/policies', - False + False, + 'TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA' ) s.config.load_settings.assert_called_with('/etc/pykmip/server.conf') @@ -147,6 +148,13 @@ class TestKmipServer(testtools.TestCase): 'enable_tls_client_auth', False ) + s.config.set_setting.assert_any_call( + 'tls_cipher_suites', + [ + 'TLS_RSA_WITH_AES_128_CBC_SHA', + 'TLS_RSA_WITH_AES_256_CBC_SHA' + ] + ) # Test that an attempt is made to instantiate the TLS 1.2 auth suite s = server.KmipServer( @@ -170,8 +178,10 @@ class TestKmipServer(testtools.TestCase): s = server.KmipServer( hostname='127.0.0.1', port=5696, + auth_suite='Basic', config_path=None, - policy_path=None + policy_path=None, + tls_cipher_suites='TLS_RSA_WITH_AES_128_CBC_SHA' ) s._logger = mock.MagicMock() @@ -188,6 +198,13 @@ class TestKmipServer(testtools.TestCase): s._logger.info.assert_any_call( "Starting server socket handler." ) + s._logger.debug.assert_any_call("Configured cipher suites: 1") + s._logger.debug.assert_any_call("TLS_RSA_WITH_AES_128_CBC_SHA") + s._logger.debug.assert_any_call( + "Authentication suite ciphers to use: 1" + ) + s._logger.debug.assert_any_call("AES128-SHA") + socket_mock.assert_called_once_with( socket.AF_INET, socket.SOCK_STREAM diff --git a/kmip/tests/unit/services/server/test_session.py b/kmip/tests/unit/services/server/test_session.py index 71bccac..794dad4 100644 --- a/kmip/tests/unit/services/server/test_session.py +++ b/kmip/tests/unit/services/server/test_session.py @@ -365,6 +365,15 @@ class TestKmipSession(testtools.TestCase): ) kmip_session._logger = mock.MagicMock() kmip_session._connection = mock.MagicMock() + kmip_session._connection.shared_ciphers = mock.MagicMock( + return_value=[ + ('AES128-SHA256', 'TLSv1/SSLv3', 128), + ('AES256-SHA256', 'TLSv1/SSLv3', 256) + ] + ) + kmip_session._connection.cipher = mock.MagicMock( + return_value=('AES128-SHA256', 'TLSv1/SSLv3', 128) + ) kmip_session._receive_request = mock.MagicMock(return_value=data) kmip_session._send_response = mock.MagicMock() @@ -372,6 +381,20 @@ class TestKmipSession(testtools.TestCase): kmip_session._receive_request.assert_called_once_with() kmip_session._logger.info.assert_not_called() + kmip_session._logger.debug.assert_any_call( + "Possible session ciphers: 2" + ) + kmip_session._logger.debug.assert_any_call( + ('AES128-SHA256', 'TLSv1/SSLv3', 128) + ) + kmip_session._logger.debug.assert_any_call( + ('AES256-SHA256', 'TLSv1/SSLv3', 256) + ) + kmip_session._logger.debug.assert_any_call( + "Session cipher selected: {0}".format( + ('AES128-SHA256', 'TLSv1/SSLv3', 128) + ) + ) kmip_session._logger.warning.assert_not_called() kmip_session._logger.exception.assert_not_called() self.assertTrue(kmip_session._send_response.called) @@ -425,7 +448,7 @@ class TestKmipSession(testtools.TestCase): kmip_session._handle_message_loop() kmip_session._receive_request.assert_called_once_with() - kmip_session._logger.info.assert_not_called() +# kmip_session._logger.info.assert_not_called() self.assertTrue(kmip_session._logger.warning.called) kmip_session._logger.exception.assert_not_called() self.assertTrue(kmip_session._send_response.called) @@ -456,7 +479,7 @@ class TestKmipSession(testtools.TestCase): kmip_session._handle_message_loop() kmip_session._receive_request.assert_called_once_with() - kmip_session._logger.info.assert_not_called() +# kmip_session._logger.info.assert_not_called() kmip_session._logger.warning.assert_called_once_with( "An unexpected error occurred while processing request." ) diff --git a/kmip/tests/unit/services/test_auth.py b/kmip/tests/unit/services/test_auth.py index 797bdc2..92ec1f2 100644 --- a/kmip/tests/unit/services/test_auth.py +++ b/kmip/tests/unit/services/test_auth.py @@ -64,6 +64,60 @@ class TestBasicAuthenticationSuite(testtools.TestCase): self.assertEqual(cipher_string, ciphers) + def test_custom_ciphers(self): + """ + Test that providing a custom list of cipher suites yields the right + cipher string for the Basic auth suite. + """ + suite = auth.BasicAuthenticationSuite( + [ + 'TLS_RSA_WITH_AES_128_CBC_SHA', + 'TLS_RSA_WITH_AES_256_CBC_SHA', + 'TLS_DHE_PSK_WITH_AES_128_CBC_SHA', + 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA', + 'DHE-DSS-AES256-SHA', + 'DHE-RSA-AES256-SHA' + ] + ) + ciphers = suite.ciphers + + self.assertIsInstance(ciphers, str) + suites = ciphers.split(':') + self.assertEqual(4, len(suites)) + self.assertIn('AES128-SHA', suites) + self.assertIn('AES256-SHA', suites) + self.assertIn('DHE-DSS-AES256-SHA', suites) + self.assertIn('DHE-RSA-AES256-SHA', suites) + + def test_custom_ciphers_empty(self): + """ + Test that providing a custom list of cipher suites that ultimately + yields an empty suite list causes the default cipher suite list to + be provided instead. + """ + suite = auth.BasicAuthenticationSuite( + [ + 'TLS_RSA_WITH_AES_256_CBC_SHA256' + ] + ) + ciphers = suite.ciphers + + self.assertIsInstance(ciphers, str) + suites = ciphers.split(':') + self.assertEqual(12, len(suites)) + self.assertIn('AES128-SHA', suites) + self.assertIn('DES-CBC3-SHA', suites) + self.assertIn('AES256-SHA', suites) + self.assertIn('DHE-DSS-DES-CBC3-SHA', suites) + self.assertIn('DHE-RSA-DES-CBC3-SHA', suites) + self.assertIn('DH-DSS-AES128-SHA', suites) + self.assertIn('DH-RSA-AES128-SHA', suites) + self.assertIn('DHE-DSS-AES128-SHA', suites) + self.assertIn('DHE-RSA-AES128-SHA', suites) + self.assertIn('DH-RSA-AES256-SHA', suites) + self.assertIn('DHE-DSS-AES256-SHA', suites) + self.assertIn('DHE-RSA-AES256-SHA', suites) + @pytest.mark.skipif(not hasattr(ssl, 'PROTOCOL_TLSv1_2'), reason="Requires ssl.PROTOCOL_TLSv1_2") @@ -121,3 +175,65 @@ class TestTLS12AuthenticationSuite(testtools.TestCase): )) self.assertEqual(cipher_string, ciphers) + + def test_custom_ciphers(self): + """ + Test that providing a custom list of cipher suites yields the right + cipher string. + """ + suite = auth.TLS12AuthenticationSuite( + [ + 'TLS_RSA_WITH_AES_256_CBC_SHA256', + 'TLS_RSA_WITH_AES_256_CBC_SHA', + 'TLS_DHE_PSK_WITH_AES_128_CBC_SHA', + 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA', + 'DHE-DSS-AES256-SHA', + 'DHE-RSA-AES256-SHA' + ] + ) + ciphers = suite.ciphers + + self.assertIsInstance(ciphers, str) + suites = ciphers.split(':') + self.assertEqual(1, len(suites)) + self.assertIn('AES256-SHA256', suites) + + def test_custom_ciphers_empty(self): + """ + Test that providing a custom list of cipher suites that ultimately + yields an empty suite list causes the default cipher suite list to + be provided instead. + """ + suite = auth.TLS12AuthenticationSuite( + [ + 'TLS_RSA_WITH_AES_256_CBC_SHA' + ] + ) + ciphers = suite.ciphers + + self.assertIsInstance(ciphers, str) + suites = ciphers.split(':') + self.assertEqual(23, len(suites)) + self.assertIn('AES128-SHA256', suites) + self.assertIn('AES256-SHA256', suites) + self.assertIn('DH-DSS-AES256-SHA256', suites) + self.assertIn('DH-DSS-AES128-SHA256', suites) + self.assertIn('DH-RSA-AES128-SHA256', suites) + self.assertIn('DHE-DSS-AES128-SHA256', suites) + self.assertIn('DHE-RSA-AES128-SHA256', suites) + self.assertIn('DH-DSS-AES256-SHA256', suites) + self.assertIn('DH-RSA-AES256-SHA256', suites) + self.assertIn('DHE-DSS-AES256-SHA256', suites) + self.assertIn('DHE-RSA-AES256-SHA256', suites) + self.assertIn('ECDH-ECDSA-AES128-SHA256', suites) + self.assertIn('ECDH-ECDSA-AES256-SHA256', suites) + self.assertIn('ECDHE-ECDSA-AES128-SHA256', suites) + self.assertIn('ECDHE-ECDSA-AES256-SHA384', suites) + self.assertIn('ECDH-RSA-AES128-SHA256', suites) + self.assertIn('ECDH-RSA-AES256-SHA384', suites) + self.assertIn('ECDHE-RSA-AES128-SHA256', suites) + self.assertIn('ECDHE-RSA-AES256-SHA384', suites) + self.assertIn('ECDHE-ECDSA-AES128-GCM-SHA256', suites) + self.assertIn('ECDHE-ECDSA-AES256-GCM-SHA384', suites) + self.assertIn('ECDHE-ECDSA-AES128-SHA256', suites) + self.assertIn('ECDHE-ECDSA-AES256-SHA384', suites)